1use std::collections::HashMap;
7use std::sync::{Arc, RwLock};
8
9use eventcore_types::{
10 CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
11 EventStreamReader, EventStreamSlice, Operation, ProjectorCoordinator, StreamId, StreamPosition,
12 StreamVersion, StreamWriteEntry, StreamWrites,
13};
14use uuid::Uuid;
15
16type StreamData = (Vec<Box<dyn std::any::Any + Send>>, StreamVersion);
36
37#[derive(Debug, Clone)]
45struct GlobalLogEntry {
46 event_id: Uuid,
48 stream_id: String,
50 event_data: serde_json::Value,
52}
53
54struct StoreData {
56 streams: HashMap<StreamId, StreamData>,
57 global_log: Vec<GlobalLogEntry>,
59 checkpoints: HashMap<String, StreamPosition>,
61 locks: Arc<RwLock<HashMap<String, ()>>>,
63}
64
65pub struct InMemoryEventStore {
66 data: std::sync::Mutex<StoreData>,
67}
68
69impl InMemoryEventStore {
70 pub fn new() -> Self {
75 Self {
76 data: std::sync::Mutex::new(StoreData {
77 streams: HashMap::new(),
78 global_log: Vec::new(),
79 checkpoints: HashMap::new(),
80 locks: Arc::new(RwLock::new(HashMap::new())),
81 }),
82 }
83 }
84}
85
86impl Default for InMemoryEventStore {
87 fn default() -> Self {
88 Self::new()
89 }
90}
91
92impl EventStore for InMemoryEventStore {
93 async fn read_stream<E: Event>(
94 &self,
95 stream_id: StreamId,
96 ) -> Result<EventStreamReader<E>, EventStoreError> {
97 let data = self
98 .data
99 .lock()
100 .map_err(|_| EventStoreError::StoreFailure {
101 operation: Operation::ReadStream,
102 })?;
103 let events = data
104 .streams
105 .get(&stream_id)
106 .map(|(boxed_events, _version)| {
107 boxed_events
108 .iter()
109 .filter_map(|boxed| boxed.downcast_ref::<E>())
110 .cloned()
111 .collect()
112 })
113 .unwrap_or_default();
114
115 Ok(EventStreamReader::new(events))
116 }
117
118 async fn append_events(
119 &self,
120 writes: StreamWrites,
121 ) -> Result<EventStreamSlice, EventStoreError> {
122 let mut data = self
123 .data
124 .lock()
125 .map_err(|_| EventStoreError::StoreFailure {
126 operation: Operation::AppendEvents,
127 })?;
128 let expected_versions = writes.expected_versions().clone();
129
130 for (stream_id, expected_version) in &expected_versions {
132 let current_version = data
133 .streams
134 .get(stream_id)
135 .map(|(_events, version)| *version)
136 .unwrap_or_else(|| StreamVersion::new(0));
137
138 if current_version != *expected_version {
139 return Err(EventStoreError::VersionConflict);
140 }
141 }
142
143 for entry in writes.into_entries() {
145 let StreamWriteEntry {
146 stream_id,
147 event,
148 event_type: _,
149 event_data,
150 } = entry;
151
152 let event_id = Uuid::now_v7();
154
155 data.global_log.push(GlobalLogEntry {
157 event_id,
158 stream_id: stream_id.as_ref().to_string(),
159 event_data,
160 });
161
162 let (events, version) = data
163 .streams
164 .entry(stream_id)
165 .or_insert_with(|| (Vec::new(), StreamVersion::new(0)));
166 events.push(event);
167 *version = version.increment();
168 }
169
170 Ok(EventStreamSlice)
171 }
172}
173
174impl EventReader for InMemoryEventStore {
175 type Error = EventStoreError;
176
177 async fn read_events<E: Event>(
178 &self,
179 filter: EventFilter,
180 page: EventPage,
181 ) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
182 let data = self
183 .data
184 .lock()
185 .map_err(|_| EventStoreError::StoreFailure {
186 operation: Operation::ReadStream,
187 })?;
188
189 let after_event_id = page.after_position().map(|p| p.into_inner());
190
191 let events: Vec<(E, StreamPosition)> = data
192 .global_log
193 .iter()
194 .filter(|entry| {
195 match after_event_id {
197 None => true,
198 Some(after_id) => entry.event_id > after_id,
199 }
200 })
201 .filter(|entry| {
202 match filter.stream_prefix() {
204 None => true,
205 Some(prefix) => entry.stream_id.starts_with(prefix.as_ref()),
206 }
207 })
208 .take(page.limit().into_inner())
209 .filter_map(|entry| {
210 serde_json::from_value::<E>(entry.event_data.clone())
211 .ok()
212 .map(|e| (e, StreamPosition::new(entry.event_id)))
213 })
214 .collect();
215
216 Ok(events)
217 }
218}
219
220impl CheckpointStore for InMemoryEventStore {
221 type Error = InMemoryCheckpointError;
222
223 async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
224 let data = self.data.lock().map_err(|e| InMemoryCheckpointError {
225 message: format!("failed to acquire lock: {}", e),
226 })?;
227 Ok(data.checkpoints.get(name).copied())
228 }
229
230 async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
231 let mut data = self.data.lock().map_err(|e| InMemoryCheckpointError {
232 message: format!("failed to acquire lock: {}", e),
233 })?;
234 let _ = data.checkpoints.insert(name.to_string(), position);
235 Ok(())
236 }
237}
238
239impl ProjectorCoordinator for InMemoryEventStore {
240 type Error = InMemoryCoordinationError;
241 type Guard = InMemoryCoordinationGuard;
242
243 async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
244 let data = self
245 .data
246 .lock()
247 .map_err(|e| InMemoryCoordinationError::LockPoisoned {
248 message: e.to_string(),
249 })?;
250
251 let mut guard =
252 data.locks
253 .write()
254 .map_err(|e| InMemoryCoordinationError::LockPoisoned {
255 message: e.to_string(),
256 })?;
257
258 if guard.contains_key(subscription_name) {
259 return Err(InMemoryCoordinationError::LeadershipNotAcquired {
260 subscription_name: subscription_name.to_string(),
261 });
262 }
263
264 let _ = guard.insert(subscription_name.to_string(), ());
265
266 Ok(InMemoryCoordinationGuard {
267 subscription_name: subscription_name.to_string(),
268 locks: Arc::clone(&data.locks),
269 })
270 }
271}
272
273#[derive(Debug, Clone, Default)]
291pub struct InMemoryCheckpointStore {
292 checkpoints: Arc<RwLock<HashMap<String, StreamPosition>>>,
293}
294
295impl InMemoryCheckpointStore {
296 pub fn new() -> Self {
298 Self::default()
299 }
300}
301
302#[derive(Debug, Clone)]
307pub struct InMemoryCheckpointError {
308 message: String,
309}
310
311impl std::fmt::Display for InMemoryCheckpointError {
312 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
313 write!(f, "{}", self.message)
314 }
315}
316
317impl std::error::Error for InMemoryCheckpointError {}
318
319#[derive(Debug, Clone)]
321pub enum InMemoryCoordinationError {
322 LeadershipNotAcquired { subscription_name: String },
324 LockPoisoned { message: String },
326}
327
328impl std::fmt::Display for InMemoryCoordinationError {
329 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
330 match self {
331 Self::LeadershipNotAcquired { subscription_name } => {
332 write!(
333 f,
334 "leadership not acquired for subscription: {}",
335 subscription_name
336 )
337 }
338 Self::LockPoisoned { message } => write!(f, "lock poisoned: {}", message),
339 }
340 }
341}
342
343impl std::error::Error for InMemoryCoordinationError {}
344
345#[derive(Debug)]
347pub struct InMemoryCoordinationGuard {
348 subscription_name: String,
349 locks: Arc<RwLock<HashMap<String, ()>>>,
350}
351
352impl Drop for InMemoryCoordinationGuard {
353 fn drop(&mut self) {
354 if let Ok(mut guard) = self.locks.write() {
355 let _ = guard.remove(&self.subscription_name);
356 }
357 }
358}
359
360#[derive(Debug, Clone, Default)]
369pub struct InMemoryProjectorCoordinator {
370 locks: Arc<RwLock<HashMap<String, ()>>>,
371}
372
373impl InMemoryProjectorCoordinator {
374 pub fn new() -> Self {
376 Self::default()
377 }
378}
379
380impl ProjectorCoordinator for InMemoryProjectorCoordinator {
381 type Error = InMemoryCoordinationError;
382 type Guard = InMemoryCoordinationGuard;
383
384 async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
385 let mut guard =
386 self.locks
387 .write()
388 .map_err(|e| InMemoryCoordinationError::LockPoisoned {
389 message: e.to_string(),
390 })?;
391
392 if guard.contains_key(subscription_name) {
393 return Err(InMemoryCoordinationError::LeadershipNotAcquired {
394 subscription_name: subscription_name.to_string(),
395 });
396 }
397
398 let _ = guard.insert(subscription_name.to_string(), ());
399
400 Ok(InMemoryCoordinationGuard {
401 subscription_name: subscription_name.to_string(),
402 locks: Arc::clone(&self.locks),
403 })
404 }
405}
406
407impl CheckpointStore for InMemoryCheckpointStore {
408 type Error = InMemoryCheckpointError;
409
410 async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
411 let guard = self
412 .checkpoints
413 .read()
414 .map_err(|e| InMemoryCheckpointError {
415 message: format!("failed to acquire read lock: {}", e),
416 })?;
417 Ok(guard.get(name).copied())
418 }
419
420 async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
421 let mut guard = self
422 .checkpoints
423 .write()
424 .map_err(|e| InMemoryCheckpointError {
425 message: format!("failed to acquire write lock: {}", e),
426 })?;
427 let _ = guard.insert(name.to_string(), position);
428 Ok(())
429 }
430}
431
432#[cfg(test)]
433mod tests {
434 use super::*;
435 use eventcore_types::{BatchSize, EventFilter, EventPage};
436 use serde::{Deserialize, Serialize};
437
438 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
440 struct TestEvent {
441 stream_id: StreamId,
442 data: String,
443 }
444
445 impl Event for TestEvent {
446 fn stream_id(&self) -> &StreamId {
447 &self.stream_id
448 }
449 }
450
451 #[tokio::test]
462 async fn test_append_and_read_single_event() {
463 let store = InMemoryEventStore::new();
465
466 let stream_id = StreamId::try_new("test-stream-123".to_string()).expect("valid stream id");
468
469 let event = TestEvent {
471 stream_id: stream_id.clone(),
472 data: "test event data".to_string(),
473 };
474
475 let writes = StreamWrites::new()
477 .register_stream(stream_id.clone(), StreamVersion::new(0))
478 .and_then(|writes| writes.append(event.clone()))
479 .expect("append should succeed");
480
481 let _ = store
483 .append_events(writes)
484 .await
485 .expect("append to succeed");
486
487 let reader = store
488 .read_stream::<TestEvent>(stream_id)
489 .await
490 .expect("read to succeed");
491
492 let observed = (
493 reader.is_empty(),
494 reader.len(),
495 reader.iter().next().is_none(),
496 );
497
498 assert_eq!(observed, (false, 1usize, false));
499 }
500
501 #[tokio::test]
502 async fn event_stream_reader_is_empty_reflects_stream_population() {
503 let store = InMemoryEventStore::new();
504 let stream_id =
505 StreamId::try_new("is-empty-observation".to_string()).expect("valid stream id");
506
507 let initial_reader = store
508 .read_stream::<TestEvent>(stream_id.clone())
509 .await
510 .expect("initial read to succeed");
511
512 let event = TestEvent {
513 stream_id: stream_id.clone(),
514 data: "populated event".to_string(),
515 };
516
517 let writes = StreamWrites::new()
518 .register_stream(stream_id.clone(), StreamVersion::new(0))
519 .and_then(|writes| writes.append(event))
520 .expect("append should succeed");
521
522 let _ = store
523 .append_events(writes)
524 .await
525 .expect("append to succeed");
526
527 let populated_reader = store
528 .read_stream::<TestEvent>(stream_id)
529 .await
530 .expect("populated read to succeed");
531
532 let observed = (
533 initial_reader.is_empty(),
534 initial_reader.len(),
535 populated_reader.is_empty(),
536 populated_reader.len(),
537 );
538
539 assert_eq!(observed, (true, 0usize, false, 1usize));
540 }
541
542 #[tokio::test]
543 async fn read_stream_iterates_through_events_in_order() {
544 let store = InMemoryEventStore::new();
545 let stream_id = StreamId::try_new("ordered-stream".to_string()).expect("valid stream id");
546
547 let first_event = TestEvent {
548 stream_id: stream_id.clone(),
549 data: "first".to_string(),
550 };
551
552 let second_event = TestEvent {
553 stream_id: stream_id.clone(),
554 data: "second".to_string(),
555 };
556
557 let writes = StreamWrites::new()
558 .register_stream(stream_id.clone(), StreamVersion::new(0))
559 .and_then(|writes| writes.append(first_event))
560 .and_then(|writes| writes.append(second_event))
561 .expect("append chain should succeed");
562
563 let _ = store
564 .append_events(writes)
565 .await
566 .expect("append to succeed");
567
568 let reader = store
569 .read_stream::<TestEvent>(stream_id)
570 .await
571 .expect("read to succeed");
572
573 let collected: Vec<String> = reader.iter().map(|event| event.data.clone()).collect();
574
575 let observed = (reader.is_empty(), collected);
576
577 assert_eq!(
578 observed,
579 (false, vec!["first".to_string(), "second".to_string()])
580 );
581 }
582
583 #[test]
584 fn stream_writes_accepts_duplicate_stream_with_same_expected_version() {
585 let stream_id = StreamId::try_new("duplicate-stream-same-version".to_string())
586 .expect("valid stream id");
587
588 let first_event = TestEvent {
589 stream_id: stream_id.clone(),
590 data: "first-event".to_string(),
591 };
592
593 let second_event = TestEvent {
594 stream_id: stream_id.clone(),
595 data: "second-event".to_string(),
596 };
597
598 let writes_result = StreamWrites::new()
599 .register_stream(stream_id.clone(), StreamVersion::new(0))
600 .and_then(|writes| writes.append(first_event))
601 .and_then(|writes| writes.append(second_event));
602
603 assert!(writes_result.is_ok());
604 }
605
606 #[test]
607 fn stream_writes_rejects_duplicate_stream_with_conflicting_expected_versions() {
608 let stream_id =
609 StreamId::try_new("duplicate-stream-conflict".to_string()).expect("valid stream id");
610
611 let first_event = TestEvent {
612 stream_id: stream_id.clone(),
613 data: "first-event-conflict".to_string(),
614 };
615
616 let second_event = TestEvent {
617 stream_id: stream_id.clone(),
618 data: "second-event-conflict".to_string(),
619 };
620
621 let conflict = StreamWrites::new()
622 .register_stream(stream_id.clone(), StreamVersion::new(0))
623 .and_then(|writes| writes.append(first_event))
624 .and_then(|writes| writes.register_stream(stream_id.clone(), StreamVersion::new(1)))
625 .and_then(|writes| writes.append(second_event));
626
627 let message = conflict.unwrap_err().to_string();
628
629 assert_eq!(
630 message,
631 "conflicting expected versions for stream duplicate-stream-conflict: first=0, second=1"
632 );
633 }
634
635 #[tokio::test]
636 async fn stream_writes_registers_stream_before_appending_multiple_events() {
637 let store = InMemoryEventStore::new();
638 let stream_id =
639 StreamId::try_new("registered-stream".to_string()).expect("valid stream id");
640
641 let first_event = TestEvent {
642 stream_id: stream_id.clone(),
643 data: "first-registered-event".to_string(),
644 };
645
646 let second_event = TestEvent {
647 stream_id: stream_id.clone(),
648 data: "second-registered-event".to_string(),
649 };
650
651 let writes = StreamWrites::new()
652 .register_stream(stream_id.clone(), StreamVersion::new(0))
653 .and_then(|writes| writes.append(first_event))
654 .and_then(|writes| writes.append(second_event))
655 .expect("registered stream should accept events");
656
657 let result = store.append_events(writes).await;
658
659 assert!(
660 result.is_ok(),
661 "append should succeed when stream registered before events"
662 );
663 }
664
665 #[test]
666 fn stream_writes_rejects_appends_for_unregistered_streams() {
667 let stream_id =
668 StreamId::try_new("unregistered-stream".to_string()).expect("valid stream id");
669
670 let event = TestEvent {
671 stream_id: stream_id.clone(),
672 data: "unregistered-event".to_string(),
673 };
674
675 let error = StreamWrites::new()
676 .append(event)
677 .expect_err("append without prior registration should fail");
678
679 assert!(matches!(
680 error,
681 EventStoreError::UndeclaredStream { stream_id: ref actual } if *actual == stream_id
682 ));
683 }
684
685 #[test]
686 fn expected_versions_returns_registered_streams_and_versions() {
687 let stream_a = StreamId::try_new("stream-a").expect("valid stream id");
688 let stream_b = StreamId::try_new("stream-b").expect("valid stream id");
689
690 let writes = StreamWrites::new()
691 .register_stream(stream_a.clone(), StreamVersion::new(0))
692 .and_then(|w| w.register_stream(stream_b.clone(), StreamVersion::new(5)))
693 .expect("registration should succeed");
694
695 let versions = writes.expected_versions();
696
697 assert_eq!(versions.len(), 2);
698 assert_eq!(versions.get(&stream_a), Some(&StreamVersion::new(0)));
699 assert_eq!(versions.get(&stream_b), Some(&StreamVersion::new(5)));
700 }
701
702 #[test]
703 fn stream_id_rejects_asterisk_metacharacter() {
704 let result = StreamId::try_new("account-*");
705 assert!(
706 result.is_err(),
707 "StreamId should reject asterisk glob metacharacter"
708 );
709 }
710
711 #[test]
712 fn stream_id_rejects_question_mark_metacharacter() {
713 let result = StreamId::try_new("account-?");
714 assert!(
715 result.is_err(),
716 "StreamId should reject question mark glob metacharacter"
717 );
718 }
719
720 #[test]
721 fn stream_id_rejects_open_bracket_metacharacter() {
722 let result = StreamId::try_new("account-[");
723 assert!(
724 result.is_err(),
725 "StreamId should reject open bracket glob metacharacter"
726 );
727 }
728
729 #[test]
730 fn stream_id_rejects_close_bracket_metacharacter() {
731 let result = StreamId::try_new("account-]");
732 assert!(
733 result.is_err(),
734 "StreamId should reject close bracket glob metacharacter"
735 );
736 }
737
738 #[tokio::test]
739 async fn event_reader_after_position_excludes_event_at_position() {
740 let store = InMemoryEventStore::new();
742 let stream_id = StreamId::try_new("reader-test").expect("valid stream id");
743
744 let event1 = TestEvent {
745 stream_id: stream_id.clone(),
746 data: "first".to_string(),
747 };
748 let event2 = TestEvent {
749 stream_id: stream_id.clone(),
750 data: "second".to_string(),
751 };
752 let event3 = TestEvent {
753 stream_id: stream_id.clone(),
754 data: "third".to_string(),
755 };
756
757 let writes = StreamWrites::new()
758 .register_stream(stream_id.clone(), StreamVersion::new(0))
759 .and_then(|w| w.append(event1))
760 .and_then(|w| w.append(event2))
761 .and_then(|w| w.append(event3))
762 .expect("append should succeed");
763
764 store
765 .append_events(writes)
766 .await
767 .expect("append to succeed");
768
769 let all_events = store
771 .read_events::<TestEvent>(EventFilter::all(), EventPage::first(BatchSize::new(100)))
772 .await
773 .expect("read all events to succeed");
774
775 assert_eq!(all_events.len(), 3, "Should have 3 events total");
776 let (first_event, first_position) = &all_events[0];
777
778 let page = EventPage::after(*first_position, BatchSize::new(100));
780 let filter = EventFilter::all();
781 let events = store
782 .read_events::<TestEvent>(filter, page)
783 .await
784 .expect("read to succeed");
785
786 assert_eq!(events.len(), 2, "Should get 2 events after first position");
788 assert_eq!(
789 events[0].0.data, "second",
790 "First returned event should be 'second'"
791 );
792 assert_eq!(
793 events[1].0.data, "third",
794 "Second returned event should be 'third'"
795 );
796
797 for (event, _pos) in &events {
799 assert_ne!(
800 event.data, first_event.data,
801 "First event should be excluded"
802 );
803 }
804
805 for (_event, pos) in &events {
807 assert!(
808 *pos > *first_position,
809 "Returned position {} should be > first position {}",
810 pos,
811 first_position
812 );
813 }
814 }
815
816 #[tokio::test]
817 async fn in_memory_event_store_implements_checkpoint_store() {
818 let store = InMemoryEventStore::new();
820
821 let position = StreamPosition::new(Uuid::now_v7());
823 CheckpointStore::save(&store, "test-projector", position)
824 .await
825 .expect("save should succeed");
826
827 let loaded = CheckpointStore::load(&store, "test-projector")
829 .await
830 .expect("load should succeed");
831 assert_eq!(loaded, Some(position));
832 }
833
834 #[tokio::test]
835 async fn in_memory_event_store_implements_projector_coordinator() {
836 let store = InMemoryEventStore::new();
838
839 let guard = ProjectorCoordinator::try_acquire(&store, "test-projector").await;
841
842 assert!(guard.is_ok(), "should acquire leadership");
844 }
845}