cqrs_rust_lib/es/
inmemory.rs1use crate::es::storage::{EventStoreStorage, EventStream};
2use crate::{Aggregate, CqrsError, EventEnvelope, Snapshot};
3use futures::lock::{Mutex, OwnedMutexGuard};
4use futures::stream;
5use std::collections::HashMap;
6use std::sync::Arc;
7
8#[derive(Clone, Debug, Default)]
9pub struct InMemoryPersist<A>
10where
11 A: Aggregate,
12{
13 _phantom: std::marker::PhantomData<A>,
14 snapshot: Arc<Mutex<HashMap<String, Snapshot<A>>>>,
15 journal: Arc<Mutex<HashMap<String, Vec<EventEnvelope<A>>>>>,
16}
17
18impl<A> InMemoryPersist<A>
19where
20 A: Aggregate,
21{
22 #[must_use]
23 pub fn new() -> Self {
24 Self::default()
25 }
26}
27
28cqrs_async_trait! {
29impl<A> EventStoreStorage<A> for InMemoryPersist<A>
30where
31 A: Aggregate + 'static,
32{
33 type Session = (
34 OwnedMutexGuard<HashMap<String, Snapshot<A>>>,
35 OwnedMutexGuard<HashMap<String, Vec<EventEnvelope<A>>>>,
36 );
37
38 async fn start_session(&self) -> Result<Self::Session, CqrsError> {
39 let journal = self.journal.clone().lock_owned().await;
40 let snapshot = self.snapshot.clone().lock_owned().await;
41 Ok((snapshot, journal))
42 }
43
44 async fn close_session(&self, _session: Self::Session) -> Result<(), CqrsError> {
45 Ok(())
46 }
47
48 async fn fetch_snapshot(&self, aggregate_id: &str) -> Result<Option<Snapshot<A>>, CqrsError> {
49 let snapshot = self.snapshot.lock().await;
50 Ok(snapshot.get(aggregate_id).cloned())
51 }
52
53 async fn fetch_events_from_version(
54 &self,
55 aggregate_id: &str,
56 version: usize,
57 ) -> Result<EventStream<A>, CqrsError> {
58 let journal = self.journal.lock().await;
59 let items = journal.get(aggregate_id).cloned().unwrap_or_default();
60 let events: Vec<EventEnvelope<A>> =
61 items.into_iter().filter(|v| v.version > version).collect();
62 Ok(Box::pin(stream::iter(events.into_iter().map(Ok))))
63 }
64
65 async fn fetch_all_events(&self, aggregate_id: &str) -> Result<EventStream<A>, CqrsError> {
66 let journal = self.journal.lock().await;
67 let items = journal.get(aggregate_id).cloned().unwrap_or_default();
68 Ok(Box::pin(stream::iter(items.into_iter().map(Ok))))
69 }
70
71 async fn fetch_events_paged(
72 &self,
73 aggregate_id: &str,
74 page: usize,
75 page_size: usize,
76 ) -> Result<(Vec<EventEnvelope<A>>, i64), CqrsError> {
77 let journal = self.journal.lock().await;
78 let items = journal.get(aggregate_id).cloned().unwrap_or_default();
79 let total = items.len() as i64;
80 let offset = (page.max(1) - 1) * page_size;
81 let events: Vec<EventEnvelope<A>> =
82 items.into_iter().skip(offset).take(page_size).collect();
83 Ok((events, total))
84 }
85
86 async fn fetch_latest_event(
87 &self,
88 aggregate: &A,
89 session: &Self::Session,
90 ) -> Result<Option<EventEnvelope<A>>, CqrsError> {
91 let events = session
92 .1
93 .get(aggregate.aggregate_id().as_str())
94 .cloned()
95 .unwrap_or_default();
96 Ok(events.last().cloned())
97 }
98
99 async fn save_events(
100 &self,
101 events: Vec<EventEnvelope<A>>,
102 session: &mut Self::Session,
103 ) -> Result<(), CqrsError> {
104 if events.is_empty() {
105 return Ok(());
106 }
107 let aggregate_id = events.first().unwrap().aggregate_id.clone();
108 session
109 .1
110 .entry(aggregate_id)
111 .and_modify(|val| {
112 for e in events.iter() {
113 val.push(e.clone());
114 }
115 })
116 .or_insert(events);
117 Ok(())
118 }
119
120 async fn save_snapshot(
121 &self,
122 aggregate: &A,
123 version: usize,
124 session: &mut Self::Session,
125 ) -> Result<(), CqrsError> {
126 session.0.insert(
127 aggregate.aggregate_id(),
128 Snapshot {
129 aggregate_id: aggregate.aggregate_id(),
130 state: aggregate.clone(),
131 version,
132 },
133 );
134 Ok(())
135 }
136}
137}