Skip to main content

cqrs_rust_lib/es/
inmemory.rs

1use crate::es::storage::{EventStoreStorage, EventStream};
2use crate::{Aggregate, CqrsError, EventEnvelope, Snapshot};
3use futures::lock::{Mutex, OwnedMutexGuard};
4use futures::stream;
5use std::collections::HashMap;
6use std::sync::Arc;
7
8#[derive(Clone, Debug, Default)]
9pub struct InMemoryPersist<A>
10where
11    A: Aggregate,
12{
13    _phantom: std::marker::PhantomData<A>,
14    snapshot: Arc<Mutex<HashMap<String, Snapshot<A>>>>,
15    journal: Arc<Mutex<HashMap<String, Vec<EventEnvelope<A>>>>>,
16}
17
18impl<A> InMemoryPersist<A>
19where
20    A: Aggregate,
21{
22    #[must_use]
23    pub fn new() -> Self {
24        Self::default()
25    }
26}
27
28cqrs_async_trait! {
29impl<A> EventStoreStorage<A> for InMemoryPersist<A>
30where
31    A: Aggregate + 'static,
32{
33    type Session = (
34        OwnedMutexGuard<HashMap<String, Snapshot<A>>>,
35        OwnedMutexGuard<HashMap<String, Vec<EventEnvelope<A>>>>,
36    );
37
38    async fn start_session(&self) -> Result<Self::Session, CqrsError> {
39        let journal = self.journal.clone().lock_owned().await;
40        let snapshot = self.snapshot.clone().lock_owned().await;
41        Ok((snapshot, journal))
42    }
43
44    async fn close_session(&self, _session: Self::Session) -> Result<(), CqrsError> {
45        Ok(())
46    }
47
48    async fn fetch_snapshot(&self, aggregate_id: &str) -> Result<Option<Snapshot<A>>, CqrsError> {
49        let snapshot = self.snapshot.lock().await;
50        Ok(snapshot.get(aggregate_id).cloned())
51    }
52
53    async fn fetch_events_from_version(
54        &self,
55        aggregate_id: &str,
56        version: usize,
57    ) -> Result<EventStream<A>, CqrsError> {
58        let journal = self.journal.lock().await;
59        let items = journal.get(aggregate_id).cloned().unwrap_or_default();
60        let events: Vec<EventEnvelope<A>> =
61            items.into_iter().filter(|v| v.version > version).collect();
62        Ok(Box::pin(stream::iter(events.into_iter().map(Ok))))
63    }
64
65    async fn fetch_all_events(&self, aggregate_id: &str) -> Result<EventStream<A>, CqrsError> {
66        let journal = self.journal.lock().await;
67        let items = journal.get(aggregate_id).cloned().unwrap_or_default();
68        Ok(Box::pin(stream::iter(items.into_iter().map(Ok))))
69    }
70
71    async fn fetch_events_paged(
72        &self,
73        aggregate_id: &str,
74        page: usize,
75        page_size: usize,
76    ) -> Result<(Vec<EventEnvelope<A>>, i64), CqrsError> {
77        let journal = self.journal.lock().await;
78        let items = journal.get(aggregate_id).cloned().unwrap_or_default();
79        let total = items.len() as i64;
80        let offset = (page.max(1) - 1) * page_size;
81        let events: Vec<EventEnvelope<A>> =
82            items.into_iter().skip(offset).take(page_size).collect();
83        Ok((events, total))
84    }
85
86    async fn fetch_latest_event(
87        &self,
88        aggregate: &A,
89        session: &Self::Session,
90    ) -> Result<Option<EventEnvelope<A>>, CqrsError> {
91        let events = session
92            .1
93            .get(aggregate.aggregate_id().as_str())
94            .cloned()
95            .unwrap_or_default();
96        Ok(events.last().cloned())
97    }
98
99    async fn save_events(
100        &self,
101        events: Vec<EventEnvelope<A>>,
102        session: &mut Self::Session,
103    ) -> Result<(), CqrsError> {
104        if events.is_empty() {
105            return Ok(());
106        }
107        let aggregate_id = events.first().unwrap().aggregate_id.clone();
108        session
109            .1
110            .entry(aggregate_id)
111            .and_modify(|val| {
112                for e in events.iter() {
113                    val.push(e.clone());
114                }
115            })
116            .or_insert(events);
117        Ok(())
118    }
119
120    async fn save_snapshot(
121        &self,
122        aggregate: &A,
123        version: usize,
124        session: &mut Self::Session,
125    ) -> Result<(), CqrsError> {
126        session.0.insert(
127            aggregate.aggregate_id(),
128            Snapshot {
129                aggregate_id: aggregate.aggregate_id(),
130                state: aggregate.clone(),
131                version,
132            },
133        );
134        Ok(())
135    }
136}
137}