1use chrono::{DateTime, Utc};
4
5use super::{error::EntityHydrationError, traits::*};
6
7pub type LastPersisted<'a, E> = std::slice::Iter<'a, PersistedEvent<E>>;
9
10pub struct GenericEvent<Id> {
16 pub entity_id: Id,
17 pub sequence: i32,
18 pub event: serde_json::Value,
19 pub context: Option<crate::ContextData>,
20 pub recorded_at: DateTime<Utc>,
21}
22
23pub struct PersistedEvent<E: EsEvent> {
29 pub entity_id: <E as EsEvent>::EntityId,
31 pub recorded_at: DateTime<Utc>,
33 pub sequence: usize,
35 pub event: E,
37 pub context: Option<crate::ContextData>,
40}
41
42impl<E: Clone + EsEvent> Clone for PersistedEvent<E> {
43 fn clone(&self) -> Self {
44 PersistedEvent {
45 entity_id: self.entity_id.clone(),
46 recorded_at: self.recorded_at,
47 sequence: self.sequence,
48 event: self.event.clone(),
49 context: self.context.clone(),
50 }
51 }
52}
53
54pub struct EventWithContext<E: EsEvent> {
55 pub event: E,
56 pub context: Option<crate::ContextData>,
57}
58
59impl<E: Clone + EsEvent> Clone for EventWithContext<E> {
60 fn clone(&self) -> Self {
61 EventWithContext {
62 event: self.event.clone(),
63 context: self.context.clone(),
64 }
65 }
66}
67
68pub struct EntityEvents<T: EsEvent> {
73 pub entity_id: <T as EsEvent>::EntityId,
75 persisted_events: Vec<PersistedEvent<T>>,
77 new_events: Vec<EventWithContext<T>>,
79}
80
81impl<T: Clone + EsEvent> Clone for EntityEvents<T> {
82 fn clone(&self) -> Self {
83 Self {
84 entity_id: self.entity_id.clone(),
85 persisted_events: self.persisted_events.clone(),
86 new_events: self.new_events.clone(),
87 }
88 }
89}
90
91impl<T> EntityEvents<T>
92where
93 T: EsEvent,
94{
95 pub fn init(id: <T as EsEvent>::EntityId, initial_events: impl IntoIterator<Item = T>) -> Self {
97 let context = if <T as EsEvent>::event_context() {
98 Some(crate::EventContext::data_for_storing())
99 } else {
100 None
101 };
102 let new_events = initial_events
103 .into_iter()
104 .map(|event| EventWithContext {
105 event,
106 context: context.clone(),
107 })
108 .collect();
109 Self {
110 entity_id: id,
111 persisted_events: Vec::new(),
112 new_events,
113 }
114 }
115
116 pub fn id(&self) -> &<T as EsEvent>::EntityId {
118 &self.entity_id
119 }
120
121 pub fn entity_first_persisted_at(&self) -> Option<DateTime<Utc>> {
123 self.persisted_events.first().map(|e| e.recorded_at)
124 }
125
126 pub fn entity_last_modified_at(&self) -> Option<DateTime<Utc>> {
128 self.persisted_events.last().map(|e| e.recorded_at)
129 }
130
131 pub fn push(&mut self, event: T) {
133 let context = if <T as EsEvent>::event_context() {
134 Some(crate::EventContext::data_for_storing())
135 } else {
136 None
137 };
138 self.new_events.push(EventWithContext { event, context });
139 }
140
141 pub fn extend(&mut self, events: impl IntoIterator<Item = T>) {
143 let context = if <T as EsEvent>::event_context() {
144 Some(crate::EventContext::data_for_storing())
145 } else {
146 None
147 };
148 self.new_events
149 .extend(events.into_iter().map(|event| EventWithContext {
150 event,
151 context: context.clone(),
152 }));
153 }
154
155 pub fn any_new(&self) -> bool {
157 !self.new_events.is_empty()
158 }
159
160 pub fn len_persisted(&self) -> usize {
162 self.persisted_events.len()
163 }
164
165 pub fn iter_persisted(&self) -> impl DoubleEndedIterator<Item = &PersistedEvent<T>> {
167 self.persisted_events.iter()
168 }
169
170 pub fn last_persisted(&self, n: usize) -> LastPersisted<'_, T> {
172 let start = self.persisted_events.len() - n;
173 self.persisted_events[start..].iter()
174 }
175
176 pub fn iter_all(&self) -> impl DoubleEndedIterator<Item = &T> {
178 self.persisted_events
179 .iter()
180 .map(|e| &e.event)
181 .chain(self.new_events.iter().map(|e| &e.event))
182 }
183
184 pub fn load_first<E: EsEntity<Event = T>>(
188 events: impl IntoIterator<Item = GenericEvent<<T as EsEvent>::EntityId>>,
189 ) -> Result<Option<E>, EntityHydrationError> {
190 let mut current_id = None;
191 let mut current = None;
192 for e in events {
193 if current_id.is_none() {
194 current_id = Some(e.entity_id.clone());
195 current = Some(Self {
196 entity_id: e.entity_id.clone(),
197 persisted_events: Vec::new(),
198 new_events: Vec::new(),
199 });
200 }
201 if current_id.as_ref() != Some(&e.entity_id) {
202 break;
203 }
204 let cur = current.as_mut().expect("Could not get current");
205 cur.persisted_events.push(PersistedEvent {
206 entity_id: e.entity_id,
207 recorded_at: e.recorded_at,
208 sequence: e.sequence as usize,
209 event: serde_json::from_value(e.event)?,
210 context: e.context,
211 });
212 }
213 if let Some(current) = current {
214 Ok(Some(E::try_from_events(current)?))
215 } else {
216 Ok(None)
217 }
218 }
219
220 pub fn load_n<E: EsEntity<Event = T>>(
225 events: impl IntoIterator<Item = GenericEvent<<T as EsEvent>::EntityId>>,
226 n: usize,
227 ) -> Result<(Vec<E>, bool), EntityHydrationError> {
228 let mut ret: Vec<E> = Vec::new();
229 let mut current_id = None;
230 let mut current = None;
231 for e in events {
232 if current_id.as_ref() != Some(&e.entity_id) {
233 if let Some(current) = current.take() {
234 ret.push(E::try_from_events(current)?);
235 if ret.len() == n {
236 return Ok((ret, true));
237 }
238 }
239
240 current_id = Some(e.entity_id.clone());
241 current = Some(Self {
242 entity_id: e.entity_id.clone(),
243 persisted_events: Vec::new(),
244 new_events: Vec::new(),
245 });
246 }
247 let cur = current.as_mut().expect("Could not get current");
248 cur.persisted_events.push(PersistedEvent {
249 entity_id: e.entity_id,
250 recorded_at: e.recorded_at,
251 sequence: e.sequence as usize,
252 event: serde_json::from_value(e.event)?,
253 context: e.context,
254 });
255 }
256 if let Some(current) = current.take() {
257 ret.push(E::try_from_events(current)?);
258 }
259 Ok((ret, false))
260 }
261
262 #[doc(hidden)]
263 pub fn mark_new_events_persisted_at(
264 &mut self,
265 recorded_at: chrono::DateTime<chrono::Utc>,
266 ) -> usize {
267 let n = self.new_events.len();
268 let offset = self.persisted_events.len() + 1;
269 self.persisted_events
270 .extend(
271 self.new_events
272 .drain(..)
273 .enumerate()
274 .map(|(i, event)| PersistedEvent {
275 entity_id: self.entity_id.clone(),
276 recorded_at,
277 sequence: i + offset,
278 event: event.event,
279 context: event.context,
280 }),
281 );
282 n
283 }
284
285 #[doc(hidden)]
286 pub fn serialize_new_events(&self) -> Vec<serde_json::Value> {
287 self.new_events
288 .iter()
289 .map(|event| serde_json::to_value(&event.event).expect("Failed to serialize event"))
290 .collect()
291 }
292
293 #[doc(hidden)]
294 pub fn serialize_new_event_contexts(&self) -> Option<Vec<crate::ContextData>> {
295 if <T as EsEvent>::event_context() {
296 let contexts = self
297 .new_events
298 .iter()
299 .map(|event| event.context.clone().expect("Missing context"))
300 .collect();
301
302 Some(contexts)
303 } else {
304 None
305 }
306 }
307}
308
309#[cfg(test)]
310mod tests {
311 use super::*;
312 use uuid::Uuid;
313
314 #[derive(Debug, serde::Serialize, serde::Deserialize)]
315 enum DummyEntityEvent {
316 Created(String),
317 }
318
319 impl EsEvent for DummyEntityEvent {
320 type EntityId = Uuid;
321 fn event_context() -> bool {
322 true
323 }
324 }
325
326 struct DummyEntity {
327 name: String,
328
329 events: EntityEvents<DummyEntityEvent>,
330 }
331
332 impl EsEntity for DummyEntity {
333 type Event = DummyEntityEvent;
334 type New = NewDummyEntity;
335
336 fn events_mut(&mut self) -> &mut EntityEvents<DummyEntityEvent> {
337 &mut self.events
338 }
339 fn events(&self) -> &EntityEvents<DummyEntityEvent> {
340 &self.events
341 }
342 }
343
344 impl TryFromEvents<DummyEntityEvent> for DummyEntity {
345 fn try_from_events(
346 events: EntityEvents<DummyEntityEvent>,
347 ) -> Result<Self, EntityHydrationError> {
348 let name = events
349 .iter_persisted()
350 .map(|e| match &e.event {
351 DummyEntityEvent::Created(name) => name.clone(),
352 })
353 .next()
354 .expect("Could not find name");
355 Ok(Self { name, events })
356 }
357 }
358
359 struct NewDummyEntity {}
360
361 impl IntoEvents<DummyEntityEvent> for NewDummyEntity {
362 fn into_events(self) -> EntityEvents<DummyEntityEvent> {
363 EntityEvents::init(
364 Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap(),
365 vec![DummyEntityEvent::Created("".to_owned())],
366 )
367 }
368 }
369
370 #[test]
371 fn load_zero_events() {
372 let generic_events = vec![];
373 let res = EntityEvents::load_first::<DummyEntity>(generic_events);
374 assert!(matches!(res, Ok(None)));
375 }
376
377 #[test]
378 fn load_first() {
379 let generic_events = vec![GenericEvent {
380 entity_id: Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(),
381 sequence: 1,
382 event: serde_json::to_value(DummyEntityEvent::Created("dummy-name".to_owned()))
383 .expect("Could not serialize"),
384 context: None,
385 recorded_at: chrono::Utc::now(),
386 }];
387 let entity: DummyEntity = EntityEvents::load_first(generic_events)
388 .expect("Could not load")
389 .expect("No entity found");
390 assert!(entity.name == "dummy-name");
391 }
392
393 #[test]
394 fn load_n() {
395 let generic_events = vec![
396 GenericEvent {
397 entity_id: Uuid::parse_str("00000000-0000-0000-0000-000000000002").unwrap(),
398 sequence: 1,
399 event: serde_json::to_value(DummyEntityEvent::Created("dummy-name".to_owned()))
400 .expect("Could not serialize"),
401 context: None,
402 recorded_at: chrono::Utc::now(),
403 },
404 GenericEvent {
405 entity_id: Uuid::parse_str("00000000-0000-0000-0000-000000000003").unwrap(),
406 sequence: 1,
407 event: serde_json::to_value(DummyEntityEvent::Created("other-name".to_owned()))
408 .expect("Could not serialize"),
409 context: None,
410 recorded_at: chrono::Utc::now(),
411 },
412 ];
413 let (entity, more): (Vec<DummyEntity>, _) =
414 EntityEvents::load_n(generic_events, 2).expect("Could not load");
415 assert!(!more);
416 assert_eq!(entity.len(), 2);
417 }
418}