1use crate::core::events::EventEnvelope;
7use crate::events::log::EventLog;
8use crate::events::types::{SeekPosition, SeqNo};
9use anyhow::Result;
10use async_trait::async_trait;
11use std::collections::HashMap;
12use std::pin::Pin;
13use std::sync::Arc;
14use tokio::sync::{Notify, RwLock};
15use tokio_stream::Stream;
16
/// In-memory implementation of [`EventLog`].
///
/// All state lives behind an [`Arc`], so cloning the log yields another handle
/// to the *same* underlying events and consumer positions rather than a copy.
#[derive(Debug, Clone)]
pub struct InMemoryEventLog {
    /// Shared state (events, consumer positions, append notifier).
    inner: Arc<InMemoryEventLogInner>,
}
37
/// State shared by every clone of [`InMemoryEventLog`].
#[derive(Debug)]
struct InMemoryEventLogInner {
    /// Appended envelopes in insertion order; the envelope at index `i` was
    /// assigned sequence number `i + 1` by `append`.
    events: RwLock<Vec<EventEnvelope>>,
    /// Per-consumer position keyed by consumer name, written by `ack` and
    /// `seek` and read by `subscribe` for `SeekPosition::LastAcknowledged`.
    positions: RwLock<HashMap<String, SeqNo>>,
    /// Signalled with `notify_waiters()` after every append so that
    /// subscription streams waiting for new events re-check the log.
    notify: Notify,
}
47
48impl InMemoryEventLog {
49 pub fn new() -> Self {
51 Self {
52 inner: Arc::new(InMemoryEventLogInner {
53 events: RwLock::new(Vec::new()),
54 positions: RwLock::new(HashMap::new()),
55 notify: Notify::new(),
56 }),
57 }
58 }
59}
60
61impl Default for InMemoryEventLog {
62 fn default() -> Self {
63 Self::new()
64 }
65}
66
67#[async_trait]
68impl EventLog for InMemoryEventLog {
69 async fn append(&self, mut envelope: EventEnvelope) -> Result<SeqNo> {
70 let seq_no;
71 {
72 let mut events = self.inner.events.write().await;
73 seq_no = (events.len() + 1) as SeqNo;
74 envelope.seq_no = Some(seq_no);
75 events.push(envelope);
76 }
77 self.inner.notify.notify_waiters();
79 Ok(seq_no)
80 }
81
82 async fn subscribe(
83 &self,
84 consumer: &str,
85 position: SeekPosition,
86 ) -> Result<Pin<Box<dyn Stream<Item = EventEnvelope> + Send>>> {
87 let start_seq = match position {
88 SeekPosition::Beginning => 0,
89 SeekPosition::Latest => {
90 let events = self.inner.events.read().await;
91 events.len() as SeqNo
92 }
93 SeekPosition::Sequence(seq) => seq.saturating_sub(1), SeekPosition::LastAcknowledged => {
95 let positions = self.inner.positions.read().await;
96 positions.get(consumer).copied().unwrap_or(0)
97 }
98 };
99
100 let inner = self.inner.clone();
101
102 let stream =
106 futures::stream::unfold((inner, start_seq), |(inner, mut cursor)| async move {
107 loop {
108 let maybe_event = {
111 let events = inner.events.read().await;
112 let c = cursor as usize;
113 if c < events.len() {
114 Some(events[c].clone())
115 } else {
116 None
117 }
118 }; if let Some(event) = maybe_event {
121 cursor += 1;
122 return Some((event, (inner, cursor)));
123 }
124
125 inner.notify.notified().await;
129 }
130 });
131
132 Ok(Box::pin(stream))
133 }
134
135 async fn ack(&self, consumer: &str, seq_no: SeqNo) -> Result<()> {
136 let mut positions = self.inner.positions.write().await;
137 positions.insert(consumer.to_string(), seq_no);
138 Ok(())
139 }
140
141 async fn seek(&self, consumer: &str, position: SeekPosition) -> Result<()> {
142 let seq_no = match position {
143 SeekPosition::Beginning => 0,
144 SeekPosition::Latest => {
145 let events = self.inner.events.read().await;
146 events.len() as SeqNo
147 }
148 SeekPosition::Sequence(seq) => seq,
149 SeekPosition::LastAcknowledged => {
150 return Ok(());
152 }
153 };
154 let mut positions = self.inner.positions.write().await;
155 positions.insert(consumer.to_string(), seq_no);
156 Ok(())
157 }
158
159 async fn last_seq_no(&self) -> Option<SeqNo> {
160 let events = self.inner.events.read().await;
161 if events.is_empty() {
162 None
163 } else {
164 Some(events.len() as SeqNo)
165 }
166 }
167}
168
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::events::{EntityEvent, EventEnvelope, FrameworkEvent, LinkEvent};
    use serde_json::json;
    use std::time::Duration;
    use tokio_stream::StreamExt;
    use uuid::Uuid;

    /// Builds an envelope wrapping an `EntityEvent::Created` of the given type.
    fn make_entity_event(entity_type: &str) -> EventEnvelope {
        let created = EntityEvent::Created {
            entity_type: entity_type.to_string(),
            entity_id: Uuid::new_v4(),
            data: json!({"name": "test"}),
        };
        EventEnvelope::new(FrameworkEvent::Entity(created))
    }

    /// Builds an envelope wrapping a `LinkEvent::Created` of the given type.
    fn make_link_event(link_type: &str) -> EventEnvelope {
        let created = LinkEvent::Created {
            link_type: link_type.to_string(),
            link_id: Uuid::new_v4(),
            source_id: Uuid::new_v4(),
            target_id: Uuid::new_v4(),
            metadata: None,
        };
        EventEnvelope::new(FrameworkEvent::Link(created))
    }

    /// Appends `count` entity events typed `"{prefix}_{i}"` for `i` in `0..count`.
    async fn append_numbered(log: &InMemoryEventLog, prefix: &str, count: usize) {
        for i in 0..count {
            log.append(make_entity_event(&format!("{prefix}_{i}")))
                .await
                .unwrap();
        }
    }

    /// Awaits the next stream item, panicking if it does not arrive within
    /// `limit` or if the stream has ended.
    async fn next_within<S>(stream: &mut S, limit: Duration) -> EventEnvelope
    where
        S: Stream<Item = EventEnvelope> + Unpin,
    {
        tokio::time::timeout(limit, stream.next())
            .await
            .unwrap()
            .unwrap()
    }

    #[tokio::test]
    async fn test_append_returns_sequential_ids() {
        let log = InMemoryEventLog::new();

        let first = log.append(make_entity_event("user")).await.unwrap();
        let second = log.append(make_entity_event("order")).await.unwrap();
        let third = log.append(make_link_event("follows")).await.unwrap();

        assert_eq!(first, 1);
        assert_eq!(second, 2);
        assert_eq!(third, 3);
    }

    #[tokio::test]
    async fn test_last_seq_no_empty() {
        let log = InMemoryEventLog::new();

        let last = log.last_seq_no().await;

        assert_eq!(last, None);
    }

    #[tokio::test]
    async fn test_last_seq_no_after_appends() {
        let log = InMemoryEventLog::new();
        for entity_type in ["user", "order"] {
            log.append(make_entity_event(entity_type)).await.unwrap();
        }

        assert_eq!(log.last_seq_no().await, Some(2));
    }

    #[tokio::test]
    async fn test_subscribe_from_beginning() {
        let log = InMemoryEventLog::new();
        append_numbered(&log, "type", 5).await;

        let replay = log
            .subscribe("test-consumer", SeekPosition::Beginning)
            .await
            .unwrap();
        let received: Vec<_> = replay.take(5).collect().await;

        assert_eq!(received.len(), 5);
        assert_eq!(received[0].event.entity_type(), Some("type_0"));
        assert_eq!(received[4].event.entity_type(), Some("type_4"));
    }

    #[tokio::test]
    async fn test_subscribe_from_latest_only_gets_new() {
        let log = InMemoryEventLog::new();

        // Events that exist before subscribing must not be delivered.
        for entity_type in ["old_event", "old_event_2"] {
            log.append(make_entity_event(entity_type)).await.unwrap();
        }

        let mut live = log
            .subscribe("test-consumer", SeekPosition::Latest)
            .await
            .unwrap();

        // Append a fresh event from another task after a short delay.
        let producer = log.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            producer
                .append(make_entity_event("new_event"))
                .await
                .unwrap();
        });

        let received = next_within(&mut live, Duration::from_secs(2)).await;
        assert_eq!(received.event.entity_type(), Some("new_event"));
    }

    #[tokio::test]
    async fn test_subscribe_from_sequence() {
        let log = InMemoryEventLog::new();
        append_numbered(&log, "type", 5).await;

        let replay = log
            .subscribe("test-consumer", SeekPosition::Sequence(3))
            .await
            .unwrap();
        let received: Vec<_> = replay.take(3).collect().await;

        assert_eq!(received.len(), 3);
        // Sequence number 3 is the third appended event, i.e. "type_2".
        assert_eq!(received[0].event.entity_type(), Some("type_2"));
    }

    #[tokio::test]
    async fn test_ack_advances_position() {
        let log = InMemoryEventLog::new();
        append_numbered(&log, "type", 5).await;

        log.ack("consumer-a", 3).await.unwrap();

        let replay = log
            .subscribe("consumer-a", SeekPosition::LastAcknowledged)
            .await
            .unwrap();
        let received: Vec<_> = replay.take(2).collect().await;

        assert_eq!(received.len(), 2);
        // Resumes after the acknowledged sequence number 3.
        assert_eq!(received[0].event.entity_type(), Some("type_3"));
    }

    #[tokio::test]
    async fn test_seek_repositions_consumer() {
        let log = InMemoryEventLog::new();
        append_numbered(&log, "type", 5).await;

        // Consumer has acknowledged everything, then rewinds to the start.
        log.ack("consumer-b", 5).await.unwrap();
        log.seek("consumer-b", SeekPosition::Beginning)
            .await
            .unwrap();

        let replay = log
            .subscribe("consumer-b", SeekPosition::LastAcknowledged)
            .await
            .unwrap();
        let received: Vec<_> = replay.take(5).collect().await;

        assert_eq!(received.len(), 5);
        assert_eq!(received[0].event.entity_type(), Some("type_0"));
    }

    #[tokio::test]
    async fn test_multiple_consumers_independent_positions() {
        let log = InMemoryEventLog::new();
        append_numbered(&log, "type", 5).await;

        log.ack("consumer-a", 2).await.unwrap();
        log.ack("consumer-b", 4).await.unwrap();

        // consumer-a acknowledged 2 of 5, so 3 events remain.
        let replay_a = log
            .subscribe("consumer-a", SeekPosition::LastAcknowledged)
            .await
            .unwrap();
        let received_a: Vec<_> = replay_a.take(3).collect().await;
        assert_eq!(received_a.len(), 3);

        // consumer-b acknowledged 4 of 5, so 1 event remains.
        let replay_b = log
            .subscribe("consumer-b", SeekPosition::LastAcknowledged)
            .await
            .unwrap();
        let received_b: Vec<_> = replay_b.take(1).collect().await;
        assert_eq!(received_b.len(), 1);
    }

    #[tokio::test]
    async fn test_live_subscription_receives_new_events() {
        let log = InMemoryEventLog::new();

        let mut live = log
            .subscribe("live-consumer", SeekPosition::Latest)
            .await
            .unwrap();

        // Produce three events from a background task, spaced out in time.
        let producer = log.clone();
        tokio::spawn(async move {
            for i in 0..3 {
                tokio::time::sleep(Duration::from_millis(30)).await;
                producer
                    .append(make_entity_event(&format!("live_{i}")))
                    .await
                    .unwrap();
            }
        });

        for i in 0..3 {
            let received = next_within(&mut live, Duration::from_secs(2)).await;
            let expected = format!("live_{i}");
            assert_eq!(received.event.entity_type(), Some(expected.as_str()));
        }
    }

    #[tokio::test]
    async fn test_replay_then_live() {
        let log = InMemoryEventLog::new();
        append_numbered(&log, "old", 3).await;

        let mut stream = log
            .subscribe("replay-consumer", SeekPosition::Beginning)
            .await
            .unwrap();

        // Phase 1: the three pre-existing events are replayed in order.
        for i in 0..3 {
            let replayed = next_within(&mut stream, Duration::from_secs(1)).await;
            let expected = format!("old_{i}");
            assert_eq!(replayed.event.entity_type(), Some(expected.as_str()));
        }

        // Phase 2: the same stream then delivers an event appended later.
        let producer = log.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            producer
                .append(make_entity_event("live_new"))
                .await
                .unwrap();
        });

        let live = next_within(&mut stream, Duration::from_secs(2)).await;
        assert_eq!(live.event.entity_type(), Some("live_new"));
    }

    #[tokio::test]
    async fn test_unacked_consumer_starts_from_zero() {
        let log = InMemoryEventLog::new();
        for entity_type in ["first", "second"] {
            log.append(make_entity_event(entity_type)).await.unwrap();
        }

        let replay = log
            .subscribe("new-consumer", SeekPosition::LastAcknowledged)
            .await
            .unwrap();
        let received: Vec<_> = replay.take(2).collect().await;

        assert_eq!(received.len(), 2);
        assert_eq!(received[0].event.entity_type(), Some("first"));
    }

    #[tokio::test]
    async fn test_clone_shares_state() {
        let original = InMemoryEventLog::new();
        let cloned = original.clone();

        original
            .append(make_entity_event("from_log1"))
            .await
            .unwrap();
        cloned
            .append(make_entity_event("from_log2"))
            .await
            .unwrap();

        // Both handles observe both appends.
        assert_eq!(original.last_seq_no().await, Some(2));
        assert_eq!(cloned.last_seq_no().await, Some(2));
    }

    #[tokio::test]
    async fn test_seq_no_set_on_stored_envelopes() {
        let log = InMemoryEventLog::new();

        log.append(make_entity_event("user")).await.unwrap();
        log.append(make_entity_event("order")).await.unwrap();
        log.append(make_link_event("follows")).await.unwrap();

        let replay = log
            .subscribe("test-consumer", SeekPosition::Beginning)
            .await
            .unwrap();
        let received: Vec<_> = replay.take(3).collect().await;

        assert_eq!(received[0].seq_no, Some(1));
        assert_eq!(received[1].seq_no, Some(2));
        assert_eq!(received[2].seq_no, Some(3));

        assert_eq!(received[0].event.entity_type(), Some("user"));
        assert_eq!(received[1].event.entity_type(), Some("order"));
    }

    #[tokio::test]
    async fn test_no_lost_wakeup_concurrent_producer_consumer() {
        let log = InMemoryEventLog::new();
        let total = 100;

        let stream = log
            .subscribe("stress-consumer", SeekPosition::Beginning)
            .await
            .unwrap();

        // Producer appends in bursts, yielding to the scheduler every tenth
        // event so the consumer gets interleaved with it.
        let producer = log.clone();
        tokio::spawn(async move {
            for i in 0..total {
                producer
                    .append(make_entity_event(&format!("stress_{i}")))
                    .await
                    .unwrap();
                if i % 10 == 0 {
                    tokio::task::yield_now().await;
                }
            }
        });

        let received: Vec<_> =
            tokio::time::timeout(Duration::from_secs(5), stream.take(total).collect())
                .await
                .expect("timed out waiting for events — possible lost wakeup");

        assert_eq!(
            received.len(),
            total,
            "lost {} events",
            total - received.len()
        );

        // Events must arrive in append order with dense 1-based sequence numbers.
        for (i, envelope) in received.iter().enumerate() {
            assert_eq!(
                envelope.event.entity_type(),
                Some(format!("stress_{i}").as_str()),
                "event at index {i} has wrong type"
            );
            assert_eq!(envelope.seq_no, Some((i + 1) as u64));
        }
    }
}