Skip to main content

this/events/
memory.rs

1//! In-memory EventLog implementation
2//!
3//! Vec-backed event log suitable for development and single-instance deployments.
4//! Events are stored in memory and lost on restart.
5
6use crate::core::events::EventEnvelope;
7use crate::events::log::EventLog;
8use crate::events::types::{SeekPosition, SeqNo};
9use anyhow::Result;
10use async_trait::async_trait;
11use std::collections::HashMap;
12use std::pin::Pin;
13use std::sync::Arc;
14use tokio::sync::{Notify, RwLock};
15use tokio_stream::Stream;
16
17/// In-memory implementation of the EventLog trait
18///
19/// Uses a Vec for storage and a Notify for waking subscribers
20/// when new events are appended. Thread-safe via `Arc<RwLock>`.
21///
22/// # Performance
23///
24/// - Append: O(1) amortized
25/// - Subscribe replay: O(n) from start position
26/// - Ack/Seek: O(1)
27///
28/// # Limitations
29///
30/// - Events are lost on restart (no persistence)
31/// - Memory grows unbounded (no retention policy yet)
32/// - Single-instance only (no cross-process sharing)
33#[derive(Debug, Clone)]
34pub struct InMemoryEventLog {
35    inner: Arc<InMemoryEventLogInner>,
36}
37
38#[derive(Debug)]
39struct InMemoryEventLogInner {
40    /// Ordered list of events (index = seq_no - 1)
41    events: RwLock<Vec<EventEnvelope>>,
42    /// Consumer positions: consumer_name -> last acked seq_no
43    positions: RwLock<HashMap<String, SeqNo>>,
44    /// Notification channel for new events
45    notify: Notify,
46}
47
48impl InMemoryEventLog {
49    /// Create a new empty in-memory event log
50    pub fn new() -> Self {
51        Self {
52            inner: Arc::new(InMemoryEventLogInner {
53                events: RwLock::new(Vec::new()),
54                positions: RwLock::new(HashMap::new()),
55                notify: Notify::new(),
56            }),
57        }
58    }
59}
60
61impl Default for InMemoryEventLog {
62    fn default() -> Self {
63        Self::new()
64    }
65}
66
67#[async_trait]
68impl EventLog for InMemoryEventLog {
69    async fn append(&self, mut envelope: EventEnvelope) -> Result<SeqNo> {
70        let seq_no;
71        {
72            let mut events = self.inner.events.write().await;
73            seq_no = (events.len() + 1) as SeqNo;
74            envelope.seq_no = Some(seq_no);
75            events.push(envelope);
76        }
77        // Wake all waiting subscribers
78        self.inner.notify.notify_waiters();
79        Ok(seq_no)
80    }
81
82    async fn subscribe(
83        &self,
84        consumer: &str,
85        position: SeekPosition,
86    ) -> Result<Pin<Box<dyn Stream<Item = EventEnvelope> + Send>>> {
87        let start_seq = match position {
88            SeekPosition::Beginning => 0,
89            SeekPosition::Latest => {
90                let events = self.inner.events.read().await;
91                events.len() as SeqNo
92            }
93            SeekPosition::Sequence(seq) => seq.saturating_sub(1), // seq_no is 1-based, index is 0-based
94            SeekPosition::LastAcknowledged => {
95                let positions = self.inner.positions.read().await;
96                positions.get(consumer).copied().unwrap_or(0)
97            }
98        };
99
100        let inner = self.inner.clone();
101
102        // Use futures::stream::unfold to properly handle the Notified lifetime.
103        // This avoids the race condition where a stack-allocated Notified is dropped
104        // after poll_next returns Pending, causing lost wakeups.
105        let stream =
106            futures::stream::unfold((inner, start_seq), |(inner, mut cursor)| async move {
107                loop {
108                    // Check for available events.
109                    // The read guard is scoped so it's dropped before we move `inner`.
110                    let maybe_event = {
111                        let events = inner.events.read().await;
112                        let c = cursor as usize;
113                        if c < events.len() {
114                            Some(events[c].clone())
115                        } else {
116                            None
117                        }
118                    }; // RwLockReadGuard dropped here
119
120                    if let Some(event) = maybe_event {
121                        cursor += 1;
122                        return Some((event, (inner, cursor)));
123                    }
124
125                    // No event available, wait for notification.
126                    // The Notified future is properly held alive by unfold's
127                    // internal state machine across poll calls.
128                    inner.notify.notified().await;
129                }
130            });
131
132        Ok(Box::pin(stream))
133    }
134
135    async fn ack(&self, consumer: &str, seq_no: SeqNo) -> Result<()> {
136        let mut positions = self.inner.positions.write().await;
137        positions.insert(consumer.to_string(), seq_no);
138        Ok(())
139    }
140
141    async fn seek(&self, consumer: &str, position: SeekPosition) -> Result<()> {
142        let seq_no = match position {
143            SeekPosition::Beginning => 0,
144            SeekPosition::Latest => {
145                let events = self.inner.events.read().await;
146                events.len() as SeqNo
147            }
148            SeekPosition::Sequence(seq) => seq,
149            SeekPosition::LastAcknowledged => {
150                // No-op: already at LastAcknowledged
151                return Ok(());
152            }
153        };
154        let mut positions = self.inner.positions.write().await;
155        positions.insert(consumer.to_string(), seq_no);
156        Ok(())
157    }
158
159    async fn last_seq_no(&self) -> Option<SeqNo> {
160        let events = self.inner.events.read().await;
161        if events.is_empty() {
162            None
163        } else {
164            Some(events.len() as SeqNo)
165        }
166    }
167}
168
169#[cfg(test)]
170mod tests {
171    use super::*;
172    use crate::core::events::{EntityEvent, EventEnvelope, FrameworkEvent, LinkEvent};
173    use serde_json::json;
174    use tokio_stream::StreamExt;
175    use uuid::Uuid;
176
177    fn make_entity_event(entity_type: &str) -> EventEnvelope {
178        EventEnvelope::new(FrameworkEvent::Entity(EntityEvent::Created {
179            entity_type: entity_type.to_string(),
180            entity_id: Uuid::new_v4(),
181            data: json!({"name": "test"}),
182        }))
183    }
184
185    fn make_link_event(link_type: &str) -> EventEnvelope {
186        EventEnvelope::new(FrameworkEvent::Link(LinkEvent::Created {
187            link_type: link_type.to_string(),
188            link_id: Uuid::new_v4(),
189            source_id: Uuid::new_v4(),
190            target_id: Uuid::new_v4(),
191            metadata: None,
192        }))
193    }
194
195    #[tokio::test]
196    async fn test_append_returns_sequential_ids() {
197        let log = InMemoryEventLog::new();
198
199        let seq1 = log.append(make_entity_event("user")).await.unwrap();
200        let seq2 = log.append(make_entity_event("order")).await.unwrap();
201        let seq3 = log.append(make_link_event("follows")).await.unwrap();
202
203        assert_eq!(seq1, 1);
204        assert_eq!(seq2, 2);
205        assert_eq!(seq3, 3);
206    }
207
208    #[tokio::test]
209    async fn test_last_seq_no_empty() {
210        let log = InMemoryEventLog::new();
211        assert_eq!(log.last_seq_no().await, None);
212    }
213
214    #[tokio::test]
215    async fn test_last_seq_no_after_appends() {
216        let log = InMemoryEventLog::new();
217        log.append(make_entity_event("user")).await.unwrap();
218        log.append(make_entity_event("order")).await.unwrap();
219        assert_eq!(log.last_seq_no().await, Some(2));
220    }
221
222    #[tokio::test]
223    async fn test_subscribe_from_beginning() {
224        let log = InMemoryEventLog::new();
225
226        // Append 5 events
227        for i in 0..5 {
228            log.append(make_entity_event(&format!("type_{i}")))
229                .await
230                .unwrap();
231        }
232
233        // Subscribe from beginning
234        let stream = log
235            .subscribe("test-consumer", SeekPosition::Beginning)
236            .await
237            .unwrap();
238
239        // Take exactly 5 events (the stored ones)
240        let events: Vec<_> = stream.take(5).collect().await;
241        assert_eq!(events.len(), 5);
242
243        // Verify order
244        assert_eq!(events[0].event.entity_type(), Some("type_0"));
245        assert_eq!(events[4].event.entity_type(), Some("type_4"));
246    }
247
248    #[tokio::test]
249    async fn test_subscribe_from_latest_only_gets_new() {
250        let log = InMemoryEventLog::new();
251
252        // Append some events before subscribing
253        log.append(make_entity_event("old_event")).await.unwrap();
254        log.append(make_entity_event("old_event_2")).await.unwrap();
255
256        // Subscribe from latest
257        let mut stream = log
258            .subscribe("test-consumer", SeekPosition::Latest)
259            .await
260            .unwrap();
261
262        // Append a new event
263        let log_clone = log.clone();
264        tokio::spawn(async move {
265            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
266            log_clone
267                .append(make_entity_event("new_event"))
268                .await
269                .unwrap();
270        });
271
272        // Should receive only the new event
273        let event = tokio::time::timeout(std::time::Duration::from_secs(2), stream.next())
274            .await
275            .unwrap()
276            .unwrap();
277        assert_eq!(event.event.entity_type(), Some("new_event"));
278    }
279
280    #[tokio::test]
281    async fn test_subscribe_from_sequence() {
282        let log = InMemoryEventLog::new();
283
284        // Append 5 events
285        for i in 0..5 {
286            log.append(make_entity_event(&format!("type_{i}")))
287                .await
288                .unwrap();
289        }
290
291        // Subscribe from sequence 3 (0-based internally, so we get events 3, 4, 5)
292        let stream = log
293            .subscribe("test-consumer", SeekPosition::Sequence(3))
294            .await
295            .unwrap();
296
297        let events: Vec<_> = stream.take(3).collect().await;
298        assert_eq!(events.len(), 3);
299        assert_eq!(events[0].event.entity_type(), Some("type_2")); // seq 3 = index 2
300    }
301
302    #[tokio::test]
303    async fn test_ack_advances_position() {
304        let log = InMemoryEventLog::new();
305
306        // Append 5 events
307        for i in 0..5 {
308            log.append(make_entity_event(&format!("type_{i}")))
309                .await
310                .unwrap();
311        }
312
313        // Ack up to seq 3
314        log.ack("consumer-a", 3).await.unwrap();
315
316        // Subscribe from LastAcknowledged
317        let stream = log
318            .subscribe("consumer-a", SeekPosition::LastAcknowledged)
319            .await
320            .unwrap();
321
322        let events: Vec<_> = stream.take(2).collect().await;
323        assert_eq!(events.len(), 2);
324        assert_eq!(events[0].event.entity_type(), Some("type_3")); // After ack(3), next is index 3 = type_3
325    }
326
327    #[tokio::test]
328    async fn test_seek_repositions_consumer() {
329        let log = InMemoryEventLog::new();
330
331        // Append 5 events
332        for i in 0..5 {
333            log.append(make_entity_event(&format!("type_{i}")))
334                .await
335                .unwrap();
336        }
337
338        // Ack up to 5 (all events)
339        log.ack("consumer-b", 5).await.unwrap();
340
341        // Seek back to beginning
342        log.seek("consumer-b", SeekPosition::Beginning)
343            .await
344            .unwrap();
345
346        // Subscribe from LastAcknowledged should now give all events
347        let stream = log
348            .subscribe("consumer-b", SeekPosition::LastAcknowledged)
349            .await
350            .unwrap();
351
352        let events: Vec<_> = stream.take(5).collect().await;
353        assert_eq!(events.len(), 5);
354        assert_eq!(events[0].event.entity_type(), Some("type_0"));
355    }
356
357    #[tokio::test]
358    async fn test_multiple_consumers_independent_positions() {
359        let log = InMemoryEventLog::new();
360
361        // Append 5 events
362        for i in 0..5 {
363            log.append(make_entity_event(&format!("type_{i}")))
364                .await
365                .unwrap();
366        }
367
368        // Consumer A acks up to 2
369        log.ack("consumer-a", 2).await.unwrap();
370        // Consumer B acks up to 4
371        log.ack("consumer-b", 4).await.unwrap();
372
373        // Consumer A from LastAcknowledged
374        let stream_a = log
375            .subscribe("consumer-a", SeekPosition::LastAcknowledged)
376            .await
377            .unwrap();
378        let events_a: Vec<_> = stream_a.take(3).collect().await;
379        assert_eq!(events_a.len(), 3); // Events 3, 4, 5 (indices 2, 3, 4)
380
381        // Consumer B from LastAcknowledged
382        let stream_b = log
383            .subscribe("consumer-b", SeekPosition::LastAcknowledged)
384            .await
385            .unwrap();
386        let events_b: Vec<_> = stream_b.take(1).collect().await;
387        assert_eq!(events_b.len(), 1); // Only event 5 (index 4)
388    }
389
390    #[tokio::test]
391    async fn test_live_subscription_receives_new_events() {
392        let log = InMemoryEventLog::new();
393
394        let mut stream = log
395            .subscribe("live-consumer", SeekPosition::Latest)
396            .await
397            .unwrap();
398
399        // Spawn a producer
400        let log_clone = log.clone();
401        tokio::spawn(async move {
402            for i in 0..3 {
403                tokio::time::sleep(std::time::Duration::from_millis(30)).await;
404                log_clone
405                    .append(make_entity_event(&format!("live_{i}")))
406                    .await
407                    .unwrap();
408            }
409        });
410
411        // Consume 3 live events
412        for i in 0..3 {
413            let event = tokio::time::timeout(std::time::Duration::from_secs(2), stream.next())
414                .await
415                .unwrap()
416                .unwrap();
417            assert_eq!(
418                event.event.entity_type(),
419                Some(format!("live_{i}").as_str())
420            );
421        }
422    }
423
424    #[tokio::test]
425    async fn test_replay_then_live() {
426        let log = InMemoryEventLog::new();
427
428        // Pre-populate with 3 events
429        for i in 0..3 {
430            log.append(make_entity_event(&format!("old_{i}")))
431                .await
432                .unwrap();
433        }
434
435        // Subscribe from beginning (will replay first, then go live)
436        let mut stream = log
437            .subscribe("replay-consumer", SeekPosition::Beginning)
438            .await
439            .unwrap();
440
441        // Read the 3 replayed events
442        for i in 0..3 {
443            let event = tokio::time::timeout(std::time::Duration::from_secs(1), stream.next())
444                .await
445                .unwrap()
446                .unwrap();
447            assert_eq!(event.event.entity_type(), Some(format!("old_{i}").as_str()));
448        }
449
450        // Now append a live event
451        let log_clone = log.clone();
452        tokio::spawn(async move {
453            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
454            log_clone
455                .append(make_entity_event("live_new"))
456                .await
457                .unwrap();
458        });
459
460        // Should receive the live event
461        let event = tokio::time::timeout(std::time::Duration::from_secs(2), stream.next())
462            .await
463            .unwrap()
464            .unwrap();
465        assert_eq!(event.event.entity_type(), Some("live_new"));
466    }
467
468    #[tokio::test]
469    async fn test_unacked_consumer_starts_from_zero() {
470        let log = InMemoryEventLog::new();
471
472        // Append events
473        log.append(make_entity_event("first")).await.unwrap();
474        log.append(make_entity_event("second")).await.unwrap();
475
476        // New consumer (never acked) subscribing from LastAcknowledged
477        let stream = log
478            .subscribe("new-consumer", SeekPosition::LastAcknowledged)
479            .await
480            .unwrap();
481
482        let events: Vec<_> = stream.take(2).collect().await;
483        assert_eq!(events.len(), 2);
484        assert_eq!(events[0].event.entity_type(), Some("first"));
485    }
486
487    #[tokio::test]
488    async fn test_clone_shares_state() {
489        let log1 = InMemoryEventLog::new();
490        let log2 = log1.clone();
491
492        log1.append(make_entity_event("from_log1")).await.unwrap();
493        log2.append(make_entity_event("from_log2")).await.unwrap();
494
495        assert_eq!(log1.last_seq_no().await, Some(2));
496        assert_eq!(log2.last_seq_no().await, Some(2));
497    }
498
499    #[tokio::test]
500    async fn test_seq_no_set_on_stored_envelopes() {
501        let log = InMemoryEventLog::new();
502
503        log.append(make_entity_event("user")).await.unwrap();
504        log.append(make_entity_event("order")).await.unwrap();
505        log.append(make_link_event("follows")).await.unwrap();
506
507        // Subscribe from beginning and verify seq_no is set on each envelope
508        let stream = log
509            .subscribe("test-consumer", SeekPosition::Beginning)
510            .await
511            .unwrap();
512
513        let events: Vec<_> = stream.take(3).collect().await;
514        assert_eq!(events[0].seq_no, Some(1));
515        assert_eq!(events[1].seq_no, Some(2));
516        assert_eq!(events[2].seq_no, Some(3));
517
518        // Verify the event data is also correct
519        assert_eq!(events[0].event.entity_type(), Some("user"));
520        assert_eq!(events[1].event.entity_type(), Some("order"));
521    }
522
523    #[tokio::test]
524    async fn test_no_lost_wakeup_concurrent_producer_consumer() {
525        // Stress test: fast producer + consumer, verify no events lost
526        let log = InMemoryEventLog::new();
527        let event_count = 100;
528
529        // Subscribe BEFORE producing (from beginning)
530        let stream = log
531            .subscribe("stress-consumer", SeekPosition::Beginning)
532            .await
533            .unwrap();
534
535        // Spawn a fast producer with minimal delay
536        let log_clone = log.clone();
537        tokio::spawn(async move {
538            for i in 0..event_count {
539                log_clone
540                    .append(make_entity_event(&format!("stress_{i}")))
541                    .await
542                    .unwrap();
543                // Yield occasionally to interleave with consumer
544                if i % 10 == 0 {
545                    tokio::task::yield_now().await;
546                }
547            }
548        });
549
550        // Consume all events with a timeout
551        let events: Vec<_> = tokio::time::timeout(
552            std::time::Duration::from_secs(5),
553            stream.take(event_count).collect(),
554        )
555        .await
556        .expect("timed out waiting for events — possible lost wakeup");
557
558        assert_eq!(
559            events.len(),
560            event_count,
561            "lost {} events",
562            event_count - events.len()
563        );
564
565        // Verify sequential order and seq_no
566        for (i, event) in events.iter().enumerate() {
567            assert_eq!(
568                event.event.entity_type(),
569                Some(format!("stress_{i}").as_str()),
570                "event at index {i} has wrong type"
571            );
572            assert_eq!(event.seq_no, Some((i + 1) as u64));
573        }
574    }
575}