evento_store/
store.rs

1use chrono::{DateTime, Utc};
2use evento_query::{CursorType, QueryResult};
3use serde::{de::DeserializeOwned, Deserialize, Serialize};
4use serde_json::Value;
5use uuid::Uuid;
6
7use crate::{engine::Engine, error::Result, Aggregate, Applier, StoreError};
8
9#[derive(Debug, Serialize, Deserialize)]
10pub struct SnapshotMetadata {
11    pub cursor: Option<CursorType>,
12    pub version: u16,
13    pub snapshot_version: String,
14    pub created_at: DateTime<Utc>,
15}
16
17#[derive(Clone)]
18pub struct Store {
19    pub(crate) engine: Box<dyn Engine>,
20}
21
22impl Store {
23    pub fn new<E: Engine + 'static>(engine: E) -> Self {
24        Self {
25            engine: Box::new(engine),
26        }
27    }
28
29    pub async fn load<A: Aggregate + Applier>(
30        &self,
31        aggregate_id: impl Into<String>,
32    ) -> Result<Option<(A, u16)>> {
33        self.load_with(aggregate_id, 100).await
34    }
35
36    pub async fn load_with<A: Aggregate + Applier>(
37        &self,
38        aggregate_id: impl Into<String>,
39        first: u16,
40    ) -> Result<Option<(A, u16)>> {
41        let aggregate_id = aggregate_id.into();
42        let snapshot_id = format!("snapshot-{aggregate_id}");
43
44        let (mut aggregate, mut cursor, mut version) =
45            match self.first_of::<A>(&snapshot_id).await? {
46                Some(e) => match (e.version, e.to_metadata::<SnapshotMetadata>()?) {
47                    (0, Some(metadata)) => {
48                        if A::aggregate_version() == metadata.snapshot_version {
49                            (e.to_data::<A>()?, metadata.cursor, metadata.version)
50                        } else {
51                            (A::default(), None, 0)
52                        }
53                    }
54                    _ => (A::default(), None, 0),
55                },
56                _ => (A::default(), None, 0),
57            };
58
59        loop {
60            let events = self.read_of::<A>(&aggregate_id, first, cursor).await?;
61
62            if events.edges.is_empty() {
63                return Ok(None);
64            }
65
66            for event in events.edges.iter() {
67                aggregate.apply(&event.node);
68                version = u16::try_from(event.node.version)?;
69            }
70
71            let snapshot = Event {
72                name: "_snapshot".to_owned(),
73                aggregate_id: A::to_aggregate_id(&snapshot_id),
74                data: serde_json::to_value(&aggregate)?,
75                metadata: Some(serde_json::to_value(SnapshotMetadata {
76                    cursor: events.page_info.end_cursor.clone(),
77                    version,
78                    snapshot_version: A::aggregate_version().to_owned(),
79                    created_at: Utc::now(),
80                })?),
81                ..Event::default()
82            };
83
84            self.engine.upsert(snapshot).await?;
85
86            if !events.page_info.has_next_page {
87                break;
88            }
89
90            cursor = events.page_info.end_cursor;
91        }
92
93        Ok(Some((aggregate, version)))
94    }
95
96    pub async fn write<A: Aggregate>(
97        &self,
98        aggregate_id: impl Into<String>,
99        event: WriteEvent,
100        original_version: u16,
101    ) -> Result<Event> {
102        let events = self
103            .write_all::<A>(aggregate_id, vec![event], original_version)
104            .await?;
105
106        match events.first() {
107            Some(event) => Ok(event.clone()),
108            _ => Err(crate::StoreError::EmptyWriteEvent),
109        }
110    }
111
112    pub async fn write_all<A: Aggregate>(
113        &self,
114        aggregate_id: impl Into<String>,
115        events: Vec<WriteEvent>,
116        original_version: u16,
117    ) -> Result<Vec<Event>> {
118        self.engine
119            .write(
120                A::to_aggregate_id(aggregate_id).as_str(),
121                events,
122                original_version,
123            )
124            .await
125    }
126
127    pub async fn insert(&self, events: Vec<Event>) -> Result<()> {
128        self.engine.insert(events).await
129    }
130
131    pub async fn read(
132        &self,
133        first: u16,
134        after: Option<CursorType>,
135        filters: Option<Vec<Value>>,
136    ) -> Result<QueryResult<Event>> {
137        self.engine.read(first, after, filters, None).await
138    }
139
140    pub async fn read_of<A: Aggregate>(
141        &self,
142        aggregate_id: impl Into<String>,
143        first: u16,
144        after: Option<CursorType>,
145    ) -> Result<QueryResult<Event>> {
146        let aggregate_id = A::to_aggregate_id(aggregate_id);
147
148        self.engine
149            .read(first, after, None, Some(aggregate_id.as_str()))
150            .await
151    }
152
153    pub async fn first_of<A: Aggregate>(
154        &self,
155        aggregate_id: impl Into<String>,
156    ) -> Result<Option<Event>> {
157        let events = self.read_of::<A>(aggregate_id, 1, None).await?;
158
159        Ok(events.edges.first().map(|e| e.node.clone()))
160    }
161
162    pub async fn last(&self) -> Result<Option<Event>> {
163        self.engine.last().await
164    }
165}
166
167#[derive(Debug, Clone, Default)]
168pub struct WriteEvent {
169    pub name: String,
170    pub data: Value,
171    pub metadata: Option<Value>,
172}
173
174impl WriteEvent {
175    pub fn new<N: Into<String>>(name: N) -> Self {
176        Self {
177            name: name.into(),
178            ..Self::default()
179        }
180    }
181
182    pub fn to_event(&self, aggregate_id: impl Into<String>, version: u16) -> Event {
183        Event {
184            name: self.name.to_owned(),
185            aggregate_id: aggregate_id.into(),
186            version: i32::from(version),
187            data: self.data.clone(),
188            metadata: self.metadata.clone(),
189            ..Default::default()
190        }
191    }
192
193    pub fn data<D: Serialize>(mut self, value: D) -> Result<Self> {
194        self.data = serde_json::to_value(&value)?;
195
196        Ok(self)
197    }
198
199    pub fn metadata<M: Serialize>(mut self, value: M) -> Result<Self> {
200        let metadata = serde_json::to_value(&value)?;
201
202        if !metadata.is_object() {
203            return Err(StoreError::MetadataInvalidObjectType);
204        }
205
206        self.metadata = Some(metadata);
207
208        Ok(self)
209    }
210
211    pub fn raw_metadata(mut self, value: Option<Value>) -> Self {
212        self.metadata = value;
213
214        self
215    }
216
217    pub fn to_metadata<D: DeserializeOwned>(&self) -> Result<Option<D>> {
218        if let Some(metadata) = self.metadata.clone() {
219            Ok(Some(serde_json::from_value(metadata)?))
220        } else {
221            Ok(None)
222        }
223    }
224}
225
226#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
227#[cfg_attr(feature = "pg", derive(sqlx::FromRow))]
228pub struct Event {
229    pub id: Uuid,
230    pub name: String,
231    pub aggregate_id: String,
232    pub version: i32,
233    pub data: Value,
234    pub metadata: Option<Value>,
235    pub created_at: DateTime<Utc>,
236}
237
238impl Event {
239    pub fn to_data<D: DeserializeOwned>(&self) -> Result<D> {
240        Ok(serde_json::from_value(self.data.clone())?)
241    }
242
243    pub fn data<M: Serialize>(mut self, value: M) -> Result<Self> {
244        let data = serde_json::to_value(&value)?;
245
246        self.data = data;
247
248        Ok(self)
249    }
250
251    pub fn to_metadata<D: DeserializeOwned>(&self) -> Result<Option<D>> {
252        if let Some(metadata) = self.metadata.clone() {
253            Ok(Some(serde_json::from_value(metadata)?))
254        } else {
255            Ok(None)
256        }
257    }
258
259    pub fn metadata<M: Serialize>(mut self, value: M) -> Result<Self> {
260        let metadata = serde_json::to_value(&value)?;
261
262        if !metadata.is_object() {
263            return Err(StoreError::MetadataInvalidObjectType);
264        }
265
266        self.metadata = Some(metadata);
267
268        Ok(self)
269    }
270
271    pub fn aggregate_details(&self) -> Option<(String, String)> {
272        self.aggregate_id
273            .split_once('#')
274            .map(|(aggregate_type, id)| (aggregate_type.to_owned(), id.to_owned()))
275    }
276}
277
278impl Default for Event {
279    fn default() -> Self {
280        Self {
281            id: Uuid::new_v4(),
282            name: String::default(),
283            aggregate_id: String::default(),
284            version: i32::default(),
285            data: Value::default(),
286            metadata: None,
287            created_at: Utc::now(),
288        }
289    }
290}