es_entity/
events.rs

1//! Manage events and operations for event-sourcing.
2
3use chrono::{DateTime, Utc};
4
5use super::{error::EsEntityError, traits::*};
6
7pub type LastPersisted<'a, E> = std::slice::Iter<'a, PersistedEvent<E>>;
8
9pub struct GenericEvent<Id> {
10    pub entity_id: Id,
11    pub sequence: i32,
12    pub event: serde_json::Value,
13    pub recorded_at: DateTime<Utc>,
14}
15
16pub struct PersistedEvent<E: EsEvent> {
17    pub entity_id: <E as EsEvent>::EntityId,
18    pub recorded_at: DateTime<Utc>,
19    pub sequence: usize,
20    pub event: E,
21}
22
23impl<E: Clone + EsEvent> Clone for PersistedEvent<E> {
24    fn clone(&self) -> Self {
25        PersistedEvent {
26            entity_id: self.entity_id.clone(),
27            recorded_at: self.recorded_at,
28            sequence: self.sequence,
29            event: self.event.clone(),
30        }
31    }
32}
33
34pub struct EntityEvents<T: EsEvent> {
35    pub entity_id: <T as EsEvent>::EntityId,
36    persisted_events: Vec<PersistedEvent<T>>,
37    new_events: Vec<T>,
38}
39
40impl<T: Clone + EsEvent> Clone for EntityEvents<T> {
41    fn clone(&self) -> Self {
42        Self {
43            entity_id: self.entity_id.clone(),
44            persisted_events: self.persisted_events.clone(),
45            new_events: self.new_events.clone(),
46        }
47    }
48}
49
50impl<T> EntityEvents<T>
51where
52    T: EsEvent,
53{
54    pub fn init(id: <T as EsEvent>::EntityId, initial_events: impl IntoIterator<Item = T>) -> Self {
55        Self {
56            entity_id: id,
57            persisted_events: Vec::new(),
58            new_events: initial_events.into_iter().collect(),
59        }
60    }
61
62    pub fn id(&self) -> &<T as EsEvent>::EntityId {
63        &self.entity_id
64    }
65
66    pub fn entity_first_persisted_at(&self) -> Option<DateTime<Utc>> {
67        self.persisted_events.first().map(|e| e.recorded_at)
68    }
69
70    pub fn entity_last_modified_at(&self) -> Option<DateTime<Utc>> {
71        self.persisted_events.last().map(|e| e.recorded_at)
72    }
73
74    pub fn push(&mut self, event: T) {
75        self.new_events.push(event);
76    }
77
78    pub fn extend(&mut self, events: impl IntoIterator<Item = T>) {
79        self.new_events.extend(events);
80    }
81
82    pub fn mark_new_events_persisted_at(
83        &mut self,
84        recorded_at: chrono::DateTime<chrono::Utc>,
85    ) -> usize {
86        let n = self.new_events.len();
87        let offset = self.persisted_events.len() + 1;
88        self.persisted_events
89            .extend(
90                self.new_events
91                    .drain(..)
92                    .enumerate()
93                    .map(|(i, event)| PersistedEvent {
94                        entity_id: self.entity_id.clone(),
95                        recorded_at,
96                        sequence: i + offset,
97                        event,
98                    }),
99            );
100        n
101    }
102
103    pub fn serialize_new_events(&self) -> Vec<serde_json::Value> {
104        self.new_events
105            .iter()
106            .map(|event| serde_json::to_value(event).expect("Failed to serialize event"))
107            .collect()
108    }
109
110    pub fn any_new(&self) -> bool {
111        !self.new_events.is_empty()
112    }
113
114    pub fn len_persisted(&self) -> usize {
115        self.persisted_events.len()
116    }
117
118    pub fn iter_persisted(&self) -> impl DoubleEndedIterator<Item = &PersistedEvent<T>> {
119        self.persisted_events.iter()
120    }
121
122    pub fn last_persisted(&self, n: usize) -> LastPersisted<T> {
123        let start = self.persisted_events.len() - n;
124        self.persisted_events[start..].iter()
125    }
126
127    pub fn iter_all(&self) -> impl DoubleEndedIterator<Item = &T> {
128        self.persisted_events
129            .iter()
130            .map(|e| &e.event)
131            .chain(self.new_events.iter())
132    }
133
134    pub fn load_first<E: EsEntity<Event = T>>(
135        events: impl IntoIterator<Item = GenericEvent<<T as EsEvent>::EntityId>>,
136    ) -> Result<E, EsEntityError> {
137        let mut current_id = None;
138        let mut current = None;
139        for e in events {
140            if current_id.is_none() {
141                current_id = Some(e.entity_id.clone());
142                current = Some(Self {
143                    entity_id: e.entity_id.clone(),
144                    persisted_events: Vec::new(),
145                    new_events: Vec::new(),
146                });
147            }
148            if current_id.as_ref() != Some(&e.entity_id) {
149                break;
150            }
151            let cur = current.as_mut().expect("Could not get current");
152            cur.persisted_events.push(PersistedEvent {
153                entity_id: e.entity_id,
154                recorded_at: e.recorded_at,
155                sequence: e.sequence as usize,
156                event: serde_json::from_value(e.event)?,
157            });
158        }
159        if let Some(current) = current {
160            E::try_from_events(current)
161        } else {
162            Err(EsEntityError::NotFound)
163        }
164    }
165
166    pub fn load_n<E: EsEntity<Event = T>>(
167        events: impl IntoIterator<Item = GenericEvent<<T as EsEvent>::EntityId>>,
168        n: usize,
169    ) -> Result<(Vec<E>, bool), EsEntityError> {
170        let mut ret: Vec<E> = Vec::new();
171        let mut current_id = None;
172        let mut current = None;
173        for e in events {
174            if current_id.as_ref() != Some(&e.entity_id) {
175                if let Some(current) = current.take() {
176                    ret.push(E::try_from_events(current)?);
177                    if ret.len() == n {
178                        return Ok((ret, true));
179                    }
180                }
181
182                current_id = Some(e.entity_id.clone());
183                current = Some(Self {
184                    entity_id: e.entity_id.clone(),
185                    persisted_events: Vec::new(),
186                    new_events: Vec::new(),
187                });
188            }
189            let cur = current.as_mut().expect("Could not get current");
190            cur.persisted_events.push(PersistedEvent {
191                entity_id: e.entity_id,
192                recorded_at: e.recorded_at,
193                sequence: e.sequence as usize,
194                event: serde_json::from_value(e.event)?,
195            });
196        }
197        if let Some(current) = current.take() {
198            ret.push(E::try_from_events(current)?);
199        }
200        Ok((ret, false))
201    }
202}
203
204#[cfg(test)]
205mod tests {
206    use super::*;
207    use uuid::Uuid;
208
209    #[derive(Debug, serde::Serialize, serde::Deserialize)]
210    enum DummyEntityEvent {
211        Created(String),
212    }
213
214    impl EsEvent for DummyEntityEvent {
215        type EntityId = Uuid;
216    }
217
218    struct DummyEntity {
219        name: String,
220
221        events: EntityEvents<DummyEntityEvent>,
222    }
223
224    impl EsEntity for DummyEntity {
225        type Event = DummyEntityEvent;
226        type New = NewDummyEntity;
227
228        fn events_mut(&mut self) -> &mut EntityEvents<DummyEntityEvent> {
229            &mut self.events
230        }
231        fn events(&self) -> &EntityEvents<DummyEntityEvent> {
232            &self.events
233        }
234    }
235
236    impl TryFromEvents<DummyEntityEvent> for DummyEntity {
237        fn try_from_events(events: EntityEvents<DummyEntityEvent>) -> Result<Self, EsEntityError> {
238            let name = events
239                .iter_persisted()
240                .map(|e| match &e.event {
241                    DummyEntityEvent::Created(name) => name.clone(),
242                })
243                .next()
244                .expect("Could not find name");
245            Ok(Self { name, events })
246        }
247    }
248
249    struct NewDummyEntity {}
250
251    impl IntoEvents<DummyEntityEvent> for NewDummyEntity {
252        fn into_events(self) -> EntityEvents<DummyEntityEvent> {
253            EntityEvents::init(
254                Uuid::new_v4(),
255                vec![DummyEntityEvent::Created("".to_owned())],
256            )
257        }
258    }
259
260    #[test]
261    fn load_zero_events() {
262        let generic_events = vec![];
263        let res = EntityEvents::load_first::<DummyEntity>(generic_events);
264        assert!(matches!(res, Err(EsEntityError::NotFound)));
265    }
266
267    #[test]
268    fn load_first() {
269        let generic_events = vec![GenericEvent {
270            entity_id: uuid::Uuid::new_v4(),
271            sequence: 1,
272            event: serde_json::to_value(DummyEntityEvent::Created("dummy-name".to_owned()))
273                .expect("Could not serialize"),
274            recorded_at: chrono::Utc::now(),
275        }];
276        let entity: DummyEntity = EntityEvents::load_first(generic_events).expect("Could not load");
277        assert!(entity.name == "dummy-name");
278    }
279
280    #[test]
281    fn load_n() {
282        let generic_events = vec![
283            GenericEvent {
284                entity_id: uuid::Uuid::new_v4(),
285                sequence: 1,
286                event: serde_json::to_value(DummyEntityEvent::Created("dummy-name".to_owned()))
287                    .expect("Could not serialize"),
288                recorded_at: chrono::Utc::now(),
289            },
290            GenericEvent {
291                entity_id: uuid::Uuid::new_v4(),
292                sequence: 1,
293                event: serde_json::to_value(DummyEntityEvent::Created("other-name".to_owned()))
294                    .expect("Could not serialize"),
295                recorded_at: chrono::Utc::now(),
296            },
297        ];
298        let (entity, more): (Vec<DummyEntity>, _) =
299            EntityEvents::load_n(generic_events, 2).expect("Could not load");
300        assert!(!more);
301        assert_eq!(entity.len(), 2);
302    }
303}