// eventcore_memory/lib.rs

//! In-memory event store implementation for testing.
//!
//! This module provides the `InMemoryEventStore` - a lightweight, zero-dependency
//! storage backend for EventCore integration tests and development.

6use std::collections::HashMap;
7
8use eventcore_types::{
9    Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError, EventStreamReader,
10    EventStreamSlice, Operation, StreamId, StreamPosition, StreamVersion, StreamWriteEntry,
11    StreamWrites,
12};
13use uuid::Uuid;
14
15/// In-memory event store implementation for testing.
16///
17/// InMemoryEventStore provides a lightweight, zero-dependency storage backend
18/// for EventCore integration tests and development. It implements the EventStore
19/// trait using standard library collections (HashMap, BTreeMap) with optimistic
20/// concurrency control via version checking.
21///
22/// # Example
23///
24/// ```ignore
25/// use eventcore_memory::InMemoryEventStore;
26///
27/// let store = InMemoryEventStore::new();
28/// // Use store with execute() function
29/// ```
30///
31/// # Thread Safety
32///
33/// InMemoryEventStore uses interior mutability for concurrent access.
34type StreamData = (Vec<Box<dyn std::any::Any + Send>>, StreamVersion);
35
36/// Entry in the global event log with indexed stream_id for efficient filtering.
37///
38/// This structure mirrors the Postgres schema where stream_id is a separate
39/// indexed column and event_id (UUID7) serves as the global position.
40/// By storing stream_id and event_id separately, we can filter by stream
41/// prefix and position without parsing JSON, matching the performance
42/// characteristics of the database implementation.
43#[derive(Debug, Clone)]
44struct GlobalLogEntry {
45    /// Event identifier (UUID7), used as global position
46    event_id: Uuid,
47    /// Stream identifier, extracted at write time for efficient filtering
48    stream_id: String,
49    /// Event data as JSON value
50    event_data: serde_json::Value,
51}
52
53/// Internal storage combining per-stream data with global event ordering.
54struct StoreData {
55    streams: HashMap<StreamId, StreamData>,
56    /// Global log with indexed stream_id for efficient EventReader queries
57    global_log: Vec<GlobalLogEntry>,
58}
59
60pub struct InMemoryEventStore {
61    data: std::sync::Mutex<StoreData>,
62}
63
64impl InMemoryEventStore {
65    /// Create a new in-memory event store.
66    ///
67    /// Returns an empty event store ready for command execution.
68    /// All streams start at version 0 (no events).
69    pub fn new() -> Self {
70        Self {
71            data: std::sync::Mutex::new(StoreData {
72                streams: HashMap::new(),
73                global_log: Vec::new(),
74            }),
75        }
76    }
77}
78
79impl Default for InMemoryEventStore {
80    fn default() -> Self {
81        Self::new()
82    }
83}
84
85impl EventStore for InMemoryEventStore {
86    async fn read_stream<E: Event>(
87        &self,
88        stream_id: StreamId,
89    ) -> Result<EventStreamReader<E>, EventStoreError> {
90        let data = self
91            .data
92            .lock()
93            .map_err(|_| EventStoreError::StoreFailure {
94                operation: Operation::ReadStream,
95            })?;
96        let events = data
97            .streams
98            .get(&stream_id)
99            .map(|(boxed_events, _version)| {
100                boxed_events
101                    .iter()
102                    .filter_map(|boxed| boxed.downcast_ref::<E>())
103                    .cloned()
104                    .collect()
105            })
106            .unwrap_or_default();
107
108        Ok(EventStreamReader::new(events))
109    }
110
111    async fn append_events(
112        &self,
113        writes: StreamWrites,
114    ) -> Result<EventStreamSlice, EventStoreError> {
115        let mut data = self
116            .data
117            .lock()
118            .map_err(|_| EventStoreError::StoreFailure {
119                operation: Operation::AppendEvents,
120            })?;
121        let expected_versions = writes.expected_versions().clone();
122
123        // Check all version constraints before writing any events
124        for (stream_id, expected_version) in &expected_versions {
125            let current_version = data
126                .streams
127                .get(stream_id)
128                .map(|(_events, version)| *version)
129                .unwrap_or_else(|| StreamVersion::new(0));
130
131            if current_version != *expected_version {
132                return Err(EventStoreError::VersionConflict);
133            }
134        }
135
136        // All versions match - proceed with writes
137        for entry in writes.into_entries() {
138            let StreamWriteEntry {
139                stream_id,
140                event,
141                event_type: _,
142                event_data,
143            } = entry;
144
145            // Generate UUID7 for this event (monotonic, timestamp-ordered)
146            let event_id = Uuid::now_v7();
147
148            // Store in global log for EventReader with indexed stream_id and event_id
149            data.global_log.push(GlobalLogEntry {
150                event_id,
151                stream_id: stream_id.as_ref().to_string(),
152                event_data,
153            });
154
155            let (events, version) = data
156                .streams
157                .entry(stream_id)
158                .or_insert_with(|| (Vec::new(), StreamVersion::new(0)));
159            events.push(event);
160            *version = version.increment();
161        }
162
163        Ok(EventStreamSlice)
164    }
165}
166
167impl EventReader for InMemoryEventStore {
168    type Error = EventStoreError;
169
170    async fn read_events<E: Event>(
171        &self,
172        filter: EventFilter,
173        page: EventPage,
174    ) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
175        let data = self
176            .data
177            .lock()
178            .map_err(|_| EventStoreError::StoreFailure {
179                operation: Operation::ReadStream,
180            })?;
181
182        let after_event_id = page.after_position().map(|p| p.into_inner());
183
184        let events: Vec<(E, StreamPosition)> = data
185            .global_log
186            .iter()
187            .filter(|entry| {
188                // Filter by event_id (UUID7 comparison)
189                match after_event_id {
190                    None => true,
191                    Some(after_id) => entry.event_id > after_id,
192                }
193            })
194            .filter(|entry| {
195                // Filter by indexed stream_id WITHOUT parsing JSON (matches Postgres behavior)
196                match filter.stream_prefix() {
197                    None => true,
198                    Some(prefix) => entry.stream_id.starts_with(prefix.as_ref()),
199                }
200            })
201            .take(page.limit().into_inner())
202            .filter_map(|entry| {
203                serde_json::from_value::<E>(entry.event_data.clone())
204                    .ok()
205                    .map(|e| (e, StreamPosition::new(entry.event_id)))
206            })
207            .collect();
208
209        Ok(events)
210    }
211}
212
#[cfg(test)]
mod tests {
    use super::*;
    use eventcore_types::{BatchSize, EventFilter, EventPage};
    use serde::{Deserialize, Serialize};

