thalo_inmemory/
event_store.rs

1use std::sync::RwLock;
2
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use serde::{de::DeserializeOwned, Deserialize, Serialize};
6use thalo::{
7    aggregate::{Aggregate, TypeId},
8    event::AggregateEventEnvelope,
9    event_store::EventStore,
10};
11
12use crate::Error;
13
14/// An in memory event store.
15///
16/// This is useful for testing, but is not recommended
17/// for production as the data does not persist to disk.
18///
19/// See [crate] documentation for more info.
20#[derive(Debug, Default)]
21pub struct InMemoryEventStore {
22    /// Raw events stored in memory.
23    pub events: RwLock<Vec<EventRecord>>,
24}
25
26/// An event record.
27#[derive(Debug, Deserialize, Serialize)]
28pub struct EventRecord {
29    created_at: DateTime<Utc>,
30    aggregate_type: String,
31    aggregate_id: String,
32    sequence: usize,
33    event_data: serde_json::Value,
34}
35
36impl EventRecord {
37    fn event_envelope<A>(&self, id: usize) -> Result<AggregateEventEnvelope<A>, Error>
38    where
39        A: Aggregate,
40        <A as Aggregate>::Event: DeserializeOwned,
41    {
42        Ok(AggregateEventEnvelope::<A> {
43            id,
44            created_at: self.created_at.into(),
45            aggregate_type: self.aggregate_type.to_string(),
46            aggregate_id: self.aggregate_id.clone(),
47            sequence: self.sequence,
48            event: serde_json::from_value(self.event_data.clone())
49                .map_err(Error::DeserializeEvent)?,
50        })
51    }
52}
53
54#[async_trait]
55impl EventStore for InMemoryEventStore {
56    type Error = Error;
57
58    async fn load_events<A>(
59        &self,
60        id: Option<&<A as Aggregate>::ID>,
61    ) -> Result<Vec<AggregateEventEnvelope<A>>, Self::Error>
62    where
63        A: Aggregate,
64        <A as Aggregate>::Event: DeserializeOwned,
65    {
66        let events_lock = self.events.read().map_err(|_| Error::RwPoison)?;
67
68        let events = events_lock
69            .iter()
70            .enumerate()
71            .filter(|(_index, event)| {
72                event.aggregate_type == <A as TypeId>::type_id()
73                    && id
74                        .map(|id| event.aggregate_id == id.to_string())
75                        .unwrap_or(true)
76            })
77            .map(|(index, event)| event.event_envelope::<A>(index))
78            .collect::<Result<_, _>>()?;
79
80        Ok(events)
81    }
82
83    async fn load_events_by_id<A>(
84        &self,
85        ids: &[usize],
86    ) -> Result<Vec<AggregateEventEnvelope<A>>, Self::Error>
87    where
88        A: Aggregate,
89        <A as Aggregate>::Event: DeserializeOwned,
90    {
91        let events_lock = self.events.read().map_err(|_| Error::RwPoison)?;
92
93        ids.iter()
94            .filter_map(|id| {
95                events_lock
96                    .get(*id)
97                    .map(|event| event.event_envelope::<A>(*id))
98            })
99            .collect::<Result<_, _>>()
100    }
101
102    async fn load_aggregate_sequence<A>(
103        &self,
104        id: &<A as Aggregate>::ID,
105    ) -> Result<Option<usize>, Self::Error>
106    where
107        A: Aggregate,
108    {
109        let events_lock = self.events.read().map_err(|_| Error::RwPoison)?;
110
111        Ok(events_lock
112            .iter()
113            .filter_map(|event| {
114                if event.aggregate_type == <A as TypeId>::type_id()
115                    && event.aggregate_id == id.to_string()
116                {
117                    Some(event.sequence)
118                } else {
119                    None
120                }
121            })
122            .max())
123    }
124
125    async fn save_events<A>(
126        &self,
127        id: &<A as Aggregate>::ID,
128        events: &[<A as Aggregate>::Event],
129    ) -> Result<Vec<usize>, Self::Error>
130    where
131        A: Aggregate,
132        <A as Aggregate>::Event: Serialize,
133    {
134        if events.is_empty() {
135            return Ok(vec![]);
136        }
137
138        let sequence = self.load_aggregate_sequence::<A>(id).await?;
139        let mut event_ids = Vec::with_capacity(events.len());
140
141        let mut events_lock = self.events.write().map_err(|_| Error::RwPoison)?;
142
143        for (index, event) in events.iter().enumerate() {
144            let created_at = Utc::now();
145            let aggregate_type = <A as TypeId>::type_id().to_string();
146            let aggregate_id = id.to_string();
147            let sequence = sequence.map(|sequence| sequence + index + 1).unwrap_or(0);
148            let event_data = serde_json::to_value(event).map_err(Error::SerializeEvent)?;
149
150            events_lock.push(EventRecord {
151                created_at,
152                aggregate_type,
153                aggregate_id,
154                sequence,
155                event_data,
156            });
157            event_ids.push(events_lock.len() - 1);
158        }
159
160        Ok(event_ids)
161    }
162}
163
#[cfg(feature = "debug")]
impl InMemoryEventStore {
    /// Print the event store as a table to stdout.
    ///
    /// The table has one row per stored record, in store order, with the
    /// record's index shown as its ID. An empty store is rendered as a
    /// single blank row.
    ///
    /// # Panics
    ///
    /// Panics if the lock is poisoned or if an event payload cannot be
    /// rendered as a JSON string.
    pub fn print(&self) {
        let records = self.events.read().unwrap();

        let headers = [
            "ID",
            "Created At",
            "Aggregate Type",
            "Aggregate ID",
            "Sequence",
            "Event Data",
        ];

        let mut table = prettytable::Table::new();
        table.set_titles(headers.into());

        if records.is_empty() {
            // Placeholder row so the table body is not empty.
            table.add_row([""; 6].into());
        }

        // No-op for an empty store, so the placeholder stays the only row.
        for (id, record) in records.iter().enumerate() {
            let cells = [
                id.to_string(),
                record.created_at.to_string(),
                record.aggregate_type.to_string(),
                record.aggregate_id.clone(),
                record.sequence.to_string(),
                serde_json::to_string(&record.event_data).unwrap(),
            ];
            table.add_row(cells.into());
        }

        table.printstd();
    }
}