event_sourcing/adapter/in_memory/
in_memory_adapter.rs1use crate::adapter::notification_adapter::ListenForEventData;
2use crate::adapter::{EventStoreAdapter, NotificationAdapter};
3use crate::error::AdapterError;
4use crate::{Aggregate, Event};
5use alloc::boxed::Box;
6use alloc::collections::BTreeMap;
7use alloc::string::{String, ToString};
8use alloc::sync::Arc;
9use alloc::vec;
10use alloc::vec::Vec;
11use async_trait::async_trait;
12use core::fmt;
13use futures::stream::{iter, BoxStream};
14use futures::StreamExt;
15use tokio::sync::broadcast::Sender;
16use tokio::sync::{broadcast, Mutex};
17use tokio_stream::wrappers::BroadcastStream;
18use uuid::Uuid;
19
20#[derive(Debug, Clone)]
21pub struct InMemoryAdapter<A, E> {
22 store: Arc<Mutex<BTreeMap<Uuid, Vec<Event<E>>>>>,
23 snapshots: Arc<Mutex<BTreeMap<Uuid, A>>>,
24 external_ids: Arc<Mutex<BTreeMap<String, Uuid>>>,
25 sender: Sender<ListenForEventData<A, E>>,
26}
27
28impl<A: Send + Clone, E: Clone> InMemoryAdapter<A, E> {
29 pub fn new() -> Self {
30 let (sender, _) = broadcast::channel(16);
31 Self {
32 store: Arc::new(Mutex::new(BTreeMap::new())),
33 snapshots: Arc::new(Mutex::new(BTreeMap::new())),
34 external_ids: Arc::new(Mutex::new(BTreeMap::new())),
35 sender,
36 }
37 }
38}
39
40#[async_trait]
41impl<A: Aggregate<E> + fmt::Debug + Send + Clone + Sync, E: Clone + fmt::Debug + Send + Sync> EventStoreAdapter<A, E>
42 for InMemoryAdapter<A, E>
43{
44 async fn get_events(&self, aggregate_id: Uuid, from: Option<u64>) -> Result<BoxStream<Result<Event<E>, AdapterError>>, AdapterError> {
45 let lock = self.store.lock().await;
46 Ok(iter(lock.get(&aggregate_id).cloned().unwrap_or_default().into_iter().filter(move |e| {
47 if let Some(from) = from {
48 e.event_id > from
49 } else {
50 true
51 }
52 }).map(Ok)).boxed())
53 }
54
55 async fn stream_ids(&self) -> Result<BoxStream<Uuid>, AdapterError> {
56 let lock = self.store.lock().await;
57 let keys = lock.clone().into_keys();
58 Ok(iter(keys).boxed())
59 }
60
61 async fn aggregate_id_from_external_id(
62 &self,
63 external_id: &str,
64 ) -> Result<Option<Uuid>, AdapterError> {
65 let lock = self.external_ids.lock().await;
66 Ok(lock.get(external_id).cloned())
67 }
68
69 async fn save_aggregate_id_to_external_ids(
70 &self,
71 aggregate_id: Uuid,
72 external_ids: &[String],
73 ) -> Result<(), AdapterError> {
74 let mut lock = self.external_ids.lock().await;
75
76 for id in external_ids {
77 lock.insert(id.to_string(), aggregate_id);
78 }
79
80 Ok(())
81 }
82
83 async fn save_events(&self, events: &[Event<E>]) -> Result<(), AdapterError> {
84 for event in events {
85 let mut lock = self.store.lock().await;
86
87 match lock.get_mut(&event.aggregate_id) {
88 None => {
89 lock.insert(event.aggregate_id, vec![event.clone()]);
90 }
91 Some(list) => {
92 list.push(event.clone());
93 }
94 }
95 }
96 Ok(())
97 }
98
99 async fn remove(&self, aggregate_id: Uuid) -> Result<(), AdapterError> {
100 let mut lock = self.store.lock().await;
101 lock.remove(&aggregate_id);
102 Ok(())
103 }
104
105 async fn get_snapshot(&self, aggregate_id: Uuid) -> Result<Option<A>, AdapterError> {
106 let lock = self.snapshots.lock().await;
107 Ok(lock.get(&aggregate_id).cloned())
108 }
109
110 async fn save_snapshot(&self, aggregate: &A) -> Result<(), AdapterError> {
111 let mut lock = self.snapshots.lock().await;
112 lock.insert(aggregate.aggregate_id(), aggregate.clone());
113 Ok(())
114 }
115}
116
117#[async_trait]
118impl<
119 A: fmt::Debug + Send + Sync + Clone + 'static,
120 E: Clone + fmt::Debug + Send + Sync + 'static,
121 > NotificationAdapter<A, E> for InMemoryAdapter<A, E>
122{
123 async fn send_event(
124 &self,
125 event: &Event<E>,
126 new_aggregate: &A,
127 old_aggregate: Option<&A>,
128 ) -> Result<(), AdapterError> {
129 let _ = self.sender.send(ListenForEventData {
130 event: event.clone(),
131 new_aggregate: new_aggregate.clone(),
132 old_aggregate: old_aggregate.cloned(),
133 });
134 Ok(())
135 }
136
137 async fn listen_for_events(
138 &self,
139 ) -> Result<BoxStream<Result<ListenForEventData<A, E>, AdapterError>>, AdapterError> {
140 let receiver = self.sender.subscribe();
141
142 let stream = BroadcastStream::new(receiver)
143 .map(|i| {
144 i.map_err(|e| AdapterError::Other {
145 error: e.to_string(),
146 })
147 })
148 .boxed();
149
150 Ok(stream)
151 }
152}