1use std::collections::HashMap;
7use std::sync::{Arc, RwLock};
8
9use eventcore_types::{
10 CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
11 EventStreamReader, EventStreamSlice, Operation, ProjectorCoordinator, StreamId, StreamPosition,
12 StreamVersion, StreamWriteEntry, StreamWrites,
13};
14use uuid::Uuid;
15
16type StreamData = (Vec<Box<dyn std::any::Any + Send>>, StreamVersion);
17
18#[derive(Debug, Clone)]
26struct GlobalLogEntry {
27 event_id: Uuid,
29 stream_id: String,
31 event_data: serde_json::Value,
33}
34
35struct StoreData {
37 streams: HashMap<StreamId, StreamData>,
38 global_log: Vec<GlobalLogEntry>,
40 checkpoints: HashMap<String, StreamPosition>,
42 locks: Arc<RwLock<HashMap<String, ()>>>,
44}
45
46pub struct InMemoryEventStore {
67 data: std::sync::Mutex<StoreData>,
68}
69
70impl InMemoryEventStore {
71 pub fn new() -> Self {
76 Self {
77 data: std::sync::Mutex::new(StoreData {
78 streams: HashMap::new(),
79 global_log: Vec::new(),
80 checkpoints: HashMap::new(),
81 locks: Arc::new(RwLock::new(HashMap::new())),
82 }),
83 }
84 }
85}
86
87impl Default for InMemoryEventStore {
88 fn default() -> Self {
89 Self::new()
90 }
91}
92
93impl EventStore for InMemoryEventStore {
94 async fn read_stream<E: Event>(
95 &self,
96 stream_id: StreamId,
97 ) -> Result<EventStreamReader<E>, EventStoreError> {
98 let data = self
99 .data
100 .lock()
101 .map_err(|_| EventStoreError::StoreFailure {
102 operation: Operation::ReadStream,
103 })?;
104 let events = data
105 .streams
106 .get(&stream_id)
107 .map(|(boxed_events, _version)| {
108 boxed_events
109 .iter()
110 .filter_map(|boxed| boxed.downcast_ref::<E>())
111 .cloned()
112 .collect()
113 })
114 .unwrap_or_default();
115
116 Ok(EventStreamReader::new(events))
117 }
118
119 async fn append_events(
120 &self,
121 writes: StreamWrites,
122 ) -> Result<EventStreamSlice, EventStoreError> {
123 let mut data = self
124 .data
125 .lock()
126 .map_err(|_| EventStoreError::StoreFailure {
127 operation: Operation::AppendEvents,
128 })?;
129 let expected_versions = writes.expected_versions().clone();
130
131 for (stream_id, expected_version) in &expected_versions {
133 let current_version = data
134 .streams
135 .get(stream_id)
136 .map(|(_events, version)| *version)
137 .unwrap_or_else(|| StreamVersion::new(0));
138
139 if current_version != *expected_version {
140 return Err(EventStoreError::VersionConflict);
141 }
142 }
143
144 for entry in writes.into_entries() {
146 let StreamWriteEntry {
147 stream_id,
148 event,
149 event_type: _,
150 event_data,
151 } = entry;
152
153 let event_id = Uuid::now_v7();
155
156 data.global_log.push(GlobalLogEntry {
158 event_id,
159 stream_id: stream_id.as_ref().to_string(),
160 event_data,
161 });
162
163 let (events, version) = data
164 .streams
165 .entry(stream_id)
166 .or_insert_with(|| (Vec::new(), StreamVersion::new(0)));
167 events.push(event);
168 *version = version.increment();
169 }
170
171 Ok(EventStreamSlice)
172 }
173}
174
175impl EventReader for InMemoryEventStore {
176 type Error = EventStoreError;
177
178 async fn read_events<E: Event>(
179 &self,
180 filter: EventFilter,
181 page: EventPage,
182 ) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
183 let data = self
184 .data
185 .lock()
186 .map_err(|_| EventStoreError::StoreFailure {
187 operation: Operation::ReadStream,
188 })?;
189
190 let after_event_id = page.after_position().map(|p| p.into_inner());
191
192 let events: Vec<(E, StreamPosition)> = data
193 .global_log
194 .iter()
195 .filter(|entry| {
196 match after_event_id {
198 None => true,
199 Some(after_id) => entry.event_id > after_id,
200 }
201 })
202 .filter(|entry| {
203 match filter.stream_prefix() {
205 None => true,
206 Some(prefix) => entry.stream_id.starts_with(prefix.as_ref()),
207 }
208 })
209 .take(page.limit().into_inner())
210 .filter_map(|entry| {
211 serde_json::from_value::<E>(entry.event_data.clone())
212 .ok()
213 .map(|e| (e, StreamPosition::new(entry.event_id)))
214 })
215 .collect();
216
217 Ok(events)
218 }
219}
220
221impl CheckpointStore for InMemoryEventStore {
222 type Error = InMemoryCheckpointError;
223
224 async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
225 let data = self.data.lock().map_err(|e| InMemoryCheckpointError {
226 message: format!("failed to acquire lock: {}", e),
227 })?;
228 Ok(data.checkpoints.get(name).copied())
229 }
230
231 async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
232 let mut data = self.data.lock().map_err(|e| InMemoryCheckpointError {
233 message: format!("failed to acquire lock: {}", e),
234 })?;
235 let _ = data.checkpoints.insert(name.to_string(), position);
236 Ok(())
237 }
238}
239
240impl ProjectorCoordinator for InMemoryEventStore {
241 type Error = InMemoryCoordinationError;
242 type Guard = InMemoryCoordinationGuard;
243
244 async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
245 let data = self
246 .data
247 .lock()
248 .map_err(|e| InMemoryCoordinationError::LockPoisoned {
249 message: e.to_string(),
250 })?;
251
252 let mut guard =
253 data.locks
254 .write()
255 .map_err(|e| InMemoryCoordinationError::LockPoisoned {
256 message: e.to_string(),
257 })?;
258
259 if guard.contains_key(subscription_name) {
260 return Err(InMemoryCoordinationError::LeadershipNotAcquired {
261 subscription_name: subscription_name.to_string(),
262 });
263 }
264
265 let _ = guard.insert(subscription_name.to_string(), ());
266
267 Ok(InMemoryCoordinationGuard {
268 subscription_name: subscription_name.to_string(),
269 locks: Arc::clone(&data.locks),
270 })
271 }
272}
273
274#[derive(Debug, Clone, Default)]
292pub struct InMemoryCheckpointStore {
293 checkpoints: Arc<RwLock<HashMap<String, StreamPosition>>>,
294}
295
296impl InMemoryCheckpointStore {
297 pub fn new() -> Self {
299 Self::default()
300 }
301}
302
/// Error returned by the in-memory checkpoint store implementations.
///
/// It carries only a human-readable message; in this file it is produced
/// when a lock guarding the checkpoint map cannot be acquired.
#[derive(Debug, Clone)]
pub struct InMemoryCheckpointError {
    /// Text shown by the `Display` implementation.
    message: String,
}

