thalo_inmemory/
event_store.rs1use std::sync::RwLock;
2
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use serde::{de::DeserializeOwned, Deserialize, Serialize};
6use thalo::{
7 aggregate::{Aggregate, TypeId},
8 event::AggregateEventEnvelope,
9 event_store::EventStore,
10};
11
12use crate::Error;
13
14#[derive(Debug, Default)]
21pub struct InMemoryEventStore {
22 pub events: RwLock<Vec<EventRecord>>,
24}
25
26#[derive(Debug, Deserialize, Serialize)]
28pub struct EventRecord {
29 created_at: DateTime<Utc>,
30 aggregate_type: String,
31 aggregate_id: String,
32 sequence: usize,
33 event_data: serde_json::Value,
34}
35
36impl EventRecord {
37 fn event_envelope<A>(&self, id: usize) -> Result<AggregateEventEnvelope<A>, Error>
38 where
39 A: Aggregate,
40 <A as Aggregate>::Event: DeserializeOwned,
41 {
42 Ok(AggregateEventEnvelope::<A> {
43 id,
44 created_at: self.created_at.into(),
45 aggregate_type: self.aggregate_type.to_string(),
46 aggregate_id: self.aggregate_id.clone(),
47 sequence: self.sequence,
48 event: serde_json::from_value(self.event_data.clone())
49 .map_err(Error::DeserializeEvent)?,
50 })
51 }
52}
53
54#[async_trait]
55impl EventStore for InMemoryEventStore {
56 type Error = Error;
57
58 async fn load_events<A>(
59 &self,
60 id: Option<&<A as Aggregate>::ID>,
61 ) -> Result<Vec<AggregateEventEnvelope<A>>, Self::Error>
62 where
63 A: Aggregate,
64 <A as Aggregate>::Event: DeserializeOwned,
65 {
66 let events_lock = self.events.read().map_err(|_| Error::RwPoison)?;
67
68 let events = events_lock
69 .iter()
70 .enumerate()
71 .filter(|(_index, event)| {
72 event.aggregate_type == <A as TypeId>::type_id()
73 && id
74 .map(|id| event.aggregate_id == id.to_string())
75 .unwrap_or(true)
76 })
77 .map(|(index, event)| event.event_envelope::<A>(index))
78 .collect::<Result<_, _>>()?;
79
80 Ok(events)
81 }
82
83 async fn load_events_by_id<A>(
84 &self,
85 ids: &[usize],
86 ) -> Result<Vec<AggregateEventEnvelope<A>>, Self::Error>
87 where
88 A: Aggregate,
89 <A as Aggregate>::Event: DeserializeOwned,
90 {
91 let events_lock = self.events.read().map_err(|_| Error::RwPoison)?;
92
93 ids.iter()
94 .filter_map(|id| {
95 events_lock
96 .get(*id)
97 .map(|event| event.event_envelope::<A>(*id))
98 })
99 .collect::<Result<_, _>>()
100 }
101
102 async fn load_aggregate_sequence<A>(
103 &self,
104 id: &<A as Aggregate>::ID,
105 ) -> Result<Option<usize>, Self::Error>
106 where
107 A: Aggregate,
108 {
109 let events_lock = self.events.read().map_err(|_| Error::RwPoison)?;
110
111 Ok(events_lock
112 .iter()
113 .filter_map(|event| {
114 if event.aggregate_type == <A as TypeId>::type_id()
115 && event.aggregate_id == id.to_string()
116 {
117 Some(event.sequence)
118 } else {
119 None
120 }
121 })
122 .max())
123 }
124
125 async fn save_events<A>(
126 &self,
127 id: &<A as Aggregate>::ID,
128 events: &[<A as Aggregate>::Event],
129 ) -> Result<Vec<usize>, Self::Error>
130 where
131 A: Aggregate,
132 <A as Aggregate>::Event: Serialize,
133 {
134 if events.is_empty() {
135 return Ok(vec![]);
136 }
137
138 let sequence = self.load_aggregate_sequence::<A>(id).await?;
139 let mut event_ids = Vec::with_capacity(events.len());
140
141 let mut events_lock = self.events.write().map_err(|_| Error::RwPoison)?;
142
143 for (index, event) in events.iter().enumerate() {
144 let created_at = Utc::now();
145 let aggregate_type = <A as TypeId>::type_id().to_string();
146 let aggregate_id = id.to_string();
147 let sequence = sequence.map(|sequence| sequence + index + 1).unwrap_or(0);
148 let event_data = serde_json::to_value(event).map_err(Error::SerializeEvent)?;
149
150 events_lock.push(EventRecord {
151 created_at,
152 aggregate_type,
153 aggregate_id,
154 sequence,
155 event_data,
156 });
157 event_ids.push(events_lock.len() - 1);
158 }
159
160 Ok(event_ids)
161 }
162}
163
164#[cfg(feature = "debug")]
165impl InMemoryEventStore {
166 pub fn print(&self) {
168 let events_lock = self.events.read().unwrap();
169
170 let mut table = prettytable::Table::new();
171 table.set_titles(
172 [
173 "ID",
174 "Created At",
175 "Aggregate Type",
176 "Aggregate ID",
177 "Sequence",
178 "Event Data",
179 ]
180 .into(),
181 );
182
183 if events_lock.is_empty() {
184 table.add_row(["", "", "", "", "", ""].into());
185 } else {
186 for (id, event) in events_lock.iter().enumerate() {
187 table.add_row(
188 [
189 id.to_string(),
190 event.created_at.to_string(),
191 event.aggregate_type.to_string(),
192 event.aggregate_id.clone(),
193 event.sequence.to_string(),
194 serde_json::to_string(&event.event_data).unwrap(),
195 ]
196 .into(),
197 );
198 }
199 }
200
201 table.printstd();
202 }
203}