Skip to main content

eventcore_memory/
lib.rs

1//! In-memory event store implementation for testing.
2//!
3//! This module provides the `InMemoryEventStore` - a lightweight, zero-dependency
4//! storage backend for EventCore integration tests and development.
5
6use std::collections::HashMap;
7use std::sync::{Arc, RwLock};
8
9use eventcore_types::{
10    CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
11    EventStreamReader, EventStreamSlice, Operation, ProjectorCoordinator, StreamId, StreamPosition,
12    StreamVersion, StreamWriteEntry, StreamWrites,
13};
14use uuid::Uuid;
15
16type StreamData = (Vec<Box<dyn std::any::Any + Send>>, StreamVersion);
17
18/// Entry in the global event log with indexed stream_id for efficient filtering.
19///
20/// This structure mirrors the Postgres schema where stream_id is a separate
21/// indexed column and event_id (UUID7) serves as the global position.
22/// By storing stream_id and event_id separately, we can filter by stream
23/// prefix and position without parsing JSON, matching the performance
24/// characteristics of the database implementation.
25#[derive(Debug, Clone)]
26struct GlobalLogEntry {
27    /// Event identifier (UUID7), used as global position
28    event_id: Uuid,
29    /// Stream identifier, extracted at write time for efficient filtering
30    stream_id: String,
31    /// Event data as JSON value
32    event_data: serde_json::Value,
33}
34
35/// Internal storage combining per-stream data with global event ordering.
36struct StoreData {
37    streams: HashMap<StreamId, StreamData>,
38    /// Global log with indexed stream_id for efficient EventReader queries
39    global_log: Vec<GlobalLogEntry>,
40    /// Checkpoint storage for projection progress tracking
41    checkpoints: HashMap<String, StreamPosition>,
42    /// Coordination locks for projector leadership
43    locks: Arc<RwLock<HashMap<String, ()>>>,
44}
45
46/// In-memory event store implementation for testing.
47///
48/// `InMemoryEventStore` provides a lightweight, zero-dependency storage backend
49/// for EventCore integration tests and development. It implements the `EventStore`,
50/// `EventReader`, `CheckpointStore`, and `ProjectorCoordinator` traits using
51/// standard library collections with optimistic concurrency control via version
52/// checking.
53///
54/// # Example
55///
56/// ```ignore
57/// use eventcore_memory::InMemoryEventStore;
58///
59/// let store = InMemoryEventStore::new();
60/// // Use store with execute() function
61/// ```
62///
63/// # Thread Safety
64///
65/// `InMemoryEventStore` uses interior mutability (`Mutex`) for concurrent access.
66pub struct InMemoryEventStore {
67    data: std::sync::Mutex<StoreData>,
68}
69
70impl InMemoryEventStore {
71    /// Create a new in-memory event store.
72    ///
73    /// Returns an empty event store ready for command execution.
74    /// All streams start at version 0 (no events).
75    pub fn new() -> Self {
76        Self {
77            data: std::sync::Mutex::new(StoreData {
78                streams: HashMap::new(),
79                global_log: Vec::new(),
80                checkpoints: HashMap::new(),
81                locks: Arc::new(RwLock::new(HashMap::new())),
82            }),
83        }
84    }
85}
86
87impl Default for InMemoryEventStore {
88    fn default() -> Self {
89        Self::new()
90    }
91}
92
93impl EventStore for InMemoryEventStore {
94    async fn read_stream<E: Event>(
95        &self,
96        stream_id: StreamId,
97    ) -> Result<EventStreamReader<E>, EventStoreError> {
98        let data = self
99            .data
100            .lock()
101            .map_err(|_| EventStoreError::StoreFailure {
102                operation: Operation::ReadStream,
103            })?;
104        let events = match data.streams.get(&stream_id) {
105            None => Vec::new(),
106            Some((boxed_events, _version)) => {
107                let mut events = Vec::with_capacity(boxed_events.len());
108                for boxed in boxed_events {
109                    match boxed.downcast_ref::<E>() {
110                        Some(event) => events.push(event.clone()),
111                        None => {
112                            return Err(EventStoreError::DeserializationFailed {
113                                stream_id,
114                                detail: format!(
115                                    "event could not be downcast to {}",
116                                    std::any::type_name::<E>()
117                                ),
118                            });
119                        }
120                    }
121                }
122                events
123            }
124        };
125
126        Ok(EventStreamReader::new(events))
127    }
128
129    async fn append_events(
130        &self,
131        writes: StreamWrites,
132    ) -> Result<EventStreamSlice, EventStoreError> {
133        let mut data = self
134            .data
135            .lock()
136            .map_err(|_| EventStoreError::StoreFailure {
137                operation: Operation::AppendEvents,
138            })?;
139        let expected_versions = writes.expected_versions().clone();
140
141        // Check all version constraints before writing any events
142        for (stream_id, expected_version) in &expected_versions {
143            let current_version = data
144                .streams
145                .get(stream_id)
146                .map(|(_events, version)| *version)
147                .unwrap_or_else(|| StreamVersion::new(0));
148
149            if current_version != *expected_version {
150                return Err(EventStoreError::VersionConflict {
151                    stream_id: stream_id.clone(),
152                    expected: *expected_version,
153                    actual: current_version,
154                });
155            }
156        }
157
158        // All versions match - proceed with writes
159        for entry in writes.into_entries() {
160            let StreamWriteEntry {
161                stream_id,
162                event,
163                event_type: _,
164                event_data,
165            } = entry;
166
167            // Generate UUID7 for this event (monotonic, timestamp-ordered)
168            let event_id = Uuid::now_v7();
169
170            // Store in global log for EventReader with indexed stream_id and event_id
171            data.global_log.push(GlobalLogEntry {
172                event_id,
173                stream_id: stream_id.as_ref().to_string(),
174                event_data,
175            });
176
177            let (events, version) = data
178                .streams
179                .entry(stream_id)
180                .or_insert_with(|| (Vec::new(), StreamVersion::new(0)));
181            events.push(event);
182            *version = version.increment();
183        }
184
185        Ok(EventStreamSlice)
186    }
187}
188
189impl EventReader for InMemoryEventStore {
190    type Error = EventStoreError;
191
192    async fn read_events<E: Event>(
193        &self,
194        filter: EventFilter,
195        page: EventPage,
196    ) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
197        let data = self
198            .data
199            .lock()
200            .map_err(|_| EventStoreError::StoreFailure {
201                operation: Operation::ReadStream,
202            })?;
203
204        let after_event_id = page.after_position().map(|p| p.into_inner());
205
206        let events: Vec<(E, StreamPosition)> = data
207            .global_log
208            .iter()
209            .filter(|entry| {
210                // Filter by event_id (UUID7 comparison)
211                match after_event_id {
212                    None => true,
213                    Some(after_id) => entry.event_id > after_id,
214                }
215            })
216            .filter(|entry| {
217                // Filter by indexed stream_id WITHOUT parsing JSON (matches Postgres behavior)
218                match filter.stream_prefix() {
219                    None => true,
220                    Some(prefix) => entry.stream_id.starts_with(prefix.as_ref()),
221                }
222            })
223            .take(page.limit().into_inner())
224            .filter_map(|entry| {
225                serde_json::from_value::<E>(entry.event_data.clone())
226                    .ok()
227                    .map(|e| (e, StreamPosition::new(entry.event_id)))
228            })
229            .collect();
230
231        Ok(events)
232    }
233}
234
235impl CheckpointStore for InMemoryEventStore {
236    type Error = InMemoryCheckpointError;
237
238    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
239        let data = self
240            .data
241            .lock()
242            .map_err(|e| InMemoryCheckpointError::LockFailed(e.to_string()))?;
243        Ok(data.checkpoints.get(name).copied())
244    }
245
246    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
247        let mut data = self
248            .data
249            .lock()
250            .map_err(|e| InMemoryCheckpointError::LockFailed(e.to_string()))?;
251        let _ = data.checkpoints.insert(name.to_string(), position);
252        Ok(())
253    }
254}
255
256impl ProjectorCoordinator for InMemoryEventStore {
257    type Error = InMemoryCoordinationError;
258    type Guard = InMemoryCoordinationGuard;
259
260    async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
261        let data = self
262            .data
263            .lock()
264            .map_err(|e| InMemoryCoordinationError::LockPoisoned {
265                message: e.to_string(),
266            })?;
267
268        let mut guard =
269            data.locks
270                .write()
271                .map_err(|e| InMemoryCoordinationError::LockPoisoned {
272                    message: e.to_string(),
273                })?;
274
275        if guard.contains_key(subscription_name) {
276            return Err(InMemoryCoordinationError::LeadershipNotAcquired {
277                subscription_name: subscription_name.to_string(),
278            });
279        }
280
281        let _ = guard.insert(subscription_name.to_string(), ());
282
283        Ok(InMemoryCoordinationGuard {
284            subscription_name: subscription_name.to_string(),
285            locks: Arc::clone(&data.locks),
286        })
287    }
288}
289
290/// In-memory checkpoint store for tracking projection progress.
291///
292/// `InMemoryCheckpointStore` stores checkpoint positions in memory using a
293/// thread-safe `Arc<RwLock<HashMap>>`. It is primarily useful for testing
294/// and single-process deployments where persistence across restarts is not required.
295///
296/// For production deployments requiring durability, use a persistent
297/// checkpoint store implementation.
298///
299/// # Example
300///
301/// ```ignore
302/// use eventcore_memory::InMemoryCheckpointStore;
303///
304/// let checkpoint_store = InMemoryCheckpointStore::new();
305/// // Use with ProjectionRunner
306/// ```
307#[derive(Debug, Clone, Default)]
308pub struct InMemoryCheckpointStore {
309    checkpoints: Arc<RwLock<HashMap<String, StreamPosition>>>,
310}
311
312impl InMemoryCheckpointStore {
313    /// Create a new in-memory checkpoint store.
314    pub fn new() -> Self {
315        Self::default()
316    }
317}
318
319/// Error type for in-memory checkpoint store operations.
320///
321/// Since the in-memory store uses an `RwLock`, the only possible error
322/// is a poisoned lock from a panic in another thread.
323#[derive(Debug, Clone, thiserror::Error)]
324pub enum InMemoryCheckpointError {
325    #[error("failed to acquire lock: {0}")]
326    LockFailed(String),
327}
328
329/// Error type for in-memory coordinator operations.
330#[derive(Debug, Clone, thiserror::Error)]
331pub enum InMemoryCoordinationError {
332    /// Leadership is already held by another instance.
333    #[error(
334        "leadership not acquired for subscription '{subscription_name}': another instance holds the lock"
335    )]
336    LeadershipNotAcquired { subscription_name: String },
337    /// Lock was poisoned by a panic in another thread.
338    #[error("lock poisoned: {message}")]
339    LockPoisoned { message: String },
340}
341
342/// Guard that releases leadership when dropped.
343#[derive(Debug)]
344pub struct InMemoryCoordinationGuard {
345    subscription_name: String,
346    locks: Arc<RwLock<HashMap<String, ()>>>,
347}
348
349impl Drop for InMemoryCoordinationGuard {
350    fn drop(&mut self) {
351        if let Ok(mut guard) = self.locks.write() {
352            let _ = guard.remove(&self.subscription_name);
353        } else {
354            tracing::error!(
355                subscription_name = %self.subscription_name,
356                "failed to release coordination lock: RwLock poisoned"
357            );
358        }
359    }
360}
361
362/// In-memory projector coordinator for single-process deployments.
363///
364/// `InMemoryProjectorCoordinator` provides coordination for projectors within a single
365/// process using an in-memory lock table. This is suitable for testing and single-process
366/// deployments where distributed coordination is not required.
367///
368/// For distributed deployments with multiple process instances, use a database-backed
369/// coordinator implementation (e.g., PostgreSQL advisory locks).
370#[derive(Debug, Clone, Default)]
371pub struct InMemoryProjectorCoordinator {
372    locks: Arc<RwLock<HashMap<String, ()>>>,
373}
374
375impl InMemoryProjectorCoordinator {
376    /// Create a new in-memory projector coordinator.
377    pub fn new() -> Self {
378        Self::default()
379    }
380}
381
382impl ProjectorCoordinator for InMemoryProjectorCoordinator {
383    type Error = InMemoryCoordinationError;
384    type Guard = InMemoryCoordinationGuard;
385
386    async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
387        let mut guard =
388            self.locks
389                .write()
390                .map_err(|e| InMemoryCoordinationError::LockPoisoned {
391                    message: e.to_string(),
392                })?;
393
394        if guard.contains_key(subscription_name) {
395            return Err(InMemoryCoordinationError::LeadershipNotAcquired {
396                subscription_name: subscription_name.to_string(),
397            });
398        }
399
400        let _ = guard.insert(subscription_name.to_string(), ());
401
402        Ok(InMemoryCoordinationGuard {
403            subscription_name: subscription_name.to_string(),
404            locks: Arc::clone(&self.locks),
405        })
406    }
407}
408
409impl CheckpointStore for InMemoryCheckpointStore {
410    type Error = InMemoryCheckpointError;
411
412    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
413        let guard = self
414            .checkpoints
415            .read()
416            .map_err(|e| InMemoryCheckpointError::LockFailed(e.to_string()))?;
417        Ok(guard.get(name).copied())
418    }
419
420    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
421        let mut guard = self
422            .checkpoints
423            .write()
424            .map_err(|e| InMemoryCheckpointError::LockFailed(e.to_string()))?;
425        let _ = guard.insert(name.to_string(), position);
426        Ok(())
427    }
428}
429
430#[cfg(test)]
431mod tests {
432    use super::*;
433    use eventcore_types::{BatchSize, EventFilter, EventPage};
434    use serde::{Deserialize, Serialize};
435
436    /// Test-specific domain event type for unit testing storage operations.
437    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
438    struct TestEvent {
439        stream_id: StreamId,
440        data: String,
441    }
442
443    impl Event for TestEvent {
444        fn stream_id(&self) -> &StreamId {
445            &self.stream_id
446        }
447
448        fn event_type_name() -> &'static str {
449            "TestEvent"
450        }
451    }
452
453    /// Unit test: Verify InMemoryEventStore can append and retrieve a single event
454    ///
455    /// This test verifies the fundamental event storage capability:
456    /// - Append an event to a stream
457    /// - Read the stream back
458    /// - Verify the event is retrievable with correct data
459    ///
460    /// This is a unit test drilling down from the failing integration test
461    /// test_deposit_command_event_data_is_retrievable. We're testing the
462    /// storage layer in isolation before testing the full command execution flow.
463    #[tokio::test]
464    async fn test_append_and_read_single_event() {
465        // Given: An in-memory event store
466        let store = InMemoryEventStore::new();
467
468        // And: A stream ID
469        let stream_id = StreamId::try_new("test-stream-123".to_string()).expect("valid stream id");
470
471        // And: A domain event to store
472        let event = TestEvent {
473            stream_id: stream_id.clone(),
474            data: "test event data".to_string(),
475        };
476
477        // And: A collection of writes containing the event (expected version 0 for empty stream)
478        let writes = StreamWrites::new()
479            .register_stream(stream_id.clone(), StreamVersion::new(0))
480            .and_then(|writes| writes.append(event.clone()))
481            .expect("append should succeed");
482
483        // When: We append the event to the store
484        let _ = store
485            .append_events(writes)
486            .await
487            .expect("append to succeed");
488
489        let reader = store
490            .read_stream::<TestEvent>(stream_id)
491            .await
492            .expect("read to succeed");
493
494        let observed = (
495            reader.is_empty(),
496            reader.len(),
497            reader.iter().next().is_none(),
498        );
499
500        assert_eq!(observed, (false, 1usize, false));
501    }
502
503    #[tokio::test]
504    async fn event_stream_reader_is_empty_reflects_stream_population() {
505        let store = InMemoryEventStore::new();
506        let stream_id =
507            StreamId::try_new("is-empty-observation".to_string()).expect("valid stream id");
508
509        let initial_reader = store
510            .read_stream::<TestEvent>(stream_id.clone())
511            .await
512            .expect("initial read to succeed");
513
514        let event = TestEvent {
515            stream_id: stream_id.clone(),
516            data: "populated event".to_string(),
517        };
518
519        let writes = StreamWrites::new()
520            .register_stream(stream_id.clone(), StreamVersion::new(0))
521            .and_then(|writes| writes.append(event))
522            .expect("append should succeed");
523
524        let _ = store
525            .append_events(writes)
526            .await
527            .expect("append to succeed");
528
529        let populated_reader = store
530            .read_stream::<TestEvent>(stream_id)
531            .await
532            .expect("populated read to succeed");
533
534        let observed = (
535            initial_reader.is_empty(),
536            initial_reader.len(),
537            populated_reader.is_empty(),
538            populated_reader.len(),
539        );
540
541        assert_eq!(observed, (true, 0usize, false, 1usize));
542    }
543
544    #[tokio::test]
545    async fn read_stream_iterates_through_events_in_order() {
546        let store = InMemoryEventStore::new();
547        let stream_id = StreamId::try_new("ordered-stream".to_string()).expect("valid stream id");
548
549        let first_event = TestEvent {
550            stream_id: stream_id.clone(),
551            data: "first".to_string(),
552        };
553
554        let second_event = TestEvent {
555            stream_id: stream_id.clone(),
556            data: "second".to_string(),
557        };
558
559        let writes = StreamWrites::new()
560            .register_stream(stream_id.clone(), StreamVersion::new(0))
561            .and_then(|writes| writes.append(first_event))
562            .and_then(|writes| writes.append(second_event))
563            .expect("append chain should succeed");
564
565        let _ = store
566            .append_events(writes)
567            .await
568            .expect("append to succeed");
569
570        let reader = store
571            .read_stream::<TestEvent>(stream_id)
572            .await
573            .expect("read to succeed");
574
575        let collected: Vec<String> = reader.iter().map(|event| event.data.clone()).collect();
576
577        let observed = (reader.is_empty(), collected);
578
579        assert_eq!(
580            observed,
581            (false, vec!["first".to_string(), "second".to_string()])
582        );
583    }
584
585    #[test]
586    fn stream_writes_accepts_duplicate_stream_with_same_expected_version() {
587        let stream_id = StreamId::try_new("duplicate-stream-same-version".to_string())
588            .expect("valid stream id");
589
590        let first_event = TestEvent {
591            stream_id: stream_id.clone(),
592            data: "first-event".to_string(),
593        };
594
595        let second_event = TestEvent {
596            stream_id: stream_id.clone(),
597            data: "second-event".to_string(),
598        };
599
600        let writes_result = StreamWrites::new()
601            .register_stream(stream_id.clone(), StreamVersion::new(0))
602            .and_then(|writes| writes.append(first_event))
603            .and_then(|writes| writes.append(second_event));
604
605        assert!(writes_result.is_ok());
606    }
607
608    #[test]
609    fn stream_writes_rejects_duplicate_stream_with_conflicting_expected_versions() {
610        let stream_id =
611            StreamId::try_new("duplicate-stream-conflict".to_string()).expect("valid stream id");
612
613        let first_event = TestEvent {
614            stream_id: stream_id.clone(),
615            data: "first-event-conflict".to_string(),
616        };
617
618        let second_event = TestEvent {
619            stream_id: stream_id.clone(),
620            data: "second-event-conflict".to_string(),
621        };
622
623        let conflict = StreamWrites::new()
624            .register_stream(stream_id.clone(), StreamVersion::new(0))
625            .and_then(|writes| writes.append(first_event))
626            .and_then(|writes| writes.register_stream(stream_id.clone(), StreamVersion::new(1)))
627            .and_then(|writes| writes.append(second_event));
628
629        let message = conflict.unwrap_err().to_string();
630
631        assert_eq!(
632            message,
633            "conflicting expected versions for stream duplicate-stream-conflict: first=0, second=1"
634        );
635    }
636
637    #[tokio::test]
638    async fn stream_writes_registers_stream_before_appending_multiple_events() {
639        let store = InMemoryEventStore::new();
640        let stream_id =
641            StreamId::try_new("registered-stream".to_string()).expect("valid stream id");
642
643        let first_event = TestEvent {
644            stream_id: stream_id.clone(),
645            data: "first-registered-event".to_string(),
646        };
647
648        let second_event = TestEvent {
649            stream_id: stream_id.clone(),
650            data: "second-registered-event".to_string(),
651        };
652
653        let writes = StreamWrites::new()
654            .register_stream(stream_id.clone(), StreamVersion::new(0))
655            .and_then(|writes| writes.append(first_event))
656            .and_then(|writes| writes.append(second_event))
657            .expect("registered stream should accept events");
658
659        let result = store.append_events(writes).await;
660
661        assert!(
662            result.is_ok(),
663            "append should succeed when stream registered before events"
664        );
665    }
666
667    #[test]
668    fn stream_writes_rejects_appends_for_unregistered_streams() {
669        let stream_id =
670            StreamId::try_new("unregistered-stream".to_string()).expect("valid stream id");
671
672        let event = TestEvent {
673            stream_id: stream_id.clone(),
674            data: "unregistered-event".to_string(),
675        };
676
677        let error = StreamWrites::new()
678            .append(event)
679            .expect_err("append without prior registration should fail");
680
681        assert!(matches!(
682            error,
683            EventStoreError::UndeclaredStream { stream_id: ref actual } if *actual == stream_id
684        ));
685    }
686
687    #[test]
688    fn expected_versions_returns_registered_streams_and_versions() {
689        let stream_a = StreamId::try_new("stream-a").expect("valid stream id");
690        let stream_b = StreamId::try_new("stream-b").expect("valid stream id");
691
692        let writes = StreamWrites::new()
693            .register_stream(stream_a.clone(), StreamVersion::new(0))
694            .and_then(|w| w.register_stream(stream_b.clone(), StreamVersion::new(5)))
695            .expect("registration should succeed");
696
697        let versions = writes.expected_versions();
698
699        assert_eq!(versions.len(), 2);
700        assert_eq!(versions.get(&stream_a), Some(&StreamVersion::new(0)));
701        assert_eq!(versions.get(&stream_b), Some(&StreamVersion::new(5)));
702    }
703
704    #[test]
705    fn stream_id_rejects_asterisk_metacharacter() {
706        let result = StreamId::try_new("account-*");
707        assert!(
708            result.is_err(),
709            "StreamId should reject asterisk glob metacharacter"
710        );
711    }
712
713    #[test]
714    fn stream_id_rejects_question_mark_metacharacter() {
715        let result = StreamId::try_new("account-?");
716        assert!(
717            result.is_err(),
718            "StreamId should reject question mark glob metacharacter"
719        );
720    }
721
722    #[test]
723    fn stream_id_rejects_open_bracket_metacharacter() {
724        let result = StreamId::try_new("account-[");
725        assert!(
726            result.is_err(),
727            "StreamId should reject open bracket glob metacharacter"
728        );
729    }
730
731    #[test]
732    fn stream_id_rejects_close_bracket_metacharacter() {
733        let result = StreamId::try_new("account-]");
734        assert!(
735            result.is_err(),
736            "StreamId should reject close bracket glob metacharacter"
737        );
738    }
739
740    #[tokio::test]
741    async fn event_reader_after_position_excludes_event_at_position() {
742        // Given: An event store with 3 events
743        let store = InMemoryEventStore::new();
744        let stream_id = StreamId::try_new("reader-test").expect("valid stream id");
745
746        let event1 = TestEvent {
747            stream_id: stream_id.clone(),
748            data: "first".to_string(),
749        };
750        let event2 = TestEvent {
751            stream_id: stream_id.clone(),
752            data: "second".to_string(),
753        };
754        let event3 = TestEvent {
755            stream_id: stream_id.clone(),
756            data: "third".to_string(),
757        };
758
759        let writes = StreamWrites::new()
760            .register_stream(stream_id.clone(), StreamVersion::new(0))
761            .and_then(|w| w.append(event1))
762            .and_then(|w| w.append(event2))
763            .and_then(|w| w.append(event3))
764            .expect("append should succeed");
765
766        let _ = store
767            .append_events(writes)
768            .await
769            .expect("append to succeed");
770
771        // First, read all events to get their positions
772        let all_events = store
773            .read_events::<TestEvent>(EventFilter::all(), EventPage::first(BatchSize::new(100)))
774            .await
775            .expect("read all events to succeed");
776
777        assert_eq!(all_events.len(), 3, "Should have 3 events total");
778        let (first_event, first_position) = &all_events[0];
779
780        // When: We read events after the first event's position
781        let page = EventPage::after(*first_position, BatchSize::new(100));
782        let filter = EventFilter::all();
783        let events = store
784            .read_events::<TestEvent>(filter, page)
785            .await
786            .expect("read to succeed");
787
788        // Then: We should get 2 events (event2 and event3), not including event1
789        assert_eq!(events.len(), 2, "Should get 2 events after first position");
790        assert_eq!(
791            events[0].0.data, "second",
792            "First returned event should be 'second'"
793        );
794        assert_eq!(
795            events[1].0.data, "third",
796            "Second returned event should be 'third'"
797        );
798
799        // And: The first event should NOT be in the results
800        for (event, _pos) in &events {
801            assert_ne!(
802                event.data, first_event.data,
803                "First event should be excluded"
804            );
805        }
806
807        // And: All returned positions should be greater than first_position
808        for (_event, pos) in &events {
809            assert!(
810                *pos > *first_position,
811                "Returned position {} should be > first position {}",
812                pos,
813                first_position
814            );
815        }
816    }
817
818    #[tokio::test]
819    async fn in_memory_event_store_implements_checkpoint_store() {
820        // Given: An InMemoryEventStore
821        let store = InMemoryEventStore::new();
822
823        // When: We save a checkpoint
824        let position = StreamPosition::new(Uuid::now_v7());
825        CheckpointStore::save(&store, "test-projector", position)
826            .await
827            .expect("save should succeed");
828
829        // Then: We can load it back
830        let loaded = CheckpointStore::load(&store, "test-projector")
831            .await
832            .expect("load should succeed");
833        assert_eq!(loaded, Some(position));
834    }
835
836    #[tokio::test]
837    async fn in_memory_event_store_implements_projector_coordinator() {
838        // Given: An InMemoryEventStore
839        let store = InMemoryEventStore::new();
840
841        // When: We try to acquire leadership
842        let guard = ProjectorCoordinator::try_acquire(&store, "test-projector").await;
843
844        // Then: It should succeed
845        assert!(guard.is_ok(), "should acquire leadership");
846    }
847}