impl std::fmt::Display for InMemoryCheckpointError {
    /// Writes the stored message as-is.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { message } = self;
        write!(f, "{}", message)
    }
}

impl std::error::Error for InMemoryCheckpointError {}
319
/// Errors reported by the in-memory projector coordinators.
#[derive(Debug, Clone)]
pub enum InMemoryCoordinationError {
    /// The subscription's name is already present in the lock table, i.e.
    /// another guard currently holds leadership for it.
    LeadershipNotAcquired { subscription_name: String },
    /// A lock protecting the coordination state was poisoned; `message` is
    /// the text of the poison error.
    LockPoisoned { message: String },
}

impl std::fmt::Display for InMemoryCoordinationError {
    /// Formats a one-line description that includes the variant's payload.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::LockPoisoned { message: detail } => write!(f, "lock poisoned: {}", detail),
            Self::LeadershipNotAcquired {
                subscription_name: name,
            } => write!(f, "leadership not acquired for subscription: {}", name),
        }
    }
}

impl std::error::Error for InMemoryCoordinationError {}
345
346#[derive(Debug)]
348pub struct InMemoryCoordinationGuard {
349 subscription_name: String,
350 locks: Arc<RwLock<HashMap<String, ()>>>,
351}
352
353impl Drop for InMemoryCoordinationGuard {
354 fn drop(&mut self) {
355 if let Ok(mut guard) = self.locks.write() {
356 let _ = guard.remove(&self.subscription_name);
357 } else {
358 tracing::error!(
359 subscription_name = %self.subscription_name,
360 "failed to release coordination lock: RwLock poisoned"
361 );
362 }
363 }
364}
365
/// Standalone in-memory projector coordinator.
///
/// The lock table lives behind an `Arc`, so clones of this coordinator
/// contend for the same subscriptions.
#[derive(Debug, Clone, Default)]
pub struct InMemoryProjectorCoordinator {
    /// Subscription names whose leadership is currently held.
    locks: Arc<RwLock<HashMap<String, ()>>>,
}

