eventcore_memory/
lib.rs

1//! In-memory event store implementation for testing.
2//!
3//! This module provides the `InMemoryEventStore` - a lightweight, zero-dependency
4//! storage backend for EventCore integration tests and development.
5
6use std::collections::HashMap;
7use std::sync::{Arc, RwLock};
8
9use eventcore_types::{
10    CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
11    EventStreamReader, EventStreamSlice, Operation, StreamId, StreamPosition, StreamVersion,
12    StreamWriteEntry, StreamWrites,
13};
14use uuid::Uuid;
15
16/// In-memory event store implementation for testing.
17///
18/// InMemoryEventStore provides a lightweight, zero-dependency storage backend
19/// for EventCore integration tests and development. It implements the EventStore
20/// trait using standard library collections (HashMap, BTreeMap) with optimistic
21/// concurrency control via version checking.
22///
23/// # Example
24///
25/// ```ignore
26/// use eventcore_memory::InMemoryEventStore;
27///
28/// let store = InMemoryEventStore::new();
29/// // Use store with execute() function
30/// ```
31///
32/// # Thread Safety
33///
34/// InMemoryEventStore uses interior mutability for concurrent access.
35type StreamData = (Vec<Box<dyn std::any::Any + Send>>, StreamVersion);
36
37/// Entry in the global event log with indexed stream_id for efficient filtering.
38///
39/// This structure mirrors the Postgres schema where stream_id is a separate
40/// indexed column and event_id (UUID7) serves as the global position.
41/// By storing stream_id and event_id separately, we can filter by stream
42/// prefix and position without parsing JSON, matching the performance
43/// characteristics of the database implementation.
44#[derive(Debug, Clone)]
45struct GlobalLogEntry {
46    /// Event identifier (UUID7), used as global position
47    event_id: Uuid,
48    /// Stream identifier, extracted at write time for efficient filtering
49    stream_id: String,
50    /// Event data as JSON value
51    event_data: serde_json::Value,
52}
53
54/// Internal storage combining per-stream data with global event ordering.
55struct StoreData {
56    streams: HashMap<StreamId, StreamData>,
57    /// Global log with indexed stream_id for efficient EventReader queries
58    global_log: Vec<GlobalLogEntry>,
59}
60
61pub struct InMemoryEventStore {
62    data: std::sync::Mutex<StoreData>,
63}
64
65impl InMemoryEventStore {
66    /// Create a new in-memory event store.
67    ///
68    /// Returns an empty event store ready for command execution.
69    /// All streams start at version 0 (no events).
70    pub fn new() -> Self {
71        Self {
72            data: std::sync::Mutex::new(StoreData {
73                streams: HashMap::new(),
74                global_log: Vec::new(),
75            }),
76        }
77    }
78}
79
80impl Default for InMemoryEventStore {
81    fn default() -> Self {
82        Self::new()
83    }
84}
85
86impl EventStore for InMemoryEventStore {
87    async fn read_stream<E: Event>(
88        &self,
89        stream_id: StreamId,
90    ) -> Result<EventStreamReader<E>, EventStoreError> {
91        let data = self
92            .data
93            .lock()
94            .map_err(|_| EventStoreError::StoreFailure {
95                operation: Operation::ReadStream,
96            })?;
97        let events = data
98            .streams
99            .get(&stream_id)
100            .map(|(boxed_events, _version)| {
101                boxed_events
102                    .iter()
103                    .filter_map(|boxed| boxed.downcast_ref::<E>())
104                    .cloned()
105                    .collect()
106            })
107            .unwrap_or_default();
108
109        Ok(EventStreamReader::new(events))
110    }
111
112    async fn append_events(
113        &self,
114        writes: StreamWrites,
115    ) -> Result<EventStreamSlice, EventStoreError> {
116        let mut data = self
117            .data
118            .lock()
119            .map_err(|_| EventStoreError::StoreFailure {
120                operation: Operation::AppendEvents,
121            })?;
122        let expected_versions = writes.expected_versions().clone();
123
124        // Check all version constraints before writing any events
125        for (stream_id, expected_version) in &expected_versions {
126            let current_version = data
127                .streams
128                .get(stream_id)
129                .map(|(_events, version)| *version)
130                .unwrap_or_else(|| StreamVersion::new(0));
131
132            if current_version != *expected_version {
133                return Err(EventStoreError::VersionConflict);
134            }
135        }
136
137        // All versions match - proceed with writes
138        for entry in writes.into_entries() {
139            let StreamWriteEntry {
140                stream_id,
141                event,
142                event_type: _,
143                event_data,
144            } = entry;
145
146            // Generate UUID7 for this event (monotonic, timestamp-ordered)
147            let event_id = Uuid::now_v7();
148
149            // Store in global log for EventReader with indexed stream_id and event_id
150            data.global_log.push(GlobalLogEntry {
151                event_id,
152                stream_id: stream_id.as_ref().to_string(),
153                event_data,
154            });
155
156            let (events, version) = data
157                .streams
158                .entry(stream_id)
159                .or_insert_with(|| (Vec::new(), StreamVersion::new(0)));
160            events.push(event);
161            *version = version.increment();
162        }
163
164        Ok(EventStreamSlice)
165    }
166}
167
168impl EventReader for InMemoryEventStore {
169    type Error = EventStoreError;
170
171    async fn read_events<E: Event>(
172        &self,
173        filter: EventFilter,
174        page: EventPage,
175    ) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
176        let data = self
177            .data
178            .lock()
179            .map_err(|_| EventStoreError::StoreFailure {
180                operation: Operation::ReadStream,
181            })?;
182
183        let after_event_id = page.after_position().map(|p| p.into_inner());
184
185        let events: Vec<(E, StreamPosition)> = data
186            .global_log
187            .iter()
188            .filter(|entry| {
189                // Filter by event_id (UUID7 comparison)
190                match after_event_id {
191                    None => true,
192                    Some(after_id) => entry.event_id > after_id,
193                }
194            })
195            .filter(|entry| {
196                // Filter by indexed stream_id WITHOUT parsing JSON (matches Postgres behavior)
197                match filter.stream_prefix() {
198                    None => true,
199                    Some(prefix) => entry.stream_id.starts_with(prefix.as_ref()),
200                }
201            })
202            .take(page.limit().into_inner())
203            .filter_map(|entry| {
204                serde_json::from_value::<E>(entry.event_data.clone())
205                    .ok()
206                    .map(|e| (e, StreamPosition::new(entry.event_id)))
207            })
208            .collect();
209
210        Ok(events)
211    }
212}
213
214/// In-memory checkpoint store for tracking projection progress.
215///
216/// `InMemoryCheckpointStore` stores checkpoint positions in memory using a
217/// thread-safe `Arc<RwLock<HashMap>>`. It is primarily useful for testing
218/// and single-process deployments where persistence across restarts is not required.
219///
220/// For production deployments requiring durability, use a persistent
221/// checkpoint store implementation.
222///
223/// # Example
224///
225/// ```ignore
226/// use eventcore_memory::InMemoryCheckpointStore;
227///
228/// let checkpoint_store = InMemoryCheckpointStore::new();
229/// // Use with ProjectionRunner
230/// ```
231#[derive(Debug, Clone, Default)]
232pub struct InMemoryCheckpointStore {
233    checkpoints: Arc<RwLock<HashMap<String, StreamPosition>>>,
234}
235
236impl InMemoryCheckpointStore {
237    /// Create a new in-memory checkpoint store.
238    pub fn new() -> Self {
239        Self::default()
240    }
241}
242
/// Failure reported by in-memory checkpoint store operations.
///
/// The in-memory store guards its state with an `RwLock`, so the only way an
/// operation can fail is a poisoned lock left behind by a panic in another
/// thread. `message` holds the human-readable description.
#[derive(Debug, Clone)]
pub struct InMemoryCheckpointError {
    message: String,
}

