1use std::collections::HashMap;
7use std::sync::{Arc, RwLock};
8
9use eventcore_types::{
10 CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
11 EventStream, EventStreamSlice, Operation, ProjectorCoordinator, StreamId, StreamPosition,
12 StreamVersion, StreamWriteEntry, StreamWrites,
13};
14use uuid::Uuid;
15
16type StreamData = (Vec<Box<dyn std::any::Any + Send>>, StreamVersion);
17
18#[derive(Debug, Clone)]
26struct GlobalLogEntry {
27 event_id: Uuid,
29 stream_id: String,
31 event_type: String,
33 event_data: String,
35}
36
37struct StoreData {
39 streams: HashMap<StreamId, StreamData>,
40 global_log: Vec<GlobalLogEntry>,
42 checkpoints: HashMap<String, StreamPosition>,
44 locks: Arc<RwLock<HashMap<String, ()>>>,
46}
47
48pub struct InMemoryEventStore {
69 data: std::sync::Mutex<StoreData>,
70}
71
72impl InMemoryEventStore {
73 pub fn new() -> Self {
78 Self {
79 data: std::sync::Mutex::new(StoreData {
80 streams: HashMap::new(),
81 global_log: Vec::new(),
82 checkpoints: HashMap::new(),
83 locks: Arc::new(RwLock::new(HashMap::new())),
84 }),
85 }
86 }
87}
88
89impl Default for InMemoryEventStore {
90 fn default() -> Self {
91 Self::new()
92 }
93}
94
95impl EventStore for InMemoryEventStore {
96 async fn read_stream<E: Event>(
97 &self,
98 stream_id: StreamId,
99 ) -> Result<EventStream<E>, EventStoreError> {
100 let items: Vec<Result<E, EventStoreError>> = {
108 let data = self
109 .data
110 .lock()
111 .map_err(|_| EventStoreError::StoreFailure {
112 operation: Operation::ReadStream,
113 })?;
114 match data.streams.get(&stream_id) {
115 None => Vec::new(),
116 Some((boxed_events, _version)) => boxed_events
117 .iter()
118 .map(|boxed| match boxed.downcast_ref::<E>() {
119 Some(event) => Ok(event.clone()),
120 None => Err(EventStoreError::DeserializationFailed {
121 stream_id: stream_id.clone(),
122 detail: format!(
123 "event could not be downcast to {}",
124 std::any::type_name::<E>()
125 ),
126 }),
127 })
128 .collect(),
129 }
130 };
131
132 Ok(EventStream::new(futures::stream::iter(items)))
133 }
134
135 async fn append_events(
136 &self,
137 writes: StreamWrites,
138 ) -> Result<EventStreamSlice, EventStoreError> {
139 let mut data = self
140 .data
141 .lock()
142 .map_err(|_| EventStoreError::StoreFailure {
143 operation: Operation::AppendEvents,
144 })?;
145 let expected_versions = writes.expected_versions().clone();
146
147 for (stream_id, expected_version) in &expected_versions {
149 let current_version = data
150 .streams
151 .get(stream_id)
152 .map(|(_events, version)| *version)
153 .unwrap_or_else(|| StreamVersion::new(0));
154
155 if current_version != *expected_version {
156 return Err(EventStoreError::VersionConflict {
157 stream_id: stream_id.clone(),
158 expected: *expected_version,
159 actual: current_version,
160 });
161 }
162 }
163
164 for entry in writes.into_entries() {
166 let StreamWriteEntry {
167 stream_id,
168 event,
169 event_type,
170 event_data,
171 } = entry;
172
173 let event_id = Uuid::now_v7();
175
176 data.global_log.push(GlobalLogEntry {
179 event_id,
180 stream_id: stream_id.as_ref().to_string(),
181 event_type: event_type.to_string(),
182 event_data: event_data.get().to_owned(),
183 });
184
185 let (events, version) = data
186 .streams
187 .entry(stream_id)
188 .or_insert_with(|| (Vec::new(), StreamVersion::new(0)));
189 events.push(event);
190 *version = version.increment();
191 }
192
193 Ok(EventStreamSlice)
194 }
195}
196
197impl EventReader for InMemoryEventStore {
198 type Error = EventStoreError;
199
200 async fn read_events<E: Event>(
201 &self,
202 filter: EventFilter,
203 page: EventPage,
204 ) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
205 let data = self
206 .data
207 .lock()
208 .map_err(|_| EventStoreError::StoreFailure {
209 operation: Operation::ReadStream,
210 })?;
211
212 let after_event_id = page.after_position().map(|p| p.into_inner());
213
214 let events: Vec<(E, StreamPosition)> = data
215 .global_log
216 .iter()
217 .filter(|entry| {
218 match after_event_id {
220 None => true,
221 Some(after_id) => entry.event_id > after_id,
222 }
223 })
224 .filter(|entry| {
225 match filter.stream_prefix() {
227 None => true,
228 Some(prefix) => entry.stream_id.starts_with(prefix.as_ref()),
229 }
230 })
231 .filter(|entry| {
232 match filter.stream_pattern() {
235 None => true,
236 Some(pattern) => pattern.matches(&entry.stream_id),
237 }
238 })
239 .filter(|entry| {
240 let type_filter = filter.event_type().unwrap_or_else(|| E::event_type_name());
244 entry.event_type == type_filter
245 })
246 .take(page.limit().into_inner())
247 .filter_map(|entry| {
248 serde_json::from_str::<E>(&entry.event_data)
249 .ok()
250 .map(|e| (e, StreamPosition::new(entry.event_id)))
251 })
252 .collect();
253
254 Ok(events)
255 }
256}
257
258impl CheckpointStore for InMemoryEventStore {
259 type Error = InMemoryCheckpointError;
260
261 async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
262 let data = self
263 .data
264 .lock()
265 .map_err(|e| InMemoryCheckpointError::LockFailed(e.to_string()))?;
266 Ok(data.checkpoints.get(name).copied())
267 }
268
269 async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
270 let mut data = self
271 .data
272 .lock()
273 .map_err(|e| InMemoryCheckpointError::LockFailed(e.to_string()))?;
274 let _ = data.checkpoints.insert(name.to_string(), position);
275 Ok(())
276 }
277}
278
279impl ProjectorCoordinator for InMemoryEventStore {
280 type Error = InMemoryCoordinationError;
281 type Guard = InMemoryCoordinationGuard;
282
283 async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
284 let data = self
285 .data
286 .lock()
287 .map_err(|e| InMemoryCoordinationError::LockPoisoned {
288 message: e.to_string(),
289 })?;
290
291 let mut guard =
292 data.locks
293 .write()
294 .map_err(|e| InMemoryCoordinationError::LockPoisoned {
295 message: e.to_string(),
296 })?;
297
298 if guard.contains_key(subscription_name) {
299 return Err(InMemoryCoordinationError::LeadershipNotAcquired {
300 subscription_name: subscription_name.to_string(),
301 });
302 }
303
304 let _ = guard.insert(subscription_name.to_string(), ());
305
306 Ok(InMemoryCoordinationGuard {
307 subscription_name: subscription_name.to_string(),
308 locks: Arc::clone(&data.locks),
309 })
310 }
311}
312
313#[derive(Debug, Clone, Default)]
331pub struct InMemoryCheckpointStore {
332 checkpoints: Arc<RwLock<HashMap<String, StreamPosition>>>,
333}
334
335impl InMemoryCheckpointStore {
336 pub fn new() -> Self {
338 Self::default()
339 }
340}
341
342#[derive(Debug, Clone, thiserror::Error)]
347pub enum InMemoryCheckpointError {
348 #[error("failed to acquire lock: {0}")]
349 LockFailed(String),
350}
351
352#[derive(Debug, Clone, thiserror::Error)]
354pub enum InMemoryCoordinationError {
355 #[error(
357 "leadership not acquired for subscription '{subscription_name}': another instance holds the lock"
358 )]
359 LeadershipNotAcquired { subscription_name: String },
360 #[error("lock poisoned: {message}")]
362 LockPoisoned { message: String },
363}
364
365#[derive(Debug)]
367pub struct InMemoryCoordinationGuard {
368 subscription_name: String,
369 locks: Arc<RwLock<HashMap<String, ()>>>,
370}
371
372impl Drop for InMemoryCoordinationGuard {
373 fn drop(&mut self) {
374 if let Ok(mut guard) = self.locks.write() {
375 let _ = guard.remove(&self.subscription_name);
376 } else {
377 tracing::error!(
378 subscription_name = %self.subscription_name,
379 "failed to release coordination lock: RwLock poisoned"
380 );
381 }
382 }
383}
384
385#[derive(Debug, Clone, Default)]
394pub struct InMemoryProjectorCoordinator {
395 locks: Arc<RwLock<HashMap<String, ()>>>,
396}
397
398impl InMemoryProjectorCoordinator {
399 pub fn new() -> Self {
401 Self::default()
402 }
403}
404
405impl ProjectorCoordinator for InMemoryProjectorCoordinator {
406 type Error = InMemoryCoordinationError;
407 type Guard = InMemoryCoordinationGuard;
408
409 async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
410 let mut guard =
411 self.locks
412 .write()
413 .map_err(|e| InMemoryCoordinationError::LockPoisoned {
414 message: e.to_string(),
415 })?;
416
417 if guard.contains_key(subscription_name) {
418 return Err(InMemoryCoordinationError::LeadershipNotAcquired {
419 subscription_name: subscription_name.to_string(),
420 });
421 }
422
423 let _ = guard.insert(subscription_name.to_string(), ());
424
425 Ok(InMemoryCoordinationGuard {
426 subscription_name: subscription_name.to_string(),
427 locks: Arc::clone(&self.locks),
428 })
429 }
430}
431
432impl CheckpointStore for InMemoryCheckpointStore {
433 type Error = InMemoryCheckpointError;
434
435 async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
436 let guard = self
437 .checkpoints
438 .read()
439 .map_err(|e| InMemoryCheckpointError::LockFailed(e.to_string()))?;
440 Ok(guard.get(name).copied())
441 }
442
443 async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
444 let mut guard = self
445 .checkpoints
446 .write()
447 .map_err(|e| InMemoryCheckpointError::LockFailed(e.to_string()))?;
448 let _ = guard.insert(name.to_string(), position);
449 Ok(())
450 }
451}
452
453#[cfg(test)]
454mod tests {
455 use super::*;
456 use eventcore_types::collect_events;
457 use eventcore_types::{BatchSize, EventFilter, EventPage};
458 use serde::{Deserialize, Serialize};
459
460 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
462 struct TestEvent {
463 stream_id: StreamId,
464 data: String,
465 }
466
467 impl Event for TestEvent {
468 fn stream_id(&self) -> &StreamId {
469 &self.stream_id
470 }
471
472 fn event_type_name() -> &'static str {
473 "TestEvent"
474 }
475 }
476
477 #[tokio::test]
488 async fn test_append_and_read_single_event() {
489 let store = InMemoryEventStore::new();
491
492 let stream_id = StreamId::try_new("test-stream-123".to_string()).expect("valid stream id");
494
495 let event = TestEvent {
497 stream_id: stream_id.clone(),
498 data: "test event data".to_string(),
499 };
500
501 let writes = StreamWrites::new()
503 .register_stream(stream_id.clone(), StreamVersion::new(0))
504 .and_then(|writes| writes.append(event.clone()))
505 .expect("append should succeed");
506
507 let _ = store
509 .append_events(writes)
510 .await
511 .expect("append to succeed");
512
513 let stream = store
514 .read_stream::<TestEvent>(stream_id)
515 .await
516 .expect("read to succeed");
517 let events = collect_events(stream).await.expect("collect to succeed");
518
519 let observed = (events.is_empty(), events.len());
520
521 assert_eq!(observed, (false, 1usize));
522 }
523
524 #[tokio::test]
525 async fn event_stream_reader_is_empty_reflects_stream_population() {
526 let store = InMemoryEventStore::new();
527 let stream_id =
528 StreamId::try_new("is-empty-observation".to_string()).expect("valid stream id");
529
530 let initial_stream = store
531 .read_stream::<TestEvent>(stream_id.clone())
532 .await
533 .expect("initial read to succeed");
534 let initial_events = collect_events(initial_stream)
535 .await
536 .expect("collect to succeed");
537
538 let event = TestEvent {
539 stream_id: stream_id.clone(),
540 data: "populated event".to_string(),
541 };
542
543 let writes = StreamWrites::new()
544 .register_stream(stream_id.clone(), StreamVersion::new(0))
545 .and_then(|writes| writes.append(event))
546 .expect("append should succeed");
547
548 let _ = store
549 .append_events(writes)
550 .await
551 .expect("append to succeed");
552
553 let populated_stream = store
554 .read_stream::<TestEvent>(stream_id)
555 .await
556 .expect("populated read to succeed");
557 let populated_events = collect_events(populated_stream)
558 .await
559 .expect("collect to succeed");
560
561 let observed = (
562 initial_events.is_empty(),
563 initial_events.len(),
564 populated_events.is_empty(),
565 populated_events.len(),
566 );
567
568 assert_eq!(observed, (true, 0usize, false, 1usize));
569 }
570
571 #[tokio::test]
572 async fn read_stream_iterates_through_events_in_order() {
573 let store = InMemoryEventStore::new();
574 let stream_id = StreamId::try_new("ordered-stream".to_string()).expect("valid stream id");
575
576 let first_event = TestEvent {
577 stream_id: stream_id.clone(),
578 data: "first".to_string(),
579 };
580
581 let second_event = TestEvent {
582 stream_id: stream_id.clone(),
583 data: "second".to_string(),
584 };
585
586 let writes = StreamWrites::new()
587 .register_stream(stream_id.clone(), StreamVersion::new(0))
588 .and_then(|writes| writes.append(first_event))
589 .and_then(|writes| writes.append(second_event))
590 .expect("append chain should succeed");
591
592 let _ = store
593 .append_events(writes)
594 .await
595 .expect("append to succeed");
596
597 let stream = store
598 .read_stream::<TestEvent>(stream_id)
599 .await
600 .expect("read to succeed");
601 let events = collect_events(stream).await.expect("collect to succeed");
602
603 let collected: Vec<String> = events.iter().map(|event| event.data.clone()).collect();
604
605 let observed = (events.is_empty(), collected);
606
607 assert_eq!(
608 observed,
609 (false, vec!["first".to_string(), "second".to_string()])
610 );
611 }
612
613 #[test]
614 fn stream_writes_accepts_duplicate_stream_with_same_expected_version() {
615 let stream_id = StreamId::try_new("duplicate-stream-same-version".to_string())
616 .expect("valid stream id");
617
618 let first_event = TestEvent {
619 stream_id: stream_id.clone(),
620 data: "first-event".to_string(),
621 };
622
623 let second_event = TestEvent {
624 stream_id: stream_id.clone(),
625 data: "second-event".to_string(),
626 };
627
628 let writes_result = StreamWrites::new()
629 .register_stream(stream_id.clone(), StreamVersion::new(0))
630 .and_then(|writes| writes.append(first_event))
631 .and_then(|writes| writes.append(second_event));
632
633 assert!(writes_result.is_ok());
634 }
635
636 #[test]
637 fn stream_writes_rejects_duplicate_stream_with_conflicting_expected_versions() {
638 let stream_id =
639 StreamId::try_new("duplicate-stream-conflict".to_string()).expect("valid stream id");
640
641 let first_event = TestEvent {
642 stream_id: stream_id.clone(),
643 data: "first-event-conflict".to_string(),
644 };
645
646 let second_event = TestEvent {
647 stream_id: stream_id.clone(),
648 data: "second-event-conflict".to_string(),
649 };
650
651 let conflict = StreamWrites::new()
652 .register_stream(stream_id.clone(), StreamVersion::new(0))
653 .and_then(|writes| writes.append(first_event))
654 .and_then(|writes| writes.register_stream(stream_id.clone(), StreamVersion::new(1)))
655 .and_then(|writes| writes.append(second_event));
656
657 let message = conflict.unwrap_err().to_string();
658
659 assert_eq!(
660 message,
661 "conflicting expected versions for stream duplicate-stream-conflict: first=0, second=1"
662 );
663 }
664
665 #[tokio::test]
666 async fn stream_writes_registers_stream_before_appending_multiple_events() {
667 let store = InMemoryEventStore::new();
668 let stream_id =
669 StreamId::try_new("registered-stream".to_string()).expect("valid stream id");
670
671 let first_event = TestEvent {
672 stream_id: stream_id.clone(),
673 data: "first-registered-event".to_string(),
674 };
675
676 let second_event = TestEvent {
677 stream_id: stream_id.clone(),
678 data: "second-registered-event".to_string(),
679 };
680
681 let writes = StreamWrites::new()
682 .register_stream(stream_id.clone(), StreamVersion::new(0))
683 .and_then(|writes| writes.append(first_event))
684 .and_then(|writes| writes.append(second_event))
685 .expect("registered stream should accept events");
686
687 let result = store.append_events(writes).await;
688
689 assert!(
690 result.is_ok(),
691 "append should succeed when stream registered before events"
692 );
693 }
694
695 #[test]
696 fn stream_writes_rejects_appends_for_unregistered_streams() {
697 let stream_id =
698 StreamId::try_new("unregistered-stream".to_string()).expect("valid stream id");
699
700 let event = TestEvent {
701 stream_id: stream_id.clone(),
702 data: "unregistered-event".to_string(),
703 };
704
705 let error = StreamWrites::new()
706 .append(event)
707 .expect_err("append without prior registration should fail");
708
709 assert!(matches!(
710 error,
711 EventStoreError::UndeclaredStream { stream_id: ref actual } if *actual == stream_id
712 ));
713 }
714
715 #[test]
716 fn expected_versions_returns_registered_streams_and_versions() {
717 let stream_a = StreamId::try_new("stream-a").expect("valid stream id");
718 let stream_b = StreamId::try_new("stream-b").expect("valid stream id");
719
720 let writes = StreamWrites::new()
721 .register_stream(stream_a.clone(), StreamVersion::new(0))
722 .and_then(|w| w.register_stream(stream_b.clone(), StreamVersion::new(5)))
723 .expect("registration should succeed");
724
725 let versions = writes.expected_versions();
726
727 assert_eq!(versions.len(), 2);
728 assert_eq!(versions.get(&stream_a), Some(&StreamVersion::new(0)));
729 assert_eq!(versions.get(&stream_b), Some(&StreamVersion::new(5)));
730 }
731
732 #[test]
733 fn stream_id_rejects_asterisk_metacharacter() {
734 let result = StreamId::try_new("account-*");
735 assert!(
736 result.is_err(),
737 "StreamId should reject asterisk glob metacharacter"
738 );
739 }
740
741 #[test]
742 fn stream_id_rejects_question_mark_metacharacter() {
743 let result = StreamId::try_new("account-?");
744 assert!(
745 result.is_err(),
746 "StreamId should reject question mark glob metacharacter"
747 );
748 }
749
750 #[test]
751 fn stream_id_rejects_open_bracket_metacharacter() {
752 let result = StreamId::try_new("account-[");
753 assert!(
754 result.is_err(),
755 "StreamId should reject open bracket glob metacharacter"
756 );
757 }
758
759 #[test]
760 fn stream_id_rejects_close_bracket_metacharacter() {
761 let result = StreamId::try_new("account-]");
762 assert!(
763 result.is_err(),
764 "StreamId should reject close bracket glob metacharacter"
765 );
766 }
767
768 #[tokio::test]
769 async fn event_reader_after_position_excludes_event_at_position() {
770 let store = InMemoryEventStore::new();
772 let stream_id = StreamId::try_new("reader-test").expect("valid stream id");
773
774 let event1 = TestEvent {
775 stream_id: stream_id.clone(),
776 data: "first".to_string(),
777 };
778 let event2 = TestEvent {
779 stream_id: stream_id.clone(),
780 data: "second".to_string(),
781 };
782 let event3 = TestEvent {
783 stream_id: stream_id.clone(),
784 data: "third".to_string(),
785 };
786
787 let writes = StreamWrites::new()
788 .register_stream(stream_id.clone(), StreamVersion::new(0))
789 .and_then(|w| w.append(event1))
790 .and_then(|w| w.append(event2))
791 .and_then(|w| w.append(event3))
792 .expect("append should succeed");
793
794 let _ = store
795 .append_events(writes)
796 .await
797 .expect("append to succeed");
798
799 let all_events = store
801 .read_events::<TestEvent>(EventFilter::all(), EventPage::first(BatchSize::new(100)))
802 .await
803 .expect("read all events to succeed");
804
805 assert_eq!(all_events.len(), 3, "Should have 3 events total");
806 let (first_event, first_position) = &all_events[0];
807
808 let page = EventPage::after(*first_position, BatchSize::new(100));
810 let filter = EventFilter::all();
811 let events = store
812 .read_events::<TestEvent>(filter, page)
813 .await
814 .expect("read to succeed");
815
816 assert_eq!(events.len(), 2, "Should get 2 events after first position");
818 assert_eq!(
819 events[0].0.data, "second",
820 "First returned event should be 'second'"
821 );
822 assert_eq!(
823 events[1].0.data, "third",
824 "Second returned event should be 'third'"
825 );
826
827 for (event, _pos) in &events {
829 assert_ne!(
830 event.data, first_event.data,
831 "First event should be excluded"
832 );
833 }
834
835 for (_event, pos) in &events {
837 assert!(
838 *pos > *first_position,
839 "Returned position {} should be > first position {}",
840 pos,
841 first_position
842 );
843 }
844 }
845
846 #[tokio::test]
847 async fn in_memory_event_store_implements_checkpoint_store() {
848 let store = InMemoryEventStore::new();
850
851 let position = StreamPosition::new(Uuid::now_v7());
853 CheckpointStore::save(&store, "test-projector", position)
854 .await
855 .expect("save should succeed");
856
857 let loaded = CheckpointStore::load(&store, "test-projector")
859 .await
860 .expect("load should succeed");
861 assert_eq!(loaded, Some(position));
862 }
863
864 #[tokio::test]
865 async fn in_memory_event_store_implements_projector_coordinator() {
866 let store = InMemoryEventStore::new();
868
869 let guard = ProjectorCoordinator::try_acquire(&store, "test-projector").await;
871
872 assert!(guard.is_ok(), "should acquire leadership");
874 }
875}