impl InMemoryProjectorCoordinator {
    /// Creates a coordinator with no held subscriptions.
    pub fn new() -> Self {
        Self {
            locks: Arc::new(RwLock::new(HashMap::new())),
        }
    }
}
385
386impl ProjectorCoordinator for InMemoryProjectorCoordinator {
387 type Error = InMemoryCoordinationError;
388 type Guard = InMemoryCoordinationGuard;
389
390 async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
391 let mut guard =
392 self.locks
393 .write()
394 .map_err(|e| InMemoryCoordinationError::LockPoisoned {
395 message: e.to_string(),
396 })?;
397
398 if guard.contains_key(subscription_name) {
399 return Err(InMemoryCoordinationError::LeadershipNotAcquired {
400 subscription_name: subscription_name.to_string(),
401 });
402 }
403
404 let _ = guard.insert(subscription_name.to_string(), ());
405
406 Ok(InMemoryCoordinationGuard {
407 subscription_name: subscription_name.to_string(),
408 locks: Arc::clone(&self.locks),
409 })
410 }
411}
412
413impl CheckpointStore for InMemoryCheckpointStore {
414 type Error = InMemoryCheckpointError;
415
416 async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
417 let guard = self
418 .checkpoints
419 .read()
420 .map_err(|e| InMemoryCheckpointError {
421 message: format!("failed to acquire read lock: {}", e),
422 })?;
423 Ok(guard.get(name).copied())
424 }
425
426 async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
427 let mut guard = self
428 .checkpoints
429 .write()
430 .map_err(|e| InMemoryCheckpointError {
431 message: format!("failed to acquire write lock: {}", e),
432 })?;
433 let _ = guard.insert(name.to_string(), position);
434 Ok(())
435 }
436}
437
#[cfg(test)]
mod tests {
    use super::*;
    use eventcore_types::{BatchSize, EventFilter, EventPage};
    use serde::{Deserialize, Serialize};

    /// Minimal event used by the tests: the stream it belongs to plus a
    /// free-form payload.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    struct TestEvent {
        stream_id: StreamId,
        data: String,
    }

    impl Event for TestEvent {
        fn stream_id(&self) -> &StreamId {
            &self.stream_id
        }
    }

    /// Builds a stream id that is expected to be valid.
    fn stream_named(name: &str) -> StreamId {
        StreamId::try_new(name.to_string()).expect("valid stream id")
    }

    /// Builds a `TestEvent` for `stream_id` carrying `data`.
    fn test_event(stream_id: &StreamId, data: &str) -> TestEvent {
        TestEvent {
            stream_id: stream_id.clone(),
            data: data.to_string(),
        }
    }

    /// Registers `stream_id` at expected version 0 and appends `events` in
    /// order, stopping at the first error.
    fn writes_for_new_stream(
        stream_id: &StreamId,
        events: Vec<TestEvent>,
    ) -> Result<StreamWrites, EventStoreError> {
        let registered =
            StreamWrites::new().register_stream(stream_id.clone(), StreamVersion::new(0))?;

        events
            .into_iter()
            .try_fold(registered, |writes, event| writes.append(event))
    }

    #[tokio::test]
    async fn test_append_and_read_single_event() {
        let store = InMemoryEventStore::new();
        let stream_id = stream_named("test-stream-123");

        let writes =
            writes_for_new_stream(&stream_id, vec![test_event(&stream_id, "test event data")])
                .expect("append should succeed");

        let _ = store
            .append_events(writes)
            .await
            .expect("append to succeed");

        let reader = store
            .read_stream::<TestEvent>(stream_id)
            .await
            .expect("read to succeed");

        let first_is_missing = reader.iter().next().is_none();

        assert_eq!(
            (reader.is_empty(), reader.len(), first_is_missing),
            (false, 1usize, false)
        );
    }

    #[tokio::test]
    async fn event_stream_reader_is_empty_reflects_stream_population() {
        let store = InMemoryEventStore::new();
        let stream_id = stream_named("is-empty-observation");

        // Read once before anything is written...
        let initial_reader = store
            .read_stream::<TestEvent>(stream_id.clone())
            .await
            .expect("initial read to succeed");

        let writes =
            writes_for_new_stream(&stream_id, vec![test_event(&stream_id, "populated event")])
                .expect("append should succeed");

        let _ = store
            .append_events(writes)
            .await
            .expect("append to succeed");

        // ...and once after a single event has been appended.
        let populated_reader = store
            .read_stream::<TestEvent>(stream_id)
            .await
            .expect("populated read to succeed");

        assert_eq!(
            (
                initial_reader.is_empty(),
                initial_reader.len(),
                populated_reader.is_empty(),
                populated_reader.len(),
            ),
            (true, 0usize, false, 1usize)
        );
    }

    #[tokio::test]
    async fn read_stream_iterates_through_events_in_order() {
        let store = InMemoryEventStore::new();
        let stream_id = stream_named("ordered-stream");

        let writes = writes_for_new_stream(
            &stream_id,
            vec![
                test_event(&stream_id, "first"),
                test_event(&stream_id, "second"),
            ],
        )
        .expect("append chain should succeed");

        let _ = store
            .append_events(writes)
            .await
            .expect("append to succeed");

        let reader = store
            .read_stream::<TestEvent>(stream_id)
            .await
            .expect("read to succeed");

        let payloads: Vec<String> = reader.iter().map(|event| event.data.clone()).collect();

        assert_eq!(
            (reader.is_empty(), payloads),
            (false, vec!["first".to_string(), "second".to_string()])
        );
    }

    #[test]
    fn stream_writes_accepts_duplicate_stream_with_same_expected_version() {
        let stream_id = stream_named("duplicate-stream-same-version");

        let writes_result = writes_for_new_stream(
            &stream_id,
            vec![
                test_event(&stream_id, "first-event"),
                test_event(&stream_id, "second-event"),
            ],
        );

        assert!(writes_result.is_ok());
    }

    #[test]
    fn stream_writes_rejects_duplicate_stream_with_conflicting_expected_versions() {
        let stream_id = stream_named("duplicate-stream-conflict");
        let second_event = test_event(&stream_id, "second-event-conflict");

        // Register at version 0, append, then re-register the same stream at
        // version 1 before appending again.
        let conflict = writes_for_new_stream(
            &stream_id,
            vec![test_event(&stream_id, "first-event-conflict")],
        )
        .and_then(|writes| writes.register_stream(stream_id.clone(), StreamVersion::new(1)))
        .and_then(|writes| writes.append(second_event));

        let message = conflict.unwrap_err().to_string();

        assert_eq!(
            message,
            "conflicting expected versions for stream duplicate-stream-conflict: first=0, second=1"
        );
    }

    #[tokio::test]
    async fn stream_writes_registers_stream_before_appending_multiple_events() {
        let store = InMemoryEventStore::new();
        let stream_id = stream_named("registered-stream");

        let writes = writes_for_new_stream(
            &stream_id,
            vec![
                test_event(&stream_id, "first-registered-event"),
                test_event(&stream_id, "second-registered-event"),
            ],
        )
        .expect("registered stream should accept events");

        let result = store.append_events(writes).await;

        assert!(
            result.is_ok(),
            "append should succeed when stream registered before events"
        );
    }

    #[test]
    fn stream_writes_rejects_appends_for_unregistered_streams() {
        let stream_id = stream_named("unregistered-stream");

        let error = StreamWrites::new()
            .append(test_event(&stream_id, "unregistered-event"))
            .expect_err("append without prior registration should fail");

        assert!(matches!(
            error,
            EventStoreError::UndeclaredStream { stream_id: ref actual } if *actual == stream_id
        ));
    }

    #[test]
    fn expected_versions_returns_registered_streams_and_versions() {
        let stream_a = stream_named("stream-a");
        let stream_b = stream_named("stream-b");

        let writes = StreamWrites::new()
            .register_stream(stream_a.clone(), StreamVersion::new(0))
            .and_then(|w| w.register_stream(stream_b.clone(), StreamVersion::new(5)))
            .expect("registration should succeed");

        let versions = writes.expected_versions();

        assert_eq!(versions.len(), 2);
        assert_eq!(versions.get(&stream_a), Some(&StreamVersion::new(0)));
        assert_eq!(versions.get(&stream_b), Some(&StreamVersion::new(5)));
    }

    #[test]
    fn stream_id_rejects_asterisk_metacharacter() {
        assert!(
            StreamId::try_new("account-*").is_err(),
            "StreamId should reject asterisk glob metacharacter"
        );
    }

    #[test]
    fn stream_id_rejects_question_mark_metacharacter() {
        assert!(
            StreamId::try_new("account-?").is_err(),
            "StreamId should reject question mark glob metacharacter"
        );
    }

    #[test]
    fn stream_id_rejects_open_bracket_metacharacter() {
        assert!(
            StreamId::try_new("account-[").is_err(),
            "StreamId should reject open bracket glob metacharacter"
        );
    }

    #[test]
    fn stream_id_rejects_close_bracket_metacharacter() {
        assert!(
            StreamId::try_new("account-]").is_err(),
            "StreamId should reject close bracket glob metacharacter"
        );
    }

    #[tokio::test]
    async fn event_reader_after_position_excludes_event_at_position() {
        let store = InMemoryEventStore::new();
        let stream_id = stream_named("reader-test");

        let writes = writes_for_new_stream(
            &stream_id,
            vec![
                test_event(&stream_id, "first"),
                test_event(&stream_id, "second"),
                test_event(&stream_id, "third"),
            ],
        )
        .expect("append should succeed");

        let _ = store
            .append_events(writes)
            .await
            .expect("append to succeed");

        // Read everything once to learn the position of the first event.
        let all_events = store
            .read_events::<TestEvent>(EventFilter::all(), EventPage::first(BatchSize::new(100)))
            .await
            .expect("read all events to succeed");

        assert_eq!(all_events.len(), 3, "Should have 3 events total");
        let (first_event, first_position) = &all_events[0];

        // Page strictly after that position.
        let events = store
            .read_events::<TestEvent>(
                EventFilter::all(),
                EventPage::after(*first_position, BatchSize::new(100)),
            )
            .await
            .expect("read to succeed");

        assert_eq!(events.len(), 2, "Should get 2 events after first position");
        assert_eq!(
            events[0].0.data, "second",
            "First returned event should be 'second'"
        );
        assert_eq!(
            events[1].0.data, "third",
            "Second returned event should be 'third'"
        );

        for (returned, _position) in &events {
            assert_ne!(
                returned.data, first_event.data,
                "First event should be excluded"
            );
        }

        for (_returned, position) in &events {
            assert!(
                *position > *first_position,
                "Returned position {} should be > first position {}",
                position,
                first_position
            );
        }
    }

    #[tokio::test]
    async fn in_memory_event_store_implements_checkpoint_store() {
        let store = InMemoryEventStore::new();
        let position = StreamPosition::new(Uuid::now_v7());

        CheckpointStore::save(&store, "test-projector", position)
            .await
            .expect("save should succeed");

        let loaded = CheckpointStore::load(&store, "test-projector")
            .await
            .expect("load should succeed");

        assert_eq!(loaded, Some(position));
    }

    #[tokio::test]
    async fn in_memory_event_store_implements_projector_coordinator() {
        let store = InMemoryEventStore::new();

        let guard = ProjectorCoordinator::try_acquire(&store, "test-projector").await;

        assert!(guard.is_ok(), "should acquire leadership");
    }
}