es_entity/
events.rs

1//! Manage events and related operations for event-sourcing.
2
3use chrono::{DateTime, Utc};
4
5use super::{error::EsEntityError, traits::*};
6
/// Borrowing iterator over a slice of [`PersistedEvent`]s.
///
/// This is the return type of [`EntityEvents::last_persisted`].
pub type LastPersisted<'a, E> = std::slice::Iter<'a, PersistedEvent<E>>;
9
/// Represents an event in raw, deserialized format as loaded from the database.
///
/// Events in the database are stored as JSON blobs and are initially loaded as `GenericEvent<Id>`,
/// where `Id` is the identifier type of the entity the event belongs to. Acts as a bridge between
/// the database model and the domain model: [`EntityEvents::load_first`] and
/// [`EntityEvents::load_n`] convert each `GenericEvent` into a [`PersistedEvent`].
pub struct GenericEvent<Id> {
    /// Identifier of the entity this event belongs to
    pub entity_id: Id,
    /// Sequence number of the event; cast to `usize` when the event is loaded
    pub sequence: i32,
    /// The event payload as untyped JSON
    pub event: serde_json::Value,
    /// Timestamp that is carried over to [`PersistedEvent::recorded_at`] when the event is loaded
    pub recorded_at: DateTime<Utc>,
}
21
/// Strongly-typed event wrapper with metadata for successfully stored events.
///
/// Contains the event data along with its persistence metadata (`entity_id`, `recorded_at`,
/// `sequence`). All `new_events` of an [`EntityEvents`] are converted to this structure once they
/// are persisted, and events loaded from the database are converted to it before an entity is
/// constructed from them.
pub struct PersistedEvent<E: EsEvent> {
    /// The identifier of the entity which the event is used to construct
    pub entity_id: <E as EsEvent>::EntityId,
    /// The timestamp which marks when the event was persisted
    pub recorded_at: DateTime<Utc>,
    /// The sequence number of the event in the entity's event stream
    pub sequence: usize,
    /// The event itself
    pub event: E,
}
37
38impl<E: Clone + EsEvent> Clone for PersistedEvent<E> {
39    fn clone(&self) -> Self {
40        PersistedEvent {
41            entity_id: self.entity_id.clone(),
42            recorded_at: self.recorded_at,
43            sequence: self.sequence,
44            event: self.event.clone(),
45        }
46    }
47}
48
/// A [`Vec`] wrapper that manages the event stream of an entity with helpers for event-sourcing
/// operations.
///
/// Keeps the events that are already persisted separate from the new events that still have to be
/// persisted, and provides operations for loading, appending and persisting events in sequence.
/// Required field for all event-sourced entities to maintain their state change history.
pub struct EntityEvents<T: EsEvent> {
    /// The entity's id
    pub entity_id: <T as EsEvent>::EntityId,
    /// Events that have been persisted in the database and marked as persisted
    persisted_events: Vec<PersistedEvent<T>>,
    /// New events that track state changes and are yet to be persisted
    new_events: Vec<T>,
}
61
62impl<T: Clone + EsEvent> Clone for EntityEvents<T> {
63    fn clone(&self) -> Self {
64        Self {
65            entity_id: self.entity_id.clone(),
66            persisted_events: self.persisted_events.clone(),
67            new_events: self.new_events.clone(),
68        }
69    }
70}
71
72impl<T> EntityEvents<T>
73where
74    T: EsEvent,
75{
76    /// Initializes a new `EntityEvents` instance with the given entity ID and initial events which is returned by [`IntoEvents`] method
77    pub fn init(id: <T as EsEvent>::EntityId, initial_events: impl IntoIterator<Item = T>) -> Self {
78        Self {
79            entity_id: id,
80            persisted_events: Vec::new(),
81            new_events: initial_events.into_iter().collect(),
82        }
83    }
84
85    /// Returns a reference to the entity's identifier
86    pub fn id(&self) -> &<T as EsEvent>::EntityId {
87        &self.entity_id
88    }
89
90    /// Returns the timestamp of the first persisted event, indicating when the entity was created
91    pub fn entity_first_persisted_at(&self) -> Option<DateTime<Utc>> {
92        self.persisted_events.first().map(|e| e.recorded_at)
93    }
94
95    /// Returns the timestamp of the last persisted event, indicating when the entity was last modified
96    pub fn entity_last_modified_at(&self) -> Option<DateTime<Utc>> {
97        self.persisted_events.last().map(|e| e.recorded_at)
98    }
99
100    /// Appends a single new event to the entity's event stream to be persisted later
101    pub fn push(&mut self, event: T) {
102        self.new_events.push(event);
103    }
104
105    /// Appends multiple new events to the entity's event stream to be persisted later
106    pub fn extend(&mut self, events: impl IntoIterator<Item = T>) {
107        self.new_events.extend(events);
108    }
109
110    /// Returns true if there are any unpersisted events waiting to be saved
111    pub fn any_new(&self) -> bool {
112        !self.new_events.is_empty()
113    }
114
115    /// Returns the count of persisted events
116    pub fn len_persisted(&self) -> usize {
117        self.persisted_events.len()
118    }
119
120    /// Returns an iterator over all persisted events
121    pub fn iter_persisted(&self) -> impl DoubleEndedIterator<Item = &PersistedEvent<T>> {
122        self.persisted_events.iter()
123    }
124
125    /// Returns an iterator over the last `n` persisted events
126    pub fn last_persisted(&self, n: usize) -> LastPersisted<'_, T> {
127        let start = self.persisted_events.len() - n;
128        self.persisted_events[start..].iter()
129    }
130
131    /// Returns an iterator over all events (both persisted and new) in chronological order
132    pub fn iter_all(&self) -> impl DoubleEndedIterator<Item = &T> {
133        self.persisted_events
134            .iter()
135            .map(|e| &e.event)
136            .chain(self.new_events.iter())
137    }
138
139    /// Loads and reconstructs the first entity from a stream of GenericEvents, marking events as `peristed`
140    pub fn load_first<E: EsEntity<Event = T>>(
141        events: impl IntoIterator<Item = GenericEvent<<T as EsEvent>::EntityId>>,
142    ) -> Result<E, EsEntityError> {
143        let mut current_id = None;
144        let mut current = None;
145        for e in events {
146            if current_id.is_none() {
147                current_id = Some(e.entity_id.clone());
148                current = Some(Self {
149                    entity_id: e.entity_id.clone(),
150                    persisted_events: Vec::new(),
151                    new_events: Vec::new(),
152                });
153            }
154            if current_id.as_ref() != Some(&e.entity_id) {
155                break;
156            }
157            let cur = current.as_mut().expect("Could not get current");
158            cur.persisted_events.push(PersistedEvent {
159                entity_id: e.entity_id,
160                recorded_at: e.recorded_at,
161                sequence: e.sequence as usize,
162                event: serde_json::from_value(e.event)?,
163            });
164        }
165        if let Some(current) = current {
166            E::try_from_events(current)
167        } else {
168            Err(EsEntityError::NotFound)
169        }
170    }
171
172    /// Loads and reconstructs up to `n` entities from a stream of GenericEvents.
173    /// Assumes the events are grouped by `id` and ordered by `sequence` per `id`.
174    ///
175    /// Returns both the entities and a flag indicating whether more entities were available in the stream.
176    pub fn load_n<E: EsEntity<Event = T>>(
177        events: impl IntoIterator<Item = GenericEvent<<T as EsEvent>::EntityId>>,
178        n: usize,
179    ) -> Result<(Vec<E>, bool), EsEntityError> {
180        let mut ret: Vec<E> = Vec::new();
181        let mut current_id = None;
182        let mut current = None;
183        for e in events {
184            if current_id.as_ref() != Some(&e.entity_id) {
185                if let Some(current) = current.take() {
186                    ret.push(E::try_from_events(current)?);
187                    if ret.len() == n {
188                        return Ok((ret, true));
189                    }
190                }
191
192                current_id = Some(e.entity_id.clone());
193                current = Some(Self {
194                    entity_id: e.entity_id.clone(),
195                    persisted_events: Vec::new(),
196                    new_events: Vec::new(),
197                });
198            }
199            let cur = current.as_mut().expect("Could not get current");
200            cur.persisted_events.push(PersistedEvent {
201                entity_id: e.entity_id,
202                recorded_at: e.recorded_at,
203                sequence: e.sequence as usize,
204                event: serde_json::from_value(e.event)?,
205            });
206        }
207        if let Some(current) = current.take() {
208            ret.push(E::try_from_events(current)?);
209        }
210        Ok((ret, false))
211    }
212
213    #[doc(hidden)]
214    pub fn mark_new_events_persisted_at(
215        &mut self,
216        recorded_at: chrono::DateTime<chrono::Utc>,
217    ) -> usize {
218        let n = self.new_events.len();
219        let offset = self.persisted_events.len() + 1;
220        self.persisted_events
221            .extend(
222                self.new_events
223                    .drain(..)
224                    .enumerate()
225                    .map(|(i, event)| PersistedEvent {
226                        entity_id: self.entity_id.clone(),
227                        recorded_at,
228                        sequence: i + offset,
229                        event,
230                    }),
231            );
232        n
233    }
234
235    #[doc(hidden)]
236    pub fn serialize_new_events(&self) -> Vec<serde_json::Value> {
237        self.new_events
238            .iter()
239            .map(|event| serde_json::to_value(event).expect("Failed to serialize event"))
240            .collect()
241    }
242}
243
#[cfg(test)]
mod tests {
    use super::*;
    use uuid::Uuid;

