1use chrono::{DateTime, Utc};
2use evento_query::{CursorType, QueryResult};
3use serde::{de::DeserializeOwned, Deserialize, Serialize};
4use serde_json::Value;
5use uuid::Uuid;
6
7use crate::{engine::Engine, error::Result, Aggregate, Applier, StoreError};
8
9#[derive(Debug, Serialize, Deserialize)]
10pub struct SnapshotMetadata {
11 pub cursor: Option<CursorType>,
12 pub version: u16,
13 pub snapshot_version: String,
14 pub created_at: DateTime<Utc>,
15}
16
17#[derive(Clone)]
18pub struct Store {
19 pub(crate) engine: Box<dyn Engine>,
20}
21
22impl Store {
23 pub fn new<E: Engine + 'static>(engine: E) -> Self {
24 Self {
25 engine: Box::new(engine),
26 }
27 }
28
29 pub async fn load<A: Aggregate + Applier>(
30 &self,
31 aggregate_id: impl Into<String>,
32 ) -> Result<Option<(A, u16)>> {
33 self.load_with(aggregate_id, 100).await
34 }
35
36 pub async fn load_with<A: Aggregate + Applier>(
37 &self,
38 aggregate_id: impl Into<String>,
39 first: u16,
40 ) -> Result<Option<(A, u16)>> {
41 let aggregate_id = aggregate_id.into();
42 let snapshot_id = format!("snapshot-{aggregate_id}");
43
44 let (mut aggregate, mut cursor, mut version) =
45 match self.first_of::<A>(&snapshot_id).await? {
46 Some(e) => match (e.version, e.to_metadata::<SnapshotMetadata>()?) {
47 (0, Some(metadata)) => {
48 if A::aggregate_version() == metadata.snapshot_version {
49 (e.to_data::<A>()?, metadata.cursor, metadata.version)
50 } else {
51 (A::default(), None, 0)
52 }
53 }
54 _ => (A::default(), None, 0),
55 },
56 _ => (A::default(), None, 0),
57 };
58
59 loop {
60 let events = self.read_of::<A>(&aggregate_id, first, cursor).await?;
61
62 if events.edges.is_empty() {
63 return Ok(None);
64 }
65
66 for event in events.edges.iter() {
67 aggregate.apply(&event.node);
68 version = u16::try_from(event.node.version)?;
69 }
70
71 let snapshot = Event {
72 name: "_snapshot".to_owned(),
73 aggregate_id: A::to_aggregate_id(&snapshot_id),
74 data: serde_json::to_value(&aggregate)?,
75 metadata: Some(serde_json::to_value(SnapshotMetadata {
76 cursor: events.page_info.end_cursor.clone(),
77 version,
78 snapshot_version: A::aggregate_version().to_owned(),
79 created_at: Utc::now(),
80 })?),
81 ..Event::default()
82 };
83
84 self.engine.upsert(snapshot).await?;
85
86 if !events.page_info.has_next_page {
87 break;
88 }
89
90 cursor = events.page_info.end_cursor;
91 }
92
93 Ok(Some((aggregate, version)))
94 }
95
96 pub async fn write<A: Aggregate>(
97 &self,
98 aggregate_id: impl Into<String>,
99 event: WriteEvent,
100 original_version: u16,
101 ) -> Result<Event> {
102 let events = self
103 .write_all::<A>(aggregate_id, vec![event], original_version)
104 .await?;
105
106 match events.first() {
107 Some(event) => Ok(event.clone()),
108 _ => Err(crate::StoreError::EmptyWriteEvent),
109 }
110 }
111
112 pub async fn write_all<A: Aggregate>(
113 &self,
114 aggregate_id: impl Into<String>,
115 events: Vec<WriteEvent>,
116 original_version: u16,
117 ) -> Result<Vec<Event>> {
118 self.engine
119 .write(
120 A::to_aggregate_id(aggregate_id).as_str(),
121 events,
122 original_version,
123 )
124 .await
125 }
126
127 pub async fn insert(&self, events: Vec<Event>) -> Result<()> {
128 self.engine.insert(events).await
129 }
130
131 pub async fn read(
132 &self,
133 first: u16,
134 after: Option<CursorType>,
135 filters: Option<Vec<Value>>,
136 ) -> Result<QueryResult<Event>> {
137 self.engine.read(first, after, filters, None).await
138 }
139
140 pub async fn read_of<A: Aggregate>(
141 &self,
142 aggregate_id: impl Into<String>,
143 first: u16,
144 after: Option<CursorType>,
145 ) -> Result<QueryResult<Event>> {
146 let aggregate_id = A::to_aggregate_id(aggregate_id);
147
148 self.engine
149 .read(first, after, None, Some(aggregate_id.as_str()))
150 .await
151 }
152
153 pub async fn first_of<A: Aggregate>(
154 &self,
155 aggregate_id: impl Into<String>,
156 ) -> Result<Option<Event>> {
157 let events = self.read_of::<A>(aggregate_id, 1, None).await?;
158
159 Ok(events.edges.first().map(|e| e.node.clone()))
160 }
161
162 pub async fn last(&self) -> Result<Option<Event>> {
163 self.engine.last().await
164 }
165}
166
167#[derive(Debug, Clone, Default)]
168pub struct WriteEvent {
169 pub name: String,
170 pub data: Value,
171 pub metadata: Option<Value>,
172}
173
174impl WriteEvent {
175 pub fn new<N: Into<String>>(name: N) -> Self {
176 Self {
177 name: name.into(),
178 ..Self::default()
179 }
180 }
181
182 pub fn to_event(&self, aggregate_id: impl Into<String>, version: u16) -> Event {
183 Event {
184 name: self.name.to_owned(),
185 aggregate_id: aggregate_id.into(),
186 version: i32::from(version),
187 data: self.data.clone(),
188 metadata: self.metadata.clone(),
189 ..Default::default()
190 }
191 }
192
193 pub fn data<D: Serialize>(mut self, value: D) -> Result<Self> {
194 self.data = serde_json::to_value(&value)?;
195
196 Ok(self)
197 }
198
199 pub fn metadata<M: Serialize>(mut self, value: M) -> Result<Self> {
200 let metadata = serde_json::to_value(&value)?;
201
202 if !metadata.is_object() {
203 return Err(StoreError::MetadataInvalidObjectType);
204 }
205
206 self.metadata = Some(metadata);
207
208 Ok(self)
209 }
210
211 pub fn raw_metadata(mut self, value: Option<Value>) -> Self {
212 self.metadata = value;
213
214 self
215 }
216
217 pub fn to_metadata<D: DeserializeOwned>(&self) -> Result<Option<D>> {
218 if let Some(metadata) = self.metadata.clone() {
219 Ok(Some(serde_json::from_value(metadata)?))
220 } else {
221 Ok(None)
222 }
223 }
224}
225
226#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
227#[cfg_attr(feature = "pg", derive(sqlx::FromRow))]
228pub struct Event {
229 pub id: Uuid,
230 pub name: String,
231 pub aggregate_id: String,
232 pub version: i32,
233 pub data: Value,
234 pub metadata: Option<Value>,
235 pub created_at: DateTime<Utc>,
236}
237
238impl Event {
239 pub fn to_data<D: DeserializeOwned>(&self) -> Result<D> {
240 Ok(serde_json::from_value(self.data.clone())?)
241 }
242
243 pub fn data<M: Serialize>(mut self, value: M) -> Result<Self> {
244 let data = serde_json::to_value(&value)?;
245
246 self.data = data;
247
248 Ok(self)
249 }
250
251 pub fn to_metadata<D: DeserializeOwned>(&self) -> Result<Option<D>> {
252 if let Some(metadata) = self.metadata.clone() {
253 Ok(Some(serde_json::from_value(metadata)?))
254 } else {
255 Ok(None)
256 }
257 }
258
259 pub fn metadata<M: Serialize>(mut self, value: M) -> Result<Self> {
260 let metadata = serde_json::to_value(&value)?;
261
262 if !metadata.is_object() {
263 return Err(StoreError::MetadataInvalidObjectType);
264 }
265
266 self.metadata = Some(metadata);
267
268 Ok(self)
269 }
270
271 pub fn aggregate_details(&self) -> Option<(String, String)> {
272 self.aggregate_id
273 .split_once('#')
274 .map(|(aggregate_type, id)| (aggregate_type.to_owned(), id.to_owned()))
275 }
276}
277
278impl Default for Event {
279 fn default() -> Self {
280 Self {
281 id: Uuid::new_v4(),
282 name: String::default(),
283 aggregate_id: String::default(),
284 version: i32::default(),
285 data: Value::default(),
286 metadata: None,
287 created_at: Utc::now(),
288 }
289 }
290}