event_sourcing/adapter/in_memory/
in_memory_adapter.rs

1use crate::adapter::notification_adapter::ListenForEventData;
2use crate::adapter::{EventStoreAdapter, NotificationAdapter};
3use crate::error::AdapterError;
4use crate::{Aggregate, Event};
5use alloc::boxed::Box;
6use alloc::collections::BTreeMap;
7use alloc::string::{String, ToString};
8use alloc::sync::Arc;
9use alloc::vec;
10use alloc::vec::Vec;
11use async_trait::async_trait;
12use core::fmt;
13use futures::stream::{iter, BoxStream};
14use futures::StreamExt;
15use tokio::sync::broadcast::Sender;
16use tokio::sync::{broadcast, Mutex};
17use tokio_stream::wrappers::BroadcastStream;
18use uuid::Uuid;
19
20#[derive(Debug, Clone)]
21pub struct InMemoryAdapter<A, E> {
22    store: Arc<Mutex<BTreeMap<Uuid, Vec<Event<E>>>>>,
23    snapshots: Arc<Mutex<BTreeMap<Uuid, A>>>,
24    external_ids: Arc<Mutex<BTreeMap<String, Uuid>>>,
25    sender: Sender<ListenForEventData<A, E>>,
26}
27
28impl<A: Send + Clone, E: Clone> InMemoryAdapter<A, E> {
29    pub fn new() -> Self {
30        let (sender, _) = broadcast::channel(16);
31        Self {
32            store: Arc::new(Mutex::new(BTreeMap::new())),
33            snapshots: Arc::new(Mutex::new(BTreeMap::new())),
34            external_ids: Arc::new(Mutex::new(BTreeMap::new())),
35            sender,
36        }
37    }
38}
39
40#[async_trait]
41impl<A: Aggregate<E> + fmt::Debug + Send + Clone + Sync, E: Clone + fmt::Debug + Send + Sync> EventStoreAdapter<A, E>
42    for InMemoryAdapter<A, E>
43{
44    async fn get_events(&self, aggregate_id: Uuid, from: Option<u64>) -> Result<BoxStream<Result<Event<E>, AdapterError>>, AdapterError> {
45        let lock = self.store.lock().await;
46        Ok(iter(lock.get(&aggregate_id).cloned().unwrap_or_default().into_iter().filter(move |e| {
47            if let Some(from) = from {
48                e.event_id > from
49            } else {
50                true
51            }
52        }).map(Ok)).boxed())
53    }
54
55    async fn stream_ids(&self) -> Result<BoxStream<Uuid>, AdapterError> {
56        let lock = self.store.lock().await;
57        let keys = lock.clone().into_keys();
58        Ok(iter(keys).boxed())
59    }
60
61    async fn aggregate_id_from_external_id(
62        &self,
63        external_id: &str,
64    ) -> Result<Option<Uuid>, AdapterError> {
65        let lock = self.external_ids.lock().await;
66        Ok(lock.get(external_id).cloned())
67    }
68
69    async fn save_aggregate_id_to_external_ids(
70        &self,
71        aggregate_id: Uuid,
72        external_ids: &[String],
73    ) -> Result<(), AdapterError> {
74        let mut lock = self.external_ids.lock().await;
75
76        for id in external_ids {
77            lock.insert(id.to_string(), aggregate_id);
78        }
79
80        Ok(())
81    }
82
83    async fn save_events(&self, events: &[Event<E>]) -> Result<(), AdapterError> {
84        for event in events {
85            let mut lock = self.store.lock().await;
86
87            match lock.get_mut(&event.aggregate_id) {
88                None => {
89                    lock.insert(event.aggregate_id, vec![event.clone()]);
90                }
91                Some(list) => {
92                    list.push(event.clone());
93                }
94            }
95        }
96        Ok(())
97    }
98
99    async fn remove(&self, aggregate_id: Uuid) -> Result<(), AdapterError> {
100        let mut lock = self.store.lock().await;
101        lock.remove(&aggregate_id);
102        Ok(())
103    }
104
105    async fn get_snapshot(&self, aggregate_id: Uuid) -> Result<Option<A>, AdapterError> {
106        let lock = self.snapshots.lock().await;
107        Ok(lock.get(&aggregate_id).cloned())
108    }
109
110    async fn save_snapshot(&self, aggregate: &A) -> Result<(), AdapterError> {
111        let mut lock = self.snapshots.lock().await;
112        lock.insert(aggregate.aggregate_id(), aggregate.clone());
113        Ok(())
114    }
115}
116
117#[async_trait]
118impl<
119        A: fmt::Debug + Send + Sync + Clone + 'static,
120        E: Clone + fmt::Debug + Send + Sync + 'static,
121    > NotificationAdapter<A, E> for InMemoryAdapter<A, E>
122{
123    async fn send_event(
124        &self,
125        event: &Event<E>,
126        new_aggregate: &A,
127        old_aggregate: Option<&A>,
128    ) -> Result<(), AdapterError> {
129        let _ = self.sender.send(ListenForEventData {
130            event: event.clone(),
131            new_aggregate: new_aggregate.clone(),
132            old_aggregate: old_aggregate.cloned(),
133        });
134        Ok(())
135    }
136
137    async fn listen_for_events(
138        &self,
139    ) -> Result<BoxStream<Result<ListenForEventData<A, E>, AdapterError>>, AdapterError> {
140        let receiver = self.sender.subscribe();
141
142        let stream = BroadcastStream::new(receiver)
143            .map(|i| {
144                i.map_err(|e| AdapterError::Other {
145                    error: e.to_string(),
146                })
147            })
148            .boxed();
149
150        Ok(stream)
151    }
152}