Skip to main content

es_entity/
events.rs

1//! Manage events and related operations for event-sourcing.
2
3use chrono::{DateTime, Utc};
4
5use super::{error::EntityHydrationError, traits::*};
6
/// Iterator over a borrowed slice of [`PersistedEvent`]s, as returned by
/// [`EntityEvents::last_persisted`].
pub type LastPersisted<'a, E> = std::slice::Iter<'a, PersistedEvent<E>>;
9
/// Represents an event in its raw, deserialized form as loaded from the database.
///
/// Events in the database are stored as JSON blobs and are initially loaded as `GenericEvent<Id>`,
/// where `Id` is the identifier type of the entity the event belongs to. Acts as a bridge between
/// the database model and the domain model: [`EntityEvents::load_first`] and
/// [`EntityEvents::load_n`] turn each `GenericEvent` into a [`PersistedEvent`].
pub struct GenericEvent<Id> {
    /// Identifier of the entity this event belongs to
    pub entity_id: Id,
    /// Position of the event within the entity's event stream
    pub sequence: i32,
    /// The event payload, still as untyped JSON
    pub event: serde_json::Value,
    /// Context stored alongside the event, if any
    pub context: Option<crate::ContextData>,
    /// Timestamp that is carried over into [`PersistedEvent::recorded_at`]
    pub recorded_at: DateTime<Utc>,
}
22
/// Strongly-typed event wrapper with metadata for successfully stored events.
///
/// Contains the event data along with persistence metadata (sequence, timestamp, entity_id).
/// All `new_events` from [`EntityEvents`] are converted to this structure once persisted, and
/// events loaded from the database are converted to it before an entity is constructed from them.
pub struct PersistedEvent<E: EsEvent> {
    /// The identifier of the entity which the event is used to construct
    pub entity_id: <E as EsEvent>::EntityId,
    /// The timestamp which marks event persistence
    pub recorded_at: DateTime<Utc>,
    /// The sequence number of the event in the event stream
    pub sequence: usize,
    /// The event itself
    pub event: E,
    /// The context attached to the event.
    /// For events added through [`EntityEvents`] it is only populated when
    /// `EsEvent::event_context()` returns `true`; for loaded events it is taken from the
    /// [`GenericEvent`] unchanged.
    pub context: Option<crate::ContextData>,
}
41
42impl<E: Clone + EsEvent> Clone for PersistedEvent<E> {
43    fn clone(&self) -> Self {
44        PersistedEvent {
45            entity_id: self.entity_id.clone(),
46            recorded_at: self.recorded_at,
47            sequence: self.sequence,
48            event: self.event.clone(),
49            context: self.context.clone(),
50        }
51    }
52}
53
/// An event that has not been persisted yet, paired with the context captured when it was added.
///
/// This is the element type of the `new_events` list inside [`EntityEvents`].
pub struct EventWithContext<E: EsEvent> {
    /// The event itself
    pub event: E,
    /// The context captured when the event was added; `None` when
    /// `EsEvent::event_context()` returned `false` at that time
    pub context: Option<crate::ContextData>,
}
58
59impl<E: Clone + EsEvent> Clone for EventWithContext<E> {
60    fn clone(&self) -> Self {
61        EventWithContext {
62            event: self.event.clone(),
63            context: self.context.clone(),
64        }
65    }
66}
67
/// A [`Vec`] wrapper that manages the event stream of an entity, with helpers for event-sourcing operations
///
/// Provides event sourcing operations for loading, appending, and persisting events in chronological
/// sequence. Required field for all event-sourced entities to maintain their state change history.
///
/// Internally the stream is split in two lists: events already persisted (with their metadata)
/// and new events that still have to be written.
pub struct EntityEvents<T: EsEvent> {
    /// The entity's id
    pub entity_id: <T as EsEvent>::EntityId,
    /// Events that have already been persisted in the database, in stream order
    persisted_events: Vec<PersistedEvent<T>>,
    /// New events that are yet to be persisted, in the order they were added
    new_events: Vec<EventWithContext<T>>,
}
80
81impl<T: Clone + EsEvent> Clone for EntityEvents<T> {
82    fn clone(&self) -> Self {
83        Self {
84            entity_id: self.entity_id.clone(),
85            persisted_events: self.persisted_events.clone(),
86            new_events: self.new_events.clone(),
87        }
88    }
89}
90
91impl<T> EntityEvents<T>
92where
93    T: EsEvent,
94{
95    /// Initializes a new `EntityEvents` instance with the given entity ID and initial events which is returned by [`IntoEvents`] method
96    pub fn init(id: <T as EsEvent>::EntityId, initial_events: impl IntoIterator<Item = T>) -> Self {
97        let context = if <T as EsEvent>::event_context() {
98            Some(crate::EventContext::data_for_storing())
99        } else {
100            None
101        };
102        let new_events = initial_events
103            .into_iter()
104            .map(|event| EventWithContext {
105                event,
106                context: context.clone(),
107            })
108            .collect();
109        Self {
110            entity_id: id,
111            persisted_events: Vec::new(),
112            new_events,
113        }
114    }
115
    /// Returns a reference to the identifier of the entity this event stream belongs to
    pub fn id(&self) -> &<T as EsEvent>::EntityId {
        &self.entity_id
    }
120
121    /// Returns the timestamp of the first persisted event, indicating when the entity was created
122    pub fn entity_first_persisted_at(&self) -> Option<DateTime<Utc>> {
123        self.persisted_events.first().map(|e| e.recorded_at)
124    }
125
126    /// Returns the timestamp of the last persisted event, indicating when the entity was last modified
127    pub fn entity_last_modified_at(&self) -> Option<DateTime<Utc>> {
128        self.persisted_events.last().map(|e| e.recorded_at)
129    }
130
131    /// Appends a single new event to the entity's event stream to be persisted later
132    pub fn push(&mut self, event: T) {
133        let context = if <T as EsEvent>::event_context() {
134            Some(crate::EventContext::data_for_storing())
135        } else {
136            None
137        };
138        self.new_events.push(EventWithContext { event, context });
139    }
140
141    /// Appends multiple new events to the entity's event stream to be persisted later
142    pub fn extend(&mut self, events: impl IntoIterator<Item = T>) {
143        let context = if <T as EsEvent>::event_context() {
144            Some(crate::EventContext::data_for_storing())
145        } else {
146            None
147        };
148        self.new_events
149            .extend(events.into_iter().map(|event| EventWithContext {
150                event,
151                context: context.clone(),
152            }));
153    }
154
    /// Returns `true` if there is at least one new event that has not been marked as persisted yet
    pub fn any_new(&self) -> bool {
        !self.new_events.is_empty()
    }
159
    /// Returns the number of persisted events (new, unpersisted events are not counted)
    pub fn len_persisted(&self) -> usize {
        self.persisted_events.len()
    }
164
    /// Returns an iterator over all persisted events, in the order they are stored in the stream.
    /// New, unpersisted events are not included.
    pub fn iter_persisted(&self) -> impl DoubleEndedIterator<Item = &PersistedEvent<T>> {
        self.persisted_events.iter()
    }
169
170    /// Returns an iterator over the last `n` persisted events
171    pub fn last_persisted(&self, n: usize) -> LastPersisted<'_, T> {
172        let start = self.persisted_events.len() - n;
173        self.persisted_events[start..].iter()
174    }
175
176    /// Returns an iterator over all events (both persisted and new) in chronological order
177    pub fn iter_all(&self) -> impl DoubleEndedIterator<Item = &T> {
178        self.persisted_events
179            .iter()
180            .map(|e| &e.event)
181            .chain(self.new_events.iter().map(|e| &e.event))
182    }
183
184    /// Loads and reconstructs the first entity from a stream of GenericEvents, marking events as `persisted`.
185    ///
186    /// Returns `Ok(None)` if no events are present, `Ok(Some(entity))` on success.
187    pub fn load_first<E: EsEntity<Event = T>>(
188        events: impl IntoIterator<Item = GenericEvent<<T as EsEvent>::EntityId>>,
189    ) -> Result<Option<E>, EntityHydrationError> {
190        let mut current_id = None;
191        let mut current = None;
192        for e in events {
193            if current_id.is_none() {
194                current_id = Some(e.entity_id.clone());
195                current = Some(Self {
196                    entity_id: e.entity_id.clone(),
197                    persisted_events: Vec::new(),
198                    new_events: Vec::new(),
199                });
200            }
201            if current_id.as_ref() != Some(&e.entity_id) {
202                break;
203            }
204            let cur = current.as_mut().expect("Could not get current");
205            cur.persisted_events.push(PersistedEvent {
206                entity_id: e.entity_id,
207                recorded_at: e.recorded_at,
208                sequence: e.sequence as usize,
209                event: serde_json::from_value(e.event)?,
210                context: e.context,
211            });
212        }
213        if let Some(current) = current {
214            Ok(Some(E::try_from_events(current)?))
215        } else {
216            Ok(None)
217        }
218    }
219
220    /// Loads and reconstructs up to `n` entities from a stream of GenericEvents.
221    /// Assumes the events are grouped by `id` and ordered by `sequence` per `id`.
222    ///
223    /// Returns both the entities and a flag indicating whether more entities were available in the stream.
224    pub fn load_n<E: EsEntity<Event = T>>(
225        events: impl IntoIterator<Item = GenericEvent<<T as EsEvent>::EntityId>>,
226        n: usize,
227    ) -> Result<(Vec<E>, bool), EntityHydrationError> {
228        let mut ret: Vec<E> = Vec::new();
229        let mut current_id = None;
230        let mut current = None;
231        for e in events {
232            if current_id.as_ref() != Some(&e.entity_id) {
233                if let Some(current) = current.take() {
234                    ret.push(E::try_from_events(current)?);
235                    if ret.len() == n {
236                        return Ok((ret, true));
237                    }
238                }
239
240                current_id = Some(e.entity_id.clone());
241                current = Some(Self {
242                    entity_id: e.entity_id.clone(),
243                    persisted_events: Vec::new(),
244                    new_events: Vec::new(),
245                });
246            }
247            let cur = current.as_mut().expect("Could not get current");
248            cur.persisted_events.push(PersistedEvent {
249                entity_id: e.entity_id,
250                recorded_at: e.recorded_at,
251                sequence: e.sequence as usize,
252                event: serde_json::from_value(e.event)?,
253                context: e.context,
254            });
255        }
256        if let Some(current) = current.take() {
257            ret.push(E::try_from_events(current)?);
258        }
259        Ok((ret, false))
260    }
261
262    #[doc(hidden)]
263    pub fn mark_new_events_persisted_at(
264        &mut self,
265        recorded_at: chrono::DateTime<chrono::Utc>,
266    ) -> usize {
267        let n = self.new_events.len();
268        let offset = self.persisted_events.len() + 1;
269        self.persisted_events
270            .extend(
271                self.new_events
272                    .drain(..)
273                    .enumerate()
274                    .map(|(i, event)| PersistedEvent {
275                        entity_id: self.entity_id.clone(),
276                        recorded_at,
277                        sequence: i + offset,
278                        event: event.event,
279                        context: event.context,
280                    }),
281            );
282        n
283    }
284
285    #[doc(hidden)]
286    pub fn serialize_new_events(&self) -> Vec<serde_json::Value> {
287        self.new_events
288            .iter()
289            .map(|event| serde_json::to_value(&event.event).expect("Failed to serialize event"))
290            .collect()
291    }
292
293    #[doc(hidden)]
294    pub fn serialize_new_event_contexts(&self) -> Option<Vec<crate::ContextData>> {
295        if <T as EsEvent>::event_context() {
296            let contexts = self
297                .new_events
298                .iter()
299                .map(|event| event.context.clone().expect("Missing context"))
300                .collect();
301
302            Some(contexts)
303        } else {
304            None
305        }
306    }
307}
308
#[cfg(test)]
mod tests {
    use super::*;
    use uuid::Uuid;

