Skip to main content

eventcore_memory/
lib.rs

1//! In-memory event store implementation for testing.
2//!
3//! This module provides the `InMemoryEventStore` - a lightweight, zero-dependency
4//! storage backend for EventCore integration tests and development.
5
6use std::collections::HashMap;
7use std::sync::{Arc, RwLock};
8
9use eventcore_types::{
10    CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
11    EventStreamReader, EventStreamSlice, Operation, ProjectorCoordinator, StreamId, StreamPosition,
12    StreamVersion, StreamWriteEntry, StreamWrites,
13};
14use uuid::Uuid;
15
/// Per-stream storage: the type-erased events appended to the stream, paired
/// with the stream's current version.
type StreamData = (Vec<Box<dyn std::any::Any + Send>>, StreamVersion);
17
/// Entry in the global event log with indexed stream_id for efficient filtering.
///
/// This structure mirrors the Postgres schema where stream_id is a separate
/// indexed column and event_id (UUID7) serves as the global position.
/// By storing stream_id and event_id separately, we can filter by stream
/// prefix and position without parsing JSON, matching the performance
/// characteristics of the database implementation.
///
/// One entry is pushed per appended event, so the log keeps events from all
/// streams in the order they were appended.
#[derive(Debug, Clone)]
struct GlobalLogEntry {
    /// Event identifier (UUID7), used as global position. Readers compare it
    /// against the page cursor and hand it back as the event's `StreamPosition`.
    event_id: Uuid,
    /// Stream identifier, extracted at write time for efficient filtering
    /// by stream prefix.
    stream_id: String,
    /// Event data as JSON value; deserialized into the requested event type
    /// when the log is read.
    event_data: serde_json::Value,
}
34
/// Internal storage combining per-stream data with global event ordering.
///
/// `InMemoryEventStore` keeps one instance of this structure behind its mutex.
struct StoreData {
    /// Per-stream event lists and current versions, keyed by stream id.
    streams: HashMap<StreamId, StreamData>,
    /// Global log with indexed stream_id for efficient EventReader queries
    global_log: Vec<GlobalLogEntry>,
    /// Checkpoint storage for projection progress tracking
    checkpoints: HashMap<String, StreamPosition>,
    /// Coordination locks for projector leadership. Kept in an `Arc` so every
    /// issued guard can share the table and remove its own entry when dropped.
    locks: Arc<RwLock<HashMap<String, ()>>>,
}
45
/// In-memory event store implementation for testing.
///
/// `InMemoryEventStore` provides a lightweight, zero-dependency storage backend
/// for EventCore integration tests and development. It implements the `EventStore`,
/// `EventReader`, `CheckpointStore`, and `ProjectorCoordinator` traits using
/// standard library collections with optimistic concurrency control via version
/// checking.
///
/// # Example
///
/// ```ignore
/// use eventcore_memory::InMemoryEventStore;
///
/// let store = InMemoryEventStore::new();
/// // Use store with execute() function
/// ```
///
/// # Thread Safety
///
/// `InMemoryEventStore` uses interior mutability (`Mutex`) for concurrent access.
/// All state sits behind that single mutex, and each operation holds it for
/// its whole duration.
pub struct InMemoryEventStore {
    /// Streams, global log, checkpoints and coordination locks.
    data: std::sync::Mutex<StoreData>,
}
69
70impl InMemoryEventStore {
71    /// Create a new in-memory event store.
72    ///
73    /// Returns an empty event store ready for command execution.
74    /// All streams start at version 0 (no events).
75    pub fn new() -> Self {
76        Self {
77            data: std::sync::Mutex::new(StoreData {
78                streams: HashMap::new(),
79                global_log: Vec::new(),
80                checkpoints: HashMap::new(),
81                locks: Arc::new(RwLock::new(HashMap::new())),
82            }),
83        }
84    }
85}
86
87impl Default for InMemoryEventStore {
88    fn default() -> Self {
89        Self::new()
90    }
91}
92
93impl EventStore for InMemoryEventStore {
94    async fn read_stream<E: Event>(
95        &self,
96        stream_id: StreamId,
97    ) -> Result<EventStreamReader<E>, EventStoreError> {
98        let data = self
99            .data
100            .lock()
101            .map_err(|_| EventStoreError::StoreFailure {
102                operation: Operation::ReadStream,
103            })?;
104        let events = data
105            .streams
106            .get(&stream_id)
107            .map(|(boxed_events, _version)| {
108                boxed_events
109                    .iter()
110                    .filter_map(|boxed| boxed.downcast_ref::<E>())
111                    .cloned()
112                    .collect()
113            })
114            .unwrap_or_default();
115
116        Ok(EventStreamReader::new(events))
117    }
118
119    async fn append_events(
120        &self,
121        writes: StreamWrites,
122    ) -> Result<EventStreamSlice, EventStoreError> {
123        let mut data = self
124            .data
125            .lock()
126            .map_err(|_| EventStoreError::StoreFailure {
127                operation: Operation::AppendEvents,
128            })?;
129        let expected_versions = writes.expected_versions().clone();
130
131        // Check all version constraints before writing any events
132        for (stream_id, expected_version) in &expected_versions {
133            let current_version = data
134                .streams
135                .get(stream_id)
136                .map(|(_events, version)| *version)
137                .unwrap_or_else(|| StreamVersion::new(0));
138
139            if current_version != *expected_version {
140                return Err(EventStoreError::VersionConflict);
141            }
142        }
143
144        // All versions match - proceed with writes
145        for entry in writes.into_entries() {
146            let StreamWriteEntry {
147                stream_id,
148                event,
149                event_type: _,
150                event_data,
151            } = entry;
152
153            // Generate UUID7 for this event (monotonic, timestamp-ordered)
154            let event_id = Uuid::now_v7();
155
156            // Store in global log for EventReader with indexed stream_id and event_id
157            data.global_log.push(GlobalLogEntry {
158                event_id,
159                stream_id: stream_id.as_ref().to_string(),
160                event_data,
161            });
162
163            let (events, version) = data
164                .streams
165                .entry(stream_id)
166                .or_insert_with(|| (Vec::new(), StreamVersion::new(0)));
167            events.push(event);
168            *version = version.increment();
169        }
170
171        Ok(EventStreamSlice)
172    }
173}
174
175impl EventReader for InMemoryEventStore {
176    type Error = EventStoreError;
177
178    async fn read_events<E: Event>(
179        &self,
180        filter: EventFilter,
181        page: EventPage,
182    ) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
183        let data = self
184            .data
185            .lock()
186            .map_err(|_| EventStoreError::StoreFailure {
187                operation: Operation::ReadStream,
188            })?;
189
190        let after_event_id = page.after_position().map(|p| p.into_inner());
191
192        let events: Vec<(E, StreamPosition)> = data
193            .global_log
194            .iter()
195            .filter(|entry| {
196                // Filter by event_id (UUID7 comparison)
197                match after_event_id {
198                    None => true,
199                    Some(after_id) => entry.event_id > after_id,
200                }
201            })
202            .filter(|entry| {
203                // Filter by indexed stream_id WITHOUT parsing JSON (matches Postgres behavior)
204                match filter.stream_prefix() {
205                    None => true,
206                    Some(prefix) => entry.stream_id.starts_with(prefix.as_ref()),
207                }
208            })
209            .take(page.limit().into_inner())
210            .filter_map(|entry| {
211                serde_json::from_value::<E>(entry.event_data.clone())
212                    .ok()
213                    .map(|e| (e, StreamPosition::new(entry.event_id)))
214            })
215            .collect();
216
217        Ok(events)
218    }
219}
220
221impl CheckpointStore for InMemoryEventStore {
222    type Error = InMemoryCheckpointError;
223
224    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
225        let data = self.data.lock().map_err(|e| InMemoryCheckpointError {
226            message: format!("failed to acquire lock: {}", e),
227        })?;
228        Ok(data.checkpoints.get(name).copied())
229    }
230
231    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
232        let mut data = self.data.lock().map_err(|e| InMemoryCheckpointError {
233            message: format!("failed to acquire lock: {}", e),
234        })?;
235        let _ = data.checkpoints.insert(name.to_string(), position);
236        Ok(())
237    }
238}
239
240impl ProjectorCoordinator for InMemoryEventStore {
241    type Error = InMemoryCoordinationError;
242    type Guard = InMemoryCoordinationGuard;
243
244    async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
245        let data = self
246            .data
247            .lock()
248            .map_err(|e| InMemoryCoordinationError::LockPoisoned {
249                message: e.to_string(),
250            })?;
251
252        let mut guard =
253            data.locks
254                .write()
255                .map_err(|e| InMemoryCoordinationError::LockPoisoned {
256                    message: e.to_string(),
257                })?;
258
259        if guard.contains_key(subscription_name) {
260            return Err(InMemoryCoordinationError::LeadershipNotAcquired {
261                subscription_name: subscription_name.to_string(),
262            });
263        }
264
265        let _ = guard.insert(subscription_name.to_string(), ());
266
267        Ok(InMemoryCoordinationGuard {
268            subscription_name: subscription_name.to_string(),
269            locks: Arc::clone(&data.locks),
270        })
271    }
272}
273
/// In-memory checkpoint store for tracking projection progress.
///
/// `InMemoryCheckpointStore` stores checkpoint positions in memory using a
/// thread-safe `Arc<RwLock<HashMap>>`. It is primarily useful for testing
/// and single-process deployments where persistence across restarts is not required.
///
/// Because the map sits behind an `Arc`, clones of a store share the same
/// checkpoints.
///
/// For production deployments requiring durability, use a persistent
/// checkpoint store implementation.
///
/// # Example
///
/// ```ignore
/// use eventcore_memory::InMemoryCheckpointStore;
///
/// let checkpoint_store = InMemoryCheckpointStore::new();
/// // Use with ProjectionRunner
/// ```
#[derive(Debug, Clone, Default)]
pub struct InMemoryCheckpointStore {
    /// Saved positions keyed by checkpoint name.
    checkpoints: Arc<RwLock<HashMap<String, StreamPosition>>>,
}
295
296impl InMemoryCheckpointStore {
297    /// Create a new in-memory checkpoint store.
298    pub fn new() -> Self {
299        Self::default()
300    }
301}
302
/// Error type for in-memory checkpoint operations.
///
/// It is shared by `InMemoryCheckpointStore` (guarded by an `RwLock`) and by
/// the checkpoint methods of `InMemoryEventStore` (guarded by a `Mutex`). As
/// far as this file shows, it is only ever built when acquiring one of those
/// locks fails because a panic in another thread poisoned it; `message`
/// carries the description of that failure.
#[derive(Debug, Clone)]
pub struct InMemoryCheckpointError {
    /// Human-readable description, printed verbatim by `Display`.
    message: String,
}

