eventcore_memory/lib.rs

//! In-memory event store implementation for testing.
//!
//! This module provides the `InMemoryEventStore`, a lightweight storage backend
//! with no external infrastructure requirements, for EventCore integration tests
//! and development.

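//! # Quick start
//!
//! A minimal sketch of constructing the in-memory backends provided by this
//! crate (not compiled here; wiring them into a projection runner depends on
//! the surrounding application):
//!
//! ```ignore
//! use eventcore_memory::{
//!     InMemoryCheckpointStore, InMemoryEventStore, InMemoryProjectorCoordinator,
//! };
//!
//! let store = InMemoryEventStore::new();
//! let checkpoint_store = InMemoryCheckpointStore::new();
//! let coordinator = InMemoryProjectorCoordinator::new();
//! ```
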
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use eventcore_types::{
    CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
    EventStreamReader, EventStreamSlice, Operation, ProjectorCoordinator, StreamId, StreamPosition,
    StreamVersion, StreamWriteEntry, StreamWrites,
};
use uuid::Uuid;

/// Per-stream storage: the boxed events in append order plus the current stream version.
type StreamData = (Vec<Box<dyn std::any::Any + Send>>, StreamVersion);

/// Entry in the global event log with indexed stream_id for efficient filtering.
///
/// This structure mirrors the Postgres schema where stream_id is a separate
/// indexed column and event_id (UUID7) serves as the global position.
/// By storing stream_id and event_id separately, we can filter by stream
/// prefix and position without parsing JSON, matching the performance
/// characteristics of the database implementation.
#[derive(Debug, Clone)]
struct GlobalLogEntry {
    /// Event identifier (UUID7), used as global position
    event_id: Uuid,
    /// Stream identifier, extracted at write time for efficient filtering
    stream_id: String,
    /// Event data as JSON value
    event_data: serde_json::Value,
}

/// Internal storage combining per-stream data with global event ordering.
struct StoreData {
    streams: HashMap<StreamId, StreamData>,
    /// Global log with indexed stream_id for efficient EventReader queries
    global_log: Vec<GlobalLogEntry>,
    /// Checkpoint storage for projection progress tracking
    checkpoints: HashMap<String, StreamPosition>,
    /// Coordination locks for projector leadership
    locks: Arc<RwLock<HashMap<String, ()>>>,
}

/// In-memory event store implementation for testing.
///
/// `InMemoryEventStore` provides a lightweight storage backend with no external
/// infrastructure requirements for EventCore integration tests and development.
/// It implements the `EventStore` trait (alongside `EventReader`, `CheckpointStore`,
/// and `ProjectorCoordinator`) using standard library collections (`HashMap`, `Vec`)
/// with optimistic concurrency control via version checking.
///
/// # Example
///
/// ```ignore
/// use eventcore_memory::InMemoryEventStore;
///
/// let store = InMemoryEventStore::new();
/// // Use store with execute() function
/// ```
///
/// # Thread Safety
///
/// `InMemoryEventStore` uses interior mutability (a `std::sync::Mutex` around its
/// internal state) for concurrent access.
pub struct InMemoryEventStore {
    data: std::sync::Mutex<StoreData>,
}

impl InMemoryEventStore {
    /// Create a new in-memory event store.
    ///
    /// Returns an empty event store ready for command execution.
    /// All streams start at version 0 (no events).
    pub fn new() -> Self {
        Self {
            data: std::sync::Mutex::new(StoreData {
                streams: HashMap::new(),
                global_log: Vec::new(),
                checkpoints: HashMap::new(),
                locks: Arc::new(RwLock::new(HashMap::new())),
            }),
        }
    }
}

impl Default for InMemoryEventStore {
    fn default() -> Self {
        Self::new()
    }
}

impl EventStore for InMemoryEventStore {
    async fn read_stream<E: Event>(
        &self,
        stream_id: StreamId,
    ) -> Result<EventStreamReader<E>, EventStoreError> {
        let data = self
            .data
            .lock()
            .map_err(|_| EventStoreError::StoreFailure {
                operation: Operation::ReadStream,
            })?;
        let events = data
            .streams
            .get(&stream_id)
            .map(|(boxed_events, _version)| {
                boxed_events
                    .iter()
                    .filter_map(|boxed| boxed.downcast_ref::<E>())
                    .cloned()
                    .collect()
            })
            .unwrap_or_default();

        Ok(EventStreamReader::new(events))
    }

    async fn append_events(
        &self,
        writes: StreamWrites,
    ) -> Result<EventStreamSlice, EventStoreError> {
        let mut data = self
            .data
            .lock()
            .map_err(|_| EventStoreError::StoreFailure {
                operation: Operation::AppendEvents,
            })?;
        let expected_versions = writes.expected_versions().clone();

        // Check all version constraints before writing any events
        for (stream_id, expected_version) in &expected_versions {
            let current_version = data
                .streams
                .get(stream_id)
                .map(|(_events, version)| *version)
                .unwrap_or_else(|| StreamVersion::new(0));

            if current_version != *expected_version {
                return Err(EventStoreError::VersionConflict);
            }
        }

        // All versions match - proceed with writes
        for entry in writes.into_entries() {
            let StreamWriteEntry {
                stream_id,
                event,
                event_type: _,
                event_data,
            } = entry;

            // Generate UUID7 for this event (monotonic, timestamp-ordered)
            let event_id = Uuid::now_v7();

            // Store in global log for EventReader with indexed stream_id and event_id
            data.global_log.push(GlobalLogEntry {
                event_id,
                stream_id: stream_id.as_ref().to_string(),
                event_data,
            });

            let (events, version) = data
                .streams
                .entry(stream_id)
                .or_insert_with(|| (Vec::new(), StreamVersion::new(0)));
            events.push(event);
            *version = version.increment();
        }

        Ok(EventStreamSlice)
    }
}

impl EventReader for InMemoryEventStore {
    type Error = EventStoreError;

    async fn read_events<E: Event>(
        &self,
        filter: EventFilter,
        page: EventPage,
    ) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
        let data = self
            .data
            .lock()
            .map_err(|_| EventStoreError::StoreFailure {
                operation: Operation::ReadStream,
            })?;

        let after_event_id = page.after_position().map(|p| p.into_inner());

        let events: Vec<(E, StreamPosition)> = data
            .global_log
            .iter()
            .filter(|entry| {
                // Filter by event_id (UUID7 comparison)
                match after_event_id {
                    None => true,
                    Some(after_id) => entry.event_id > after_id,
                }
            })
            .filter(|entry| {
                // Filter by indexed stream_id WITHOUT parsing JSON (matches Postgres behavior)
                match filter.stream_prefix() {
                    None => true,
                    Some(prefix) => entry.stream_id.starts_with(prefix.as_ref()),
                }
            })
            .take(page.limit().into_inner())
            .filter_map(|entry| {
                serde_json::from_value::<E>(entry.event_data.clone())
                    .ok()
                    .map(|e| (e, StreamPosition::new(entry.event_id)))
            })
            .collect();

        Ok(events)
    }
}

impl CheckpointStore for InMemoryEventStore {
    type Error = InMemoryCheckpointError;

    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
        let data = self.data.lock().map_err(|e| InMemoryCheckpointError {
            message: format!("failed to acquire lock: {}", e),
        })?;
        Ok(data.checkpoints.get(name).copied())
    }

    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
        let mut data = self.data.lock().map_err(|e| InMemoryCheckpointError {
            message: format!("failed to acquire lock: {}", e),
        })?;
        let _ = data.checkpoints.insert(name.to_string(), position);
        Ok(())
    }
}

impl ProjectorCoordinator for InMemoryEventStore {
    type Error = InMemoryCoordinationError;
    type Guard = InMemoryCoordinationGuard;

    async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
        let data = self
            .data
            .lock()
            .map_err(|e| InMemoryCoordinationError::LockPoisoned {
                message: e.to_string(),
            })?;

        let mut guard =
            data.locks
                .write()
                .map_err(|e| InMemoryCoordinationError::LockPoisoned {
                    message: e.to_string(),
                })?;

        if guard.contains_key(subscription_name) {
            return Err(InMemoryCoordinationError::LeadershipNotAcquired {
                subscription_name: subscription_name.to_string(),
            });
        }

        let _ = guard.insert(subscription_name.to_string(), ());

        Ok(InMemoryCoordinationGuard {
            subscription_name: subscription_name.to_string(),
            locks: Arc::clone(&data.locks),
        })
    }
}

/// In-memory checkpoint store for tracking projection progress.
///
/// `InMemoryCheckpointStore` stores checkpoint positions in memory using a
/// thread-safe `Arc<RwLock<HashMap>>`. It is primarily useful for testing
/// and single-process deployments where persistence across restarts is not required.
///
/// For production deployments requiring durability, use a persistent
/// checkpoint store implementation.
///
/// # Example
///
/// ```ignore
/// use eventcore_memory::InMemoryCheckpointStore;
///
/// let checkpoint_store = InMemoryCheckpointStore::new();
/// // Use with ProjectionRunner
/// ```
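///
/// A slightly fuller sketch of the `CheckpointStore` calls themselves (the
/// position value below is illustrative; any `StreamPosition` works):
///
/// ```ignore
/// use eventcore_memory::InMemoryCheckpointStore;
/// use eventcore_types::{CheckpointStore, StreamPosition};
///
/// let checkpoint_store = InMemoryCheckpointStore::new();
/// let position = StreamPosition::new(uuid::Uuid::now_v7());
/// checkpoint_store.save("my-projector", position).await?;
/// assert_eq!(checkpoint_store.load("my-projector").await?, Some(position));
/// ```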
#[derive(Debug, Clone, Default)]
pub struct InMemoryCheckpointStore {
    checkpoints: Arc<RwLock<HashMap<String, StreamPosition>>>,
}

impl InMemoryCheckpointStore {
    /// Create a new in-memory checkpoint store.
    pub fn new() -> Self {
        Self::default()
    }
}

/// Error type for in-memory checkpoint store operations.
///
/// Since the in-memory store uses an `RwLock`, the only possible error
/// is a poisoned lock from a panic in another thread.
#[derive(Debug, Clone)]
pub struct InMemoryCheckpointError {
    message: String,
}

impl std::fmt::Display for InMemoryCheckpointError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.message)
    }
}

impl std::error::Error for InMemoryCheckpointError {}

/// Error type for in-memory coordinator operations.
#[derive(Debug, Clone)]
pub enum InMemoryCoordinationError {
    /// Leadership is already held by another instance.
    LeadershipNotAcquired { subscription_name: String },
    /// Lock was poisoned by a panic in another thread.
    LockPoisoned { message: String },
}

impl std::fmt::Display for InMemoryCoordinationError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::LeadershipNotAcquired { subscription_name } => {
                write!(
                    f,
                    "leadership not acquired for subscription: {}",
                    subscription_name
                )
            }
            Self::LockPoisoned { message } => write!(f, "lock poisoned: {}", message),
        }
    }
}

impl std::error::Error for InMemoryCoordinationError {}

/// Guard that releases leadership when dropped.
#[derive(Debug)]
pub struct InMemoryCoordinationGuard {
    subscription_name: String,
    locks: Arc<RwLock<HashMap<String, ()>>>,
}

impl Drop for InMemoryCoordinationGuard {
    fn drop(&mut self) {
        if let Ok(mut guard) = self.locks.write() {
            let _ = guard.remove(&self.subscription_name);
        }
    }
}

/// In-memory projector coordinator for single-process deployments.
///
/// `InMemoryProjectorCoordinator` provides coordination for projectors within a single
/// process using an in-memory lock table. This is suitable for testing and single-process
/// deployments where distributed coordination is not required.
///
/// For distributed deployments with multiple process instances, use a database-backed
/// coordinator implementation (e.g., PostgreSQL advisory locks).
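///
/// # Example
///
/// A minimal usage sketch (assumes an async context; leadership is released
/// when the returned guard is dropped):
///
/// ```ignore
/// use eventcore_memory::InMemoryProjectorCoordinator;
/// use eventcore_types::ProjectorCoordinator;
///
/// let coordinator = InMemoryProjectorCoordinator::new();
/// let guard = coordinator.try_acquire("my-subscription").await?;
/// // ... run the projector while holding the guard ...
/// drop(guard);
/// ```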
#[derive(Debug, Clone, Default)]
pub struct InMemoryProjectorCoordinator {
    locks: Arc<RwLock<HashMap<String, ()>>>,
}

impl InMemoryProjectorCoordinator {
    /// Create a new in-memory projector coordinator.
    pub fn new() -> Self {
        Self::default()
    }
}

impl ProjectorCoordinator for InMemoryProjectorCoordinator {
    type Error = InMemoryCoordinationError;
    type Guard = InMemoryCoordinationGuard;

    async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
        let mut guard =
            self.locks
                .write()
                .map_err(|e| InMemoryCoordinationError::LockPoisoned {
                    message: e.to_string(),
                })?;

        if guard.contains_key(subscription_name) {
            return Err(InMemoryCoordinationError::LeadershipNotAcquired {
                subscription_name: subscription_name.to_string(),
            });
        }

        let _ = guard.insert(subscription_name.to_string(), ());

        Ok(InMemoryCoordinationGuard {
            subscription_name: subscription_name.to_string(),
            locks: Arc::clone(&self.locks),
        })
    }
}

impl CheckpointStore for InMemoryCheckpointStore {
    type Error = InMemoryCheckpointError;

    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
        let guard = self
            .checkpoints
            .read()
            .map_err(|e| InMemoryCheckpointError {
                message: format!("failed to acquire read lock: {}", e),
            })?;
        Ok(guard.get(name).copied())
    }

    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
        let mut guard = self
            .checkpoints
            .write()
            .map_err(|e| InMemoryCheckpointError {
                message: format!("failed to acquire write lock: {}", e),
            })?;
        let _ = guard.insert(name.to_string(), position);
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use eventcore_types::{BatchSize, EventFilter, EventPage};
    use serde::{Deserialize, Serialize};

    /// Test-specific domain event type for unit testing storage operations.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    struct TestEvent {
        stream_id: StreamId,
        data: String,
    }

    impl Event for TestEvent {
        fn stream_id(&self) -> &StreamId {
            &self.stream_id
        }
    }

    /// Unit test: Verify InMemoryEventStore can append and retrieve a single event.
    ///
    /// This test verifies the fundamental event storage capability:
    /// - Append an event to a stream
    /// - Read the stream back
    /// - Verify the event is retrievable with correct data
    ///
    /// It exercises the storage layer in isolation, before the full command
    /// execution flow covered by integration tests such as
    /// test_deposit_command_event_data_is_retrievable.
    #[tokio::test]
    async fn test_append_and_read_single_event() {
        // Given: An in-memory event store
        let store = InMemoryEventStore::new();

        // And: A stream ID
        let stream_id = StreamId::try_new("test-stream-123".to_string()).expect("valid stream id");

        // And: A domain event to store
        let event = TestEvent {
            stream_id: stream_id.clone(),
            data: "test event data".to_string(),
        };

        // And: A collection of writes containing the event (expected version 0 for empty stream)
        let writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(event.clone()))
            .expect("append should succeed");

        // When: We append the event to the store
        let _ = store
            .append_events(writes)
            .await
            .expect("append to succeed");

        let reader = store
            .read_stream::<TestEvent>(stream_id)
            .await
            .expect("read to succeed");

        let observed = (
            reader.is_empty(),
            reader.len(),
            reader.iter().next().is_none(),
        );

        assert_eq!(observed, (false, 1usize, false));
    }

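    /// Optimistic concurrency sketch: a stale expected version is rejected.
    ///
    /// After the first append advances the stream, a second append that still
    /// declares expected version 0 should fail with `VersionConflict`. This is
    /// an illustrative companion to the version check in `append_events`.
    #[tokio::test]
    async fn append_with_stale_expected_version_is_rejected() {
        let store = InMemoryEventStore::new();
        let stream_id =
            StreamId::try_new("stale-version-stream".to_string()).expect("valid stream id");

        let first_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "first".to_string(),
        };
        let second_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "second".to_string(),
        };

        // First append at expected version 0 succeeds and advances the stream.
        let writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|w| w.append(first_event))
            .expect("append should succeed");
        let _ = store
            .append_events(writes)
            .await
            .expect("append to succeed");

        // Second append still claims expected version 0, which is now stale.
        let stale_writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|w| w.append(second_event))
            .expect("append should succeed");
        let result = store.append_events(stale_writes).await;

        assert!(matches!(result, Err(EventStoreError::VersionConflict)));
    }
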
    #[tokio::test]
    async fn event_stream_reader_is_empty_reflects_stream_population() {
        let store = InMemoryEventStore::new();
        let stream_id =
            StreamId::try_new("is-empty-observation".to_string()).expect("valid stream id");

        let initial_reader = store
            .read_stream::<TestEvent>(stream_id.clone())
            .await
            .expect("initial read to succeed");

        let event = TestEvent {
            stream_id: stream_id.clone(),
            data: "populated event".to_string(),
        };

        let writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(event))
            .expect("append should succeed");

        let _ = store
            .append_events(writes)
            .await
            .expect("append to succeed");

        let populated_reader = store
            .read_stream::<TestEvent>(stream_id)
            .await
            .expect("populated read to succeed");

        let observed = (
            initial_reader.is_empty(),
            initial_reader.len(),
            populated_reader.is_empty(),
            populated_reader.len(),
        );

        assert_eq!(observed, (true, 0usize, false, 1usize));
    }

    #[tokio::test]
    async fn read_stream_iterates_through_events_in_order() {
        let store = InMemoryEventStore::new();
        let stream_id = StreamId::try_new("ordered-stream".to_string()).expect("valid stream id");

        let first_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "first".to_string(),
        };

        let second_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "second".to_string(),
        };

        let writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(first_event))
            .and_then(|writes| writes.append(second_event))
            .expect("append chain should succeed");

        let _ = store
            .append_events(writes)
            .await
            .expect("append to succeed");

        let reader = store
            .read_stream::<TestEvent>(stream_id)
            .await
            .expect("read to succeed");

        let collected: Vec<String> = reader.iter().map(|event| event.data.clone()).collect();

        let observed = (reader.is_empty(), collected);

        assert_eq!(
            observed,
            (false, vec!["first".to_string(), "second".to_string()])
        );
    }

    #[test]
    fn stream_writes_accepts_duplicate_stream_with_same_expected_version() {
        let stream_id = StreamId::try_new("duplicate-stream-same-version".to_string())
            .expect("valid stream id");

        let first_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "first-event".to_string(),
        };

        let second_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "second-event".to_string(),
        };

        let writes_result = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(first_event))
            .and_then(|writes| writes.append(second_event));

        assert!(writes_result.is_ok());
    }

    #[test]
    fn stream_writes_rejects_duplicate_stream_with_conflicting_expected_versions() {
        let stream_id =
            StreamId::try_new("duplicate-stream-conflict".to_string()).expect("valid stream id");

        let first_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "first-event-conflict".to_string(),
        };

        let second_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "second-event-conflict".to_string(),
        };

        let conflict = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(first_event))
            .and_then(|writes| writes.register_stream(stream_id.clone(), StreamVersion::new(1)))
            .and_then(|writes| writes.append(second_event));

        let message = conflict.unwrap_err().to_string();

        assert_eq!(
            message,
            "conflicting expected versions for stream duplicate-stream-conflict: first=0, second=1"
        );
    }

    #[tokio::test]
    async fn stream_writes_registers_stream_before_appending_multiple_events() {
        let store = InMemoryEventStore::new();
        let stream_id =
            StreamId::try_new("registered-stream".to_string()).expect("valid stream id");

        let first_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "first-registered-event".to_string(),
        };

        let second_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "second-registered-event".to_string(),
        };

        let writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(first_event))
            .and_then(|writes| writes.append(second_event))
            .expect("registered stream should accept events");

        let result = store.append_events(writes).await;

        assert!(
            result.is_ok(),
            "append should succeed when stream registered before events"
        );
    }

    #[test]
    fn stream_writes_rejects_appends_for_unregistered_streams() {
        let stream_id =
            StreamId::try_new("unregistered-stream".to_string()).expect("valid stream id");

        let event = TestEvent {
            stream_id: stream_id.clone(),
            data: "unregistered-event".to_string(),
        };

        let error = StreamWrites::new()
            .append(event)
            .expect_err("append without prior registration should fail");

        assert!(matches!(
            error,
            EventStoreError::UndeclaredStream { stream_id: ref actual } if *actual == stream_id
        ));
    }

    #[test]
    fn expected_versions_returns_registered_streams_and_versions() {
        let stream_a = StreamId::try_new("stream-a").expect("valid stream id");
        let stream_b = StreamId::try_new("stream-b").expect("valid stream id");

        let writes = StreamWrites::new()
            .register_stream(stream_a.clone(), StreamVersion::new(0))
            .and_then(|w| w.register_stream(stream_b.clone(), StreamVersion::new(5)))
            .expect("registration should succeed");

        let versions = writes.expected_versions();

        assert_eq!(versions.len(), 2);
        assert_eq!(versions.get(&stream_a), Some(&StreamVersion::new(0)));
        assert_eq!(versions.get(&stream_b), Some(&StreamVersion::new(5)));
    }

    #[test]
    fn stream_id_rejects_asterisk_metacharacter() {
        let result = StreamId::try_new("account-*");
        assert!(
            result.is_err(),
            "StreamId should reject asterisk glob metacharacter"
        );
    }

    #[test]
    fn stream_id_rejects_question_mark_metacharacter() {
        let result = StreamId::try_new("account-?");
        assert!(
            result.is_err(),
            "StreamId should reject question mark glob metacharacter"
        );
    }

    #[test]
    fn stream_id_rejects_open_bracket_metacharacter() {
        let result = StreamId::try_new("account-[");
        assert!(
            result.is_err(),
            "StreamId should reject open bracket glob metacharacter"
        );
    }

    #[test]
    fn stream_id_rejects_close_bracket_metacharacter() {
        let result = StreamId::try_new("account-]");
        assert!(
            result.is_err(),
            "StreamId should reject close bracket glob metacharacter"
        );
    }

    #[tokio::test]
    async fn event_reader_after_position_excludes_event_at_position() {
        // Given: An event store with 3 events
        let store = InMemoryEventStore::new();
        let stream_id = StreamId::try_new("reader-test").expect("valid stream id");

        let event1 = TestEvent {
            stream_id: stream_id.clone(),
            data: "first".to_string(),
        };
        let event2 = TestEvent {
            stream_id: stream_id.clone(),
            data: "second".to_string(),
        };
        let event3 = TestEvent {
            stream_id: stream_id.clone(),
            data: "third".to_string(),
        };

        let writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|w| w.append(event1))
            .and_then(|w| w.append(event2))
            .and_then(|w| w.append(event3))
            .expect("append should succeed");

        store
            .append_events(writes)
            .await
            .expect("append to succeed");

        // First, read all events to get their positions
        let all_events = store
            .read_events::<TestEvent>(EventFilter::all(), EventPage::first(BatchSize::new(100)))
            .await
            .expect("read all events to succeed");

        assert_eq!(all_events.len(), 3, "Should have 3 events total");
        let (first_event, first_position) = &all_events[0];

        // When: We read events after the first event's position
        let page = EventPage::after(*first_position, BatchSize::new(100));
        let filter = EventFilter::all();
        let events = store
            .read_events::<TestEvent>(filter, page)
            .await
            .expect("read to succeed");

        // Then: We should get 2 events (event2 and event3), not including event1
        assert_eq!(events.len(), 2, "Should get 2 events after first position");
        assert_eq!(
            events[0].0.data, "second",
            "First returned event should be 'second'"
        );
        assert_eq!(
            events[1].0.data, "third",
            "Second returned event should be 'third'"
        );

        // And: The first event should NOT be in the results
        for (event, _pos) in &events {
            assert_ne!(
                event.data, first_event.data,
                "First event should be excluded"
            );
        }

        // And: All returned positions should be greater than first_position
        for (_event, pos) in &events {
            assert!(
                *pos > *first_position,
                "Returned position {} should be > first position {}",
                pos,
                first_position
            );
        }
    }

    #[tokio::test]
    async fn in_memory_event_store_implements_checkpoint_store() {
        // Given: An InMemoryEventStore
        let store = InMemoryEventStore::new();

        // When: We save a checkpoint
        let position = StreamPosition::new(Uuid::now_v7());
        CheckpointStore::save(&store, "test-projector", position)
            .await
            .expect("save should succeed");

        // Then: We can load it back
        let loaded = CheckpointStore::load(&store, "test-projector")
            .await
            .expect("load should succeed");
        assert_eq!(loaded, Some(position));
    }

    #[tokio::test]
    async fn in_memory_event_store_implements_projector_coordinator() {
        // Given: An InMemoryEventStore
        let store = InMemoryEventStore::new();

        // When: We try to acquire leadership
        let guard = ProjectorCoordinator::try_acquire(&store, "test-projector").await;

        // Then: It should succeed
        assert!(guard.is_ok(), "should acquire leadership");
    }
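
    /// Leadership exclusivity sketch: the guard holds leadership until dropped.
    ///
    /// A second `try_acquire` for the same subscription should fail while the
    /// first guard is alive, and succeed again once the guard is dropped. This
    /// illustrates the `Drop`-based release in `InMemoryCoordinationGuard`.
    #[tokio::test]
    async fn coordinator_releases_leadership_when_guard_is_dropped() {
        let store = InMemoryEventStore::new();

        let guard = ProjectorCoordinator::try_acquire(&store, "exclusive-projector")
            .await
            .expect("first acquire should succeed");

        // While the guard is held, a second acquire is rejected.
        let second = ProjectorCoordinator::try_acquire(&store, "exclusive-projector").await;
        assert!(matches!(
            second,
            Err(InMemoryCoordinationError::LeadershipNotAcquired { .. })
        ));

        // Dropping the guard releases leadership, allowing re-acquisition.
        drop(guard);
        let reacquired = ProjectorCoordinator::try_acquire(&store, "exclusive-projector").await;
        assert!(
            reacquired.is_ok(),
            "leadership should be re-acquirable after the guard is dropped"
        );
    }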
}