    /// Minimal event type: a dummy entity can only be created
    #[derive(Debug, serde::Serialize, serde::Deserialize)]
    enum DummyEntityEvent {
        Created(String),
    }

    impl EsEvent for DummyEntityEvent {
        type EntityId = Uuid;
    }

    /// Minimal entity whose only state is the name carried by its `Created` event
    struct DummyEntity {
        name: String,

        events: EntityEvents<DummyEntityEvent>,
    }

    impl EsEntity for DummyEntity {
        type Event = DummyEntityEvent;
        type New = NewDummyEntity;

        fn events_mut(&mut self) -> &mut EntityEvents<DummyEntityEvent> {
            &mut self.events
        }
        fn events(&self) -> &EntityEvents<DummyEntityEvent> {
            &self.events
        }
    }

    impl TryFromEvents<DummyEntityEvent> for DummyEntity {
        fn try_from_events(events: EntityEvents<DummyEntityEvent>) -> Result<Self, EsEntityError> {
            // The name is taken from the first persisted event.
            let first = events
                .iter_persisted()
                .next()
                .expect("Could not find name");
            let name = match &first.event {
                DummyEntityEvent::Created(name) => name.clone(),
            };
            Ok(Self { name, events })
        }
    }

    struct NewDummyEntity {}

    impl IntoEvents<DummyEntityEvent> for NewDummyEntity {
        fn into_events(self) -> EntityEvents<DummyEntityEvent> {
            let initial = vec![DummyEntityEvent::Created("".to_owned())];
            EntityEvents::init(Uuid::now_v7(), initial)
        }
    }