impl std::fmt::Display for InMemoryCheckpointError {
    /// Writes the stored message unchanged.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.message)
    }
}

impl std::error::Error for InMemoryCheckpointError {}
319
/// Failures reported by the in-memory projector coordinators.
#[derive(Debug, Clone)]
pub enum InMemoryCoordinationError {
    /// The requested subscription name is already held by another guard.
    LeadershipNotAcquired { subscription_name: String },
    /// A lock was poisoned by a panic in another thread; `message` carries
    /// the poison error's text.
    LockPoisoned { message: String },
}

impl std::fmt::Display for InMemoryCoordinationError {
    /// Renders a one-line description that names the subscription or the
    /// poison message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            InMemoryCoordinationError::LockPoisoned { message } => {
                f.write_fmt(format_args!("lock poisoned: {}", message))
            }
            InMemoryCoordinationError::LeadershipNotAcquired { subscription_name } => {
                f.write_fmt(format_args!(
                    "leadership not acquired for subscription: {}",
                    subscription_name
                ))
            }
        }
    }
}

impl std::error::Error for InMemoryCoordinationError {}
345
346/// Guard that releases leadership when dropped.
347#[derive(Debug)]
348pub struct InMemoryCoordinationGuard {
349    subscription_name: String,
350    locks: Arc<RwLock<HashMap<String, ()>>>,
351}
352
353impl Drop for InMemoryCoordinationGuard {
354    fn drop(&mut self) {
355        if let Ok(mut guard) = self.locks.write() {
356            let _ = guard.remove(&self.subscription_name);
357        } else {
358            tracing::error!(
359                subscription_name = %self.subscription_name,
360                "failed to release coordination lock: RwLock poisoned"
361            );
362        }
363    }
364}
365
/// In-memory projector coordinator for single-process deployments.
///
/// `InMemoryProjectorCoordinator` provides coordination for projectors within a single
/// process using an in-memory lock table. This is suitable for testing and single-process
/// deployments where distributed coordination is not required.
///
/// Because the table sits behind an `Arc`, clones of a coordinator share the
/// same set of held names.
///
/// For distributed deployments with multiple process instances, use a database-backed
/// coordinator implementation (e.g., PostgreSQL advisory locks).
#[derive(Debug, Clone, Default)]
pub struct InMemoryProjectorCoordinator {
    /// Currently held subscription names; only the keys carry information.
    locks: Arc<RwLock<HashMap<String, ()>>>,
}
378
379impl InMemoryProjectorCoordinator {
380    /// Create a new in-memory projector coordinator.
381    pub fn new() -> Self {
382        Self::default()
383    }
384}
385
386impl ProjectorCoordinator for InMemoryProjectorCoordinator {
387    type Error = InMemoryCoordinationError;
388    type Guard = InMemoryCoordinationGuard;
389
390    async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
391        let mut guard =
392            self.locks
393                .write()
394                .map_err(|e| InMemoryCoordinationError::LockPoisoned {
395                    message: e.to_string(),
396                })?;
397
398        if guard.contains_key(subscription_name) {
399            return Err(InMemoryCoordinationError::LeadershipNotAcquired {
400                subscription_name: subscription_name.to_string(),
401            });
402        }
403
404        let _ = guard.insert(subscription_name.to_string(), ());
405
406        Ok(InMemoryCoordinationGuard {
407            subscription_name: subscription_name.to_string(),
408            locks: Arc::clone(&self.locks),
409        })
410    }
411}
412
413impl CheckpointStore for InMemoryCheckpointStore {
414    type Error = InMemoryCheckpointError;
415
416    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
417        let guard = self
418            .checkpoints
419            .read()
420            .map_err(|e| InMemoryCheckpointError {
421                message: format!("failed to acquire read lock: {}", e),
422            })?;
423        Ok(guard.get(name).copied())
424    }
425
426    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
427        let mut guard = self
428            .checkpoints
429            .write()
430            .map_err(|e| InMemoryCheckpointError {
431                message: format!("failed to acquire write lock: {}", e),
432            })?;
433        let _ = guard.insert(name.to_string(), position);
434        Ok(())
435    }
436}
437
#[cfg(test)]
mod tests {
    use super::*;
    use eventcore_types::{BatchSize, EventFilter, EventPage};
    use serde::{Deserialize, Serialize};

    /// Test-specific domain event type for unit testing storage operations.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    struct TestEvent {
        stream_id: StreamId,
        data: String,
    }

    impl Event for TestEvent {
        fn stream_id(&self) -> &StreamId {
            &self.stream_id
        }
    }

    /// Unit test: Verify InMemoryEventStore can append and retrieve a single event
    ///
    /// This test verifies the fundamental event storage capability:
    /// - Append an event to a stream
    /// - Read the stream back
    /// - Verify the event is retrievable with correct data
    ///
    /// This is a unit test drilling down from the failing integration test
    /// test_deposit_command_event_data_is_retrievable. We're testing the
    /// storage layer in isolation before testing the full command execution flow.
    #[tokio::test]
    async fn test_append_and_read_single_event() {
        // Given: An in-memory event store
        let store = InMemoryEventStore::new();

        // And: A stream ID
        let stream_id = StreamId::try_new("test-stream-123".to_string()).expect("valid stream id");

        // And: A domain event to store
        let event = TestEvent {
            stream_id: stream_id.clone(),
            data: "test event data".to_string(),
        };

        // And: A collection of writes containing the event (expected version 0 for empty stream)
        let writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(event.clone()))
            .expect("append should succeed");

        // When: We append the event to the store
        let _ = store
            .append_events(writes)
            .await
            .expect("append to succeed");

        // And: We read the stream back
        let reader = store
            .read_stream::<TestEvent>(stream_id)
            .await
            .expect("read to succeed");

        // Then: Exactly one event is stored and it carries the data we wrote
        let stored: Vec<(StreamId, String)> = reader
            .iter()
            .map(|found| (found.stream_id.clone(), found.data.clone()))
            .collect();

        let observed = (reader.is_empty(), reader.len(), stored);

        assert_eq!(
            observed,
            (false, 1usize, vec![(event.stream_id, event.data)])
        );
    }

    /// A reader taken before any append reports an empty stream; a reader
    /// taken after one append reports a single event.
    #[tokio::test]
    async fn event_stream_reader_is_empty_reflects_stream_population() {
        let store = InMemoryEventStore::new();
        let stream_id =
            StreamId::try_new("is-empty-observation".to_string()).expect("valid stream id");

        let initial_reader = store
            .read_stream::<TestEvent>(stream_id.clone())
            .await
            .expect("initial read to succeed");

        let event = TestEvent {
            stream_id: stream_id.clone(),
            data: "populated event".to_string(),
        };

        let writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(event))
            .expect("append should succeed");

        let _ = store
            .append_events(writes)
            .await
            .expect("append to succeed");

        let populated_reader = store
            .read_stream::<TestEvent>(stream_id)
            .await
            .expect("populated read to succeed");

        let observed = (
            initial_reader.is_empty(),
            initial_reader.len(),
            populated_reader.is_empty(),
            populated_reader.len(),
        );

        assert_eq!(observed, (true, 0usize, false, 1usize));
    }

    /// Events appended in one batch are read back in the order they were appended.
    #[tokio::test]
    async fn read_stream_iterates_through_events_in_order() {
        let store = InMemoryEventStore::new();
        let stream_id = StreamId::try_new("ordered-stream".to_string()).expect("valid stream id");

        let first_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "first".to_string(),
        };

        let second_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "second".to_string(),
        };

        let writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(first_event))
            .and_then(|writes| writes.append(second_event))
            .expect("append chain should succeed");

        let _ = store
            .append_events(writes)
            .await
            .expect("append to succeed");

        let reader = store
            .read_stream::<TestEvent>(stream_id)
            .await
            .expect("read to succeed");

        let collected: Vec<String> = reader.iter().map(|event| event.data.clone()).collect();

        let observed = (reader.is_empty(), collected);

        assert_eq!(
            observed,
            (false, vec!["first".to_string(), "second".to_string()])
        );
    }

    /// Appending two events to a stream registered once builds successfully.
    #[test]
    fn stream_writes_accepts_duplicate_stream_with_same_expected_version() {
        let stream_id = StreamId::try_new("duplicate-stream-same-version".to_string())
            .expect("valid stream id");

        let first_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "first-event".to_string(),
        };

        let second_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "second-event".to_string(),
        };

        let writes_result = StreamWrites::new()
            .register_stream(stream_id, StreamVersion::new(0))
            .and_then(|writes| writes.append(first_event))
            .and_then(|writes| writes.append(second_event));

        assert!(writes_result.is_ok());
    }

    /// Registering the same stream twice with different expected versions is
    /// rejected with a message naming both versions.
    #[test]
    fn stream_writes_rejects_duplicate_stream_with_conflicting_expected_versions() {
        let stream_id =
            StreamId::try_new("duplicate-stream-conflict".to_string()).expect("valid stream id");

        let first_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "first-event-conflict".to_string(),
        };

        let second_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "second-event-conflict".to_string(),
        };

        let conflict = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(first_event))
            .and_then(|writes| writes.register_stream(stream_id.clone(), StreamVersion::new(1)))
            .and_then(|writes| writes.append(second_event));

        let message = conflict.unwrap_err().to_string();

        assert_eq!(
            message,
            "conflicting expected versions for stream duplicate-stream-conflict: first=0, second=1"
        );
    }

    /// A batch built by registering the stream first and then appending two
    /// events is accepted by the store.
    #[tokio::test]
    async fn stream_writes_registers_stream_before_appending_multiple_events() {
        let store = InMemoryEventStore::new();
        let stream_id =
            StreamId::try_new("registered-stream".to_string()).expect("valid stream id");

        let first_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "first-registered-event".to_string(),
        };

        let second_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "second-registered-event".to_string(),
        };

        let writes = StreamWrites::new()
            .register_stream(stream_id, StreamVersion::new(0))
            .and_then(|writes| writes.append(first_event))
            .and_then(|writes| writes.append(second_event))
            .expect("registered stream should accept events");

        let result = store.append_events(writes).await;

        assert!(
            result.is_ok(),
            "append should succeed when stream registered before events"
        );
    }

    /// Appending an event whose stream was never registered fails with
    /// `UndeclaredStream` carrying that stream id.
    #[test]
    fn stream_writes_rejects_appends_for_unregistered_streams() {
        let stream_id =
            StreamId::try_new("unregistered-stream".to_string()).expect("valid stream id");

        let event = TestEvent {
            stream_id: stream_id.clone(),
            data: "unregistered-event".to_string(),
        };

        let error = StreamWrites::new()
            .append(event)
            .expect_err("append without prior registration should fail");

        assert!(matches!(
            error,
            EventStoreError::UndeclaredStream { stream_id: ref actual } if *actual == stream_id
        ));
    }

    /// `expected_versions` reports every registered stream with the version
    /// it was registered at.
    #[test]
    fn expected_versions_returns_registered_streams_and_versions() {
        let stream_a = StreamId::try_new("stream-a").expect("valid stream id");
        let stream_b = StreamId::try_new("stream-b").expect("valid stream id");

        let writes = StreamWrites::new()
            .register_stream(stream_a.clone(), StreamVersion::new(0))
            .and_then(|w| w.register_stream(stream_b.clone(), StreamVersion::new(5)))
            .expect("registration should succeed");

        let versions = writes.expected_versions();

        assert_eq!(versions.len(), 2);
        assert_eq!(versions.get(&stream_a), Some(&StreamVersion::new(0)));
        assert_eq!(versions.get(&stream_b), Some(&StreamVersion::new(5)));
    }

    /// `*` is not accepted in a stream id.
    #[test]
    fn stream_id_rejects_asterisk_metacharacter() {
        let result = StreamId::try_new("account-*");
        assert!(
            result.is_err(),
            "StreamId should reject asterisk glob metacharacter"
        );
    }

    /// `?` is not accepted in a stream id.
    #[test]
    fn stream_id_rejects_question_mark_metacharacter() {
        let result = StreamId::try_new("account-?");
        assert!(
            result.is_err(),
            "StreamId should reject question mark glob metacharacter"
        );
    }

    /// `[` is not accepted in a stream id.
    #[test]
    fn stream_id_rejects_open_bracket_metacharacter() {
        let result = StreamId::try_new("account-[");
        assert!(
            result.is_err(),
            "StreamId should reject open bracket glob metacharacter"
        );
    }

    /// `]` is not accepted in a stream id.
    #[test]
    fn stream_id_rejects_close_bracket_metacharacter() {
        let result = StreamId::try_new("account-]");
        assert!(
            result.is_err(),
            "StreamId should reject close bracket glob metacharacter"
        );
    }

    /// Reading "after" a position returns only later events; the event at
    /// that position is excluded.
    #[tokio::test]
    async fn event_reader_after_position_excludes_event_at_position() {
        // Given: An event store with 3 events
        let store = InMemoryEventStore::new();
        let stream_id = StreamId::try_new("reader-test").expect("valid stream id");

        let event1 = TestEvent {
            stream_id: stream_id.clone(),
            data: "first".to_string(),
        };
        let event2 = TestEvent {
            stream_id: stream_id.clone(),
            data: "second".to_string(),
        };
        let event3 = TestEvent {
            stream_id: stream_id.clone(),
            data: "third".to_string(),
        };

        let writes = StreamWrites::new()
            .register_stream(stream_id, StreamVersion::new(0))
            .and_then(|w| w.append(event1))
            .and_then(|w| w.append(event2))
            .and_then(|w| w.append(event3))
            .expect("append should succeed");

        let _ = store
            .append_events(writes)
            .await
            .expect("append to succeed");

        // First, read all events to get their positions
        let all_events = store
            .read_events::<TestEvent>(EventFilter::all(), EventPage::first(BatchSize::new(100)))
            .await
            .expect("read all events to succeed");

        assert_eq!(all_events.len(), 3, "Should have 3 events total");
        let (first_event, first_position) = &all_events[0];

        // When: We read events after the first event's position
        let page = EventPage::after(*first_position, BatchSize::new(100));
        let filter = EventFilter::all();
        let events = store
            .read_events::<TestEvent>(filter, page)
            .await
            .expect("read to succeed");

        // Then: We should get 2 events (event2 and event3), not including event1
        assert_eq!(events.len(), 2, "Should get 2 events after first position");
        assert_eq!(
            events[0].0.data, "second",
            "First returned event should be 'second'"
        );
        assert_eq!(
            events[1].0.data, "third",
            "Second returned event should be 'third'"
        );

        // And: The first event should NOT be in the results
        for (event, _pos) in &events {
            assert_ne!(
                event.data, first_event.data,
                "First event should be excluded"
            );
        }

        // And: All returned positions should be greater than first_position
        for (_event, pos) in &events {
            assert!(
                *pos > *first_position,
                "Returned position {} should be > first position {}",
                pos,
                first_position
            );
        }
    }

    /// A checkpoint saved through the event store can be loaded back unchanged.
    #[tokio::test]
    async fn in_memory_event_store_implements_checkpoint_store() {
        // Given: An InMemoryEventStore
        let store = InMemoryEventStore::new();

        // When: We save a checkpoint
        let position = StreamPosition::new(Uuid::now_v7());
        CheckpointStore::save(&store, "test-projector", position)
            .await
            .expect("save should succeed");

        // Then: We can load it back
        let loaded = CheckpointStore::load(&store, "test-projector")
            .await
            .expect("load should succeed");
        assert_eq!(loaded, Some(position));
    }

    /// Leadership for an unheld name can be acquired through the event store.
    #[tokio::test]
    async fn in_memory_event_store_implements_projector_coordinator() {
        // Given: An InMemoryEventStore
        let store = InMemoryEventStore::new();

        // When: We try to acquire leadership
        let guard = ProjectorCoordinator::try_acquire(&store, "test-projector").await;

        // Then: It should succeed
        assert!(guard.is_ok(), "should acquire leadership");
    }
}