impl std::fmt::Display for InMemoryCheckpointError {
    /// Writes the stored message verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.message)
    }
}

impl std::error::Error for InMemoryCheckpointError {}
259
260impl CheckpointStore for InMemoryCheckpointStore {
261    type Error = InMemoryCheckpointError;
262
263    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
264        let guard = self
265            .checkpoints
266            .read()
267            .map_err(|e| InMemoryCheckpointError {
268                message: format!("failed to acquire read lock: {}", e),
269            })?;
270        Ok(guard.get(name).copied())
271    }
272
273    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
274        let mut guard = self
275            .checkpoints
276            .write()
277            .map_err(|e| InMemoryCheckpointError {
278                message: format!("failed to acquire write lock: {}", e),
279            })?;
280        let _ = guard.insert(name.to_string(), position);
281        Ok(())
282    }
283}
284
#[cfg(test)]
mod tests {
    use super::*;
    use eventcore_types::{BatchSize, EventFilter, EventPage};
    use serde::{Deserialize, Serialize};

    /// Test-specific domain event type for unit testing storage operations.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    struct TestEvent {
        stream_id: StreamId,
        data: String,
    }

    impl Event for TestEvent {
        fn stream_id(&self) -> &StreamId {
            &self.stream_id
        }
    }

    /// Build a `StreamId` from a name the test expects to be valid.
    fn stream(name: &str) -> StreamId {
        StreamId::try_new(name).expect("valid stream id")
    }

    /// Build a `TestEvent` belonging to `stream_id` with the given payload.
    fn event_for(stream_id: &StreamId, data: &str) -> TestEvent {
        TestEvent {
            stream_id: stream_id.clone(),
            data: data.to_string(),
        }
    }

    /// Appending one event to an empty stream makes exactly that event
    /// readable from the stream, with its stream id and payload intact.
    #[tokio::test]
    async fn test_append_and_read_single_event() {
        // Given: an empty store and one event for a fresh stream
        let store = InMemoryEventStore::new();
        let stream_id = stream("test-stream-123");
        let event = event_for(&stream_id, "test event data");

        // And: writes registering the stream at version 0 (empty stream)
        let writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(event.clone()))
            .expect("append should succeed");

        // When: the event is appended and the stream is read back
        let _ = store
            .append_events(writes)
            .await
            .expect("append to succeed");

        let reader = store
            .read_stream::<TestEvent>(stream_id)
            .await
            .expect("read to succeed");

        // Then: the stream holds exactly one event ...
        let observed = (
            reader.is_empty(),
            reader.len(),
            reader.iter().next().is_none(),
        );
        assert_eq!(observed, (false, 1usize, false));

        // ... and it carries the data that was written.
        let stored = reader
            .iter()
            .next()
            .map(|stored| (stored.stream_id.clone(), stored.data.clone()));
        assert_eq!(
            stored,
            Some((event.stream_id.clone(), event.data.clone()))
        );
    }

    /// A reader taken before any append is empty; one taken afterwards is not.
    #[tokio::test]
    async fn event_stream_reader_is_empty_reflects_stream_population() {
        let store = InMemoryEventStore::new();
        let stream_id = stream("is-empty-observation");

        let initial_reader = store
            .read_stream::<TestEvent>(stream_id.clone())
            .await
            .expect("initial read to succeed");

        let event = event_for(&stream_id, "populated event");

        let writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(event))
            .expect("append should succeed");

        let _ = store
            .append_events(writes)
            .await
            .expect("append to succeed");

        let populated_reader = store
            .read_stream::<TestEvent>(stream_id)
            .await
            .expect("populated read to succeed");

        let observed = (
            initial_reader.is_empty(),
            initial_reader.len(),
            populated_reader.is_empty(),
            populated_reader.len(),
        );

        assert_eq!(observed, (true, 0usize, false, 1usize));
    }

    /// Events come back from `read_stream` in the order they were appended.
    #[tokio::test]
    async fn read_stream_iterates_through_events_in_order() {
        let store = InMemoryEventStore::new();
        let stream_id = stream("ordered-stream");

        let first_event = event_for(&stream_id, "first");
        let second_event = event_for(&stream_id, "second");

        let writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(first_event))
            .and_then(|writes| writes.append(second_event))
            .expect("append chain should succeed");

        let _ = store
            .append_events(writes)
            .await
            .expect("append to succeed");

        let reader = store
            .read_stream::<TestEvent>(stream_id)
            .await
            .expect("read to succeed");

        let collected: Vec<String> = reader.iter().map(|event| event.data.clone()).collect();

        let observed = (reader.is_empty(), collected);

        assert_eq!(
            observed,
            (false, vec!["first".to_string(), "second".to_string()])
        );
    }

    /// Two events for the same registered stream can be queued in one batch.
    #[test]
    fn stream_writes_accepts_duplicate_stream_with_same_expected_version() {
        let stream_id = stream("duplicate-stream-same-version");

        let first_event = event_for(&stream_id, "first-event");
        let second_event = event_for(&stream_id, "second-event");

        let writes_result = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(first_event))
            .and_then(|writes| writes.append(second_event));

        assert!(writes_result.is_ok());
    }

    /// Re-registering a stream with a different expected version is an error
    /// whose message names the stream and both versions.
    #[test]
    fn stream_writes_rejects_duplicate_stream_with_conflicting_expected_versions() {
        let stream_id = stream("duplicate-stream-conflict");

        let first_event = event_for(&stream_id, "first-event-conflict");
        let second_event = event_for(&stream_id, "second-event-conflict");

        let conflict = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(first_event))
            .and_then(|writes| writes.register_stream(stream_id.clone(), StreamVersion::new(1)))
            .and_then(|writes| writes.append(second_event));

        let message = conflict.unwrap_err().to_string();

        assert_eq!(
            message,
            "conflicting expected versions for stream duplicate-stream-conflict: first=0, second=1"
        );
    }

    /// A batch built by registering the stream first is accepted by the store.
    #[tokio::test]
    async fn stream_writes_registers_stream_before_appending_multiple_events() {
        let store = InMemoryEventStore::new();
        let stream_id = stream("registered-stream");

        let first_event = event_for(&stream_id, "first-registered-event");
        let second_event = event_for(&stream_id, "second-registered-event");

        let writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(first_event))
            .and_then(|writes| writes.append(second_event))
            .expect("registered stream should accept events");

        let result = store.append_events(writes).await;

        assert!(
            result.is_ok(),
            "append should succeed when stream registered before events"
        );
    }

    /// Appending an event for a stream that was never registered fails with
    /// `UndeclaredStream` naming that stream.
    #[test]
    fn stream_writes_rejects_appends_for_unregistered_streams() {
        let stream_id = stream("unregistered-stream");
        let event = event_for(&stream_id, "unregistered-event");

        let error = StreamWrites::new()
            .append(event)
            .expect_err("append without prior registration should fail");

        assert!(matches!(
            error,
            EventStoreError::UndeclaredStream { stream_id: ref actual } if *actual == stream_id
        ));
    }

    /// `expected_versions` reports every registered stream with its version.
    #[test]
    fn expected_versions_returns_registered_streams_and_versions() {
        let stream_a = stream("stream-a");
        let stream_b = stream("stream-b");

        let writes = StreamWrites::new()
            .register_stream(stream_a.clone(), StreamVersion::new(0))
            .and_then(|w| w.register_stream(stream_b.clone(), StreamVersion::new(5)))
            .expect("registration should succeed");

        let versions = writes.expected_versions();

        assert_eq!(versions.len(), 2);
        assert_eq!(versions.get(&stream_a), Some(&StreamVersion::new(0)));
        assert_eq!(versions.get(&stream_b), Some(&StreamVersion::new(5)));
    }

    #[test]
    fn stream_id_rejects_asterisk_metacharacter() {
        assert!(
            StreamId::try_new("account-*").is_err(),
            "StreamId should reject asterisk glob metacharacter"
        );
    }

    #[test]
    fn stream_id_rejects_question_mark_metacharacter() {
        assert!(
            StreamId::try_new("account-?").is_err(),
            "StreamId should reject question mark glob metacharacter"
        );
    }

    #[test]
    fn stream_id_rejects_open_bracket_metacharacter() {
        assert!(
            StreamId::try_new("account-[").is_err(),
            "StreamId should reject open bracket glob metacharacter"
        );
    }

    #[test]
    fn stream_id_rejects_close_bracket_metacharacter() {
        assert!(
            StreamId::try_new("account-]").is_err(),
            "StreamId should reject close bracket glob metacharacter"
        );
    }

    /// Paging with `EventPage::after(p, ..)` returns only events positioned
    /// strictly after `p`.
    #[tokio::test]
    async fn event_reader_after_position_excludes_event_at_position() {
        // Given: An event store with 3 events
        let store = InMemoryEventStore::new();
        let stream_id = stream("reader-test");

        let event1 = event_for(&stream_id, "first");
        let event2 = event_for(&stream_id, "second");
        let event3 = event_for(&stream_id, "third");

        let writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|w| w.append(event1))
            .and_then(|w| w.append(event2))
            .and_then(|w| w.append(event3))
            .expect("append should succeed");

        let _ = store
            .append_events(writes)
            .await
            .expect("append to succeed");

        // First, read all events to get their positions
        let all_events = store
            .read_events::<TestEvent>(EventFilter::all(), EventPage::first(BatchSize::new(100)))
            .await
            .expect("read all events to succeed");

        assert_eq!(all_events.len(), 3, "Should have 3 events total");
        let (first_event, first_position) = &all_events[0];

        // When: We read events after the first event's position
        let page = EventPage::after(*first_position, BatchSize::new(100));
        let filter = EventFilter::all();
        let events = store
            .read_events::<TestEvent>(filter, page)
            .await
            .expect("read to succeed");

        // Then: We should get 2 events (event2 and event3), not including event1
        assert_eq!(events.len(), 2, "Should get 2 events after first position");
        assert_eq!(
            events[0].0.data, "second",
            "First returned event should be 'second'"
        );
        assert_eq!(
            events[1].0.data, "third",
            "Second returned event should be 'third'"
        );

        for (event, pos) in &events {
            // And: The first event should NOT be in the results
            assert_ne!(
                event.data, first_event.data,
                "First event should be excluded"
            );

            // And: All returned positions should be greater than first_position
            assert!(
                *pos > *first_position,
                "Returned position {} should be > first position {}",
                pos,
                first_position
            );
        }
    }
}