    /// Builds the raw `Created` event (sequence 1) of a brand-new entity called `name`
    fn created_event(name: &str) -> GenericEvent<Uuid> {
        let payload = serde_json::to_value(DummyEntityEvent::Created(name.to_owned()))
            .expect("Could not serialize");
        GenericEvent {
            entity_id: uuid::Uuid::now_v7(),
            sequence: 1,
            event: payload,
            recorded_at: chrono::Utc::now(),
        }
    }

    #[test]
    fn load_zero_events() {
        let no_events: Vec<GenericEvent<Uuid>> = Vec::new();
        let res = EntityEvents::load_first::<DummyEntity>(no_events);
        assert!(matches!(res, Err(EsEntityError::NotFound)));
    }

    #[test]
    fn load_first() {
        let generic_events = vec![created_event("dummy-name")];
        let entity: DummyEntity = EntityEvents::load_first(generic_events).expect("Could not load");
        assert_eq!(entity.name, "dummy-name");
    }

    #[test]
    fn load_n() {
        // Two events with distinct ids, i.e. two entities with one event each
        let generic_events = vec![created_event("dummy-name"), created_event("other-name")];
        let (entities, more): (Vec<DummyEntity>, bool) =
            EntityEvents::load_n(generic_events, 2).expect("Could not load");
        assert_eq!(entities.len(), 2);
        assert!(!more);
    }
}