    /// Test-specific domain event type for unit testing storage operations.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    struct TestEvent {
        stream_id: StreamId,
        data: String,
    }

    impl Event for TestEvent {
        fn stream_id(&self) -> &StreamId {
            &self.stream_id
        }
    }

    /// Unit test: Verify InMemoryEventStore can append and retrieve a single event
    ///
    /// This test verifies the fundamental event storage capability:
    /// - Append an event to a stream
    /// - Read the stream back
    /// - Verify the event is retrievable with correct data
    ///
    /// This is a unit test drilling down from the failing integration test
    /// test_deposit_command_event_data_is_retrievable. We're testing the
    /// storage layer in isolation before testing the full command execution flow.
    #[tokio::test]
    async fn test_append_and_read_single_event() {
        // Given: An in-memory event store
        let store = InMemoryEventStore::new();

        // And: A stream ID
        let stream_id = StreamId::try_new("test-stream-123".to_string()).expect("valid stream id");

        // And: A domain event to store
        let event = TestEvent {
            stream_id: stream_id.clone(),
            data: "test event data".to_string(),
        };

        // And: A collection of writes containing the event (expected version 0 for empty stream)
        let writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(event.clone()))
            .expect("append should succeed");

        // When: We append the event to the store
        let _ = store
            .append_events(writes)
            .await
            .expect("append to succeed");

        let reader = store
            .read_stream::<TestEvent>(stream_id)
            .await
            .expect("read to succeed");

        // Then: The reader reports exactly one retrievable event
        let observed = (
            reader.is_empty(),
            reader.len(),
            reader.iter().next().is_none(),
        );

        assert_eq!(observed, (false, 1usize, false));
    }

    /// `EventStreamReader::is_empty`/`len` track whether the stream has been
    /// written: empty before the first append, one event after it.
    #[tokio::test]
    async fn event_stream_reader_is_empty_reflects_stream_population() {
        let store = InMemoryEventStore::new();
        let stream_id =
            StreamId::try_new("is-empty-observation".to_string()).expect("valid stream id");

        // Reading an unwritten stream succeeds and yields an empty reader.
        let initial_reader = store
            .read_stream::<TestEvent>(stream_id.clone())
            .await
            .expect("initial read to succeed");

        let event = TestEvent {
            stream_id: stream_id.clone(),
            data: "populated event".to_string(),
        };

        let writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(event))
            .expect("append should succeed");

        let _ = store
            .append_events(writes)
            .await
            .expect("append to succeed");

        let populated_reader = store
            .read_stream::<TestEvent>(stream_id)
            .await
            .expect("populated read to succeed");

        let observed = (
            initial_reader.is_empty(),
            initial_reader.len(),
            populated_reader.is_empty(),
            populated_reader.len(),
        );

        assert_eq!(observed, (true, 0usize, false, 1usize));
    }

    /// Events appended in one batch come back from `read_stream` in append order.
    #[tokio::test]
    async fn read_stream_iterates_through_events_in_order() {
        let store = InMemoryEventStore::new();
        let stream_id = StreamId::try_new("ordered-stream".to_string()).expect("valid stream id");

        let first_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "first".to_string(),
        };

        let second_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "second".to_string(),
        };

        let writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(first_event))
            .and_then(|writes| writes.append(second_event))
            .expect("append chain should succeed");

        let _ = store
            .append_events(writes)
            .await
            .expect("append to succeed");

        let reader = store
            .read_stream::<TestEvent>(stream_id)
            .await
            .expect("read to succeed");

        let collected: Vec<String> = reader.iter().map(|event| event.data.clone()).collect();

        let observed = (reader.is_empty(), collected);

        assert_eq!(
            observed,
            (false, vec!["first".to_string(), "second".to_string()])
        );
    }

    /// Appending twice to a stream registered once (single expected version)
    /// is accepted by the `StreamWrites` builder.
    #[test]
    fn stream_writes_accepts_duplicate_stream_with_same_expected_version() {
        let stream_id = StreamId::try_new("duplicate-stream-same-version".to_string())
            .expect("valid stream id");

        let first_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "first-event".to_string(),
        };

        let second_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "second-event".to_string(),
        };

        let writes_result = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(first_event))
            .and_then(|writes| writes.append(second_event));

        assert!(writes_result.is_ok());
    }

    /// Re-registering the same stream with a different expected version is
    /// rejected by the builder, with a message naming both versions.
    #[test]
    fn stream_writes_rejects_duplicate_stream_with_conflicting_expected_versions() {
        let stream_id =
            StreamId::try_new("duplicate-stream-conflict".to_string()).expect("valid stream id");

        let first_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "first-event-conflict".to_string(),
        };

        let second_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "second-event-conflict".to_string(),
        };

        // Register the stream at version 0, then again at version 1: conflict.
        let conflict = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(first_event))
            .and_then(|writes| writes.register_stream(stream_id.clone(), StreamVersion::new(1)))
            .and_then(|writes| writes.append(second_event));

        let message = conflict.unwrap_err().to_string();

        assert_eq!(
            message,
            "conflicting expected versions for stream duplicate-stream-conflict: first=0, second=1"
        );
    }

    /// The happy path: register a stream once, append several events, and the
    /// store accepts the whole batch.
    #[tokio::test]
    async fn stream_writes_registers_stream_before_appending_multiple_events() {
        let store = InMemoryEventStore::new();
        let stream_id =
            StreamId::try_new("registered-stream".to_string()).expect("valid stream id");

        let first_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "first-registered-event".to_string(),
        };

        let second_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "second-registered-event".to_string(),
        };

        let writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(first_event))
            .and_then(|writes| writes.append(second_event))
            .expect("registered stream should accept events");

        let result = store.append_events(writes).await;

        assert!(
            result.is_ok(),
            "append should succeed when stream registered before events"
        );
    }

    /// Appending to a stream that was never registered fails with
    /// `UndeclaredStream` carrying the offending stream id.
    #[test]
    fn stream_writes_rejects_appends_for_unregistered_streams() {
        let stream_id =
            StreamId::try_new("unregistered-stream".to_string()).expect("valid stream id");

        let event = TestEvent {
            stream_id: stream_id.clone(),
            data: "unregistered-event".to_string(),
        };

        let error = StreamWrites::new()
            .append(event)
            .expect_err("append without prior registration should fail");

        assert!(matches!(
            error,
            EventStoreError::UndeclaredStream { stream_id: ref actual } if *actual == stream_id
        ));
    }

    /// `expected_versions()` exposes every registered stream with the version
    /// it was registered at.
    #[test]
    fn expected_versions_returns_registered_streams_and_versions() {
        let stream_a = StreamId::try_new("stream-a").expect("valid stream id");
        let stream_b = StreamId::try_new("stream-b").expect("valid stream id");

        let writes = StreamWrites::new()
            .register_stream(stream_a.clone(), StreamVersion::new(0))
            .and_then(|w| w.register_stream(stream_b.clone(), StreamVersion::new(5)))
            .expect("registration should succeed");

        let versions = writes.expected_versions();

        assert_eq!(versions.len(), 2);
        assert_eq!(versions.get(&stream_a), Some(&StreamVersion::new(0)));
        assert_eq!(versions.get(&stream_b), Some(&StreamVersion::new(5)));
    }

    // The next four tests pin StreamId's rejection of glob metacharacters,
    // which would otherwise interfere with prefix-based stream filtering.

    #[test]
    fn stream_id_rejects_asterisk_metacharacter() {
        let result = StreamId::try_new("account-*");
        assert!(
            result.is_err(),
            "StreamId should reject asterisk glob metacharacter"
        );
    }

    #[test]
    fn stream_id_rejects_question_mark_metacharacter() {
        let result = StreamId::try_new("account-?");
        assert!(
            result.is_err(),
            "StreamId should reject question mark glob metacharacter"
        );
    }

    #[test]
    fn stream_id_rejects_open_bracket_metacharacter() {
        let result = StreamId::try_new("account-[");
        assert!(
            result.is_err(),
            "StreamId should reject open bracket glob metacharacter"
        );
    }

    #[test]
    fn stream_id_rejects_close_bracket_metacharacter() {
        let result = StreamId::try_new("account-]");
        assert!(
            result.is_err(),
            "StreamId should reject close bracket glob metacharacter"
        );
    }

    /// `EventPage::after(pos)` is an exclusive cursor: the event AT `pos` is
    /// excluded and only strictly-later events are returned.
    #[tokio::test]
    async fn event_reader_after_position_excludes_event_at_position() {
        // Given: An event store with 3 events
        let store = InMemoryEventStore::new();
        let stream_id = StreamId::try_new("reader-test").expect("valid stream id");

        let event1 = TestEvent {
            stream_id: stream_id.clone(),
            data: "first".to_string(),
        };
        let event2 = TestEvent {
            stream_id: stream_id.clone(),
            data: "second".to_string(),
        };
        let event3 = TestEvent {
            stream_id: stream_id.clone(),
            data: "third".to_string(),
        };

        let writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|w| w.append(event1))
            .and_then(|w| w.append(event2))
            .and_then(|w| w.append(event3))
            .expect("append should succeed");

        store
            .append_events(writes)
            .await
            .expect("append to succeed");

        // First, read all events to get their positions
        let all_events = store
            .read_events::<TestEvent>(EventFilter::all(), EventPage::first(BatchSize::new(100)))
            .await
            .expect("read all events to succeed");

        assert_eq!(all_events.len(), 3, "Should have 3 events total");
        let (first_event, first_position) = &all_events[0];

        // When: We read events after the first event's position
        let page = EventPage::after(*first_position, BatchSize::new(100));
        let filter = EventFilter::all();
        let events = store
            .read_events::<TestEvent>(filter, page)
            .await
            .expect("read to succeed");

        // Then: We should get 2 events (event2 and event3), not including event1
        assert_eq!(events.len(), 2, "Should get 2 events after first position");
        assert_eq!(
            events[0].0.data, "second",
            "First returned event should be 'second'"
        );
        assert_eq!(
            events[1].0.data, "third",
            "Second returned event should be 'third'"
        );

        // And: The first event should NOT be in the results
        for (event, _pos) in &events {
            assert_ne!(
                event.data, first_event.data,
                "First event should be excluded"
            );
        }

        // And: All returned positions should be greater than first_position
        for (_event, pos) in &events {
            assert!(
                *pos > *first_position,
                "Returned position {} should be > first position {}",
                pos,
                first_position
            );
        }
    }
}
596}