    /// Minimal event type: an entity is created with a name.
    #[derive(Debug, serde::Serialize, serde::Deserialize)]
    enum DummyEntityEvent {
        Created(String),
    }

    impl EsEvent for DummyEntityEvent {
        type EntityId = Uuid;
        fn event_context() -> bool {
            true
        }
    }

    /// Minimal entity hydrated from [`DummyEntityEvent`]s.
    struct DummyEntity {
        name: String,

        events: EntityEvents<DummyEntityEvent>,
    }

    impl EsEntity for DummyEntity {
        type Event = DummyEntityEvent;
        type New = NewDummyEntity;

        fn events_mut(&mut self) -> &mut EntityEvents<DummyEntityEvent> {
            &mut self.events
        }
        fn events(&self) -> &EntityEvents<DummyEntityEvent> {
            &self.events
        }
    }

    impl TryFromEvents<DummyEntityEvent> for DummyEntity {
        fn try_from_events(
            events: EntityEvents<DummyEntityEvent>,
        ) -> Result<Self, EntityHydrationError> {
            // The name is taken from the first persisted event.
            let first = events.iter_persisted().next().expect("Could not find name");
            let DummyEntityEvent::Created(created_name) = &first.event;
            let name = created_name.clone();
            Ok(Self { name, events })
        }
    }

    struct NewDummyEntity {}

    impl IntoEvents<DummyEntityEvent> for NewDummyEntity {
        fn into_events(self) -> EntityEvents<DummyEntityEvent> {
            let id = Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap();
            let initial = vec![DummyEntityEvent::Created("".to_owned())];
            EntityEvents::init(id, initial)
        }
    }

    /// Builds the raw form of a `Created` event with sequence 1 and no context.
    fn created(entity_id: &str, name: &str) -> GenericEvent<Uuid> {
        GenericEvent {
            entity_id: Uuid::parse_str(entity_id).unwrap(),
            sequence: 1,
            event: serde_json::to_value(DummyEntityEvent::Created(name.to_owned()))
                .expect("Could not serialize"),
            context: None,
            recorded_at: chrono::Utc::now(),
        }
    }

    #[test]
    fn load_zero_events() {
        let generic_events = vec![];
        let res = EntityEvents::load_first::<DummyEntity>(generic_events);
        assert!(matches!(res, Ok(None)));
    }

    #[test]
    fn load_first() {
        let generic_events = vec![created(
            "00000000-0000-0000-0000-000000000001",
            "dummy-name",
        )];
        let entity: DummyEntity = EntityEvents::load_first(generic_events)
            .expect("Could not load")
            .expect("No entity found");
        assert_eq!(entity.name, "dummy-name");
    }

    #[test]
    fn load_n() {
        let generic_events = vec![
            created("00000000-0000-0000-0000-000000000002", "dummy-name"),
            created("00000000-0000-0000-0000-000000000003", "other-name"),
        ];
        let (entity, more): (Vec<DummyEntity>, _) =
            EntityEvents::load_n(generic_events, 2).expect("Could not load");
        assert!(!more);
        assert_eq!(entity.len(), 2);
    }
}
418}