es_entity/
events.rs

1use chrono::{DateTime, Utc};
2
3use super::{error::EsEntityError, traits::*};
4
/// Borrowing iterator over a slice of [`PersistedEvent`]s.
///
/// This is the return type of `EntityEvents::last_persisted`.
5pub type LastPersisted<'a, E> = std::slice::Iter<'a, PersistedEvent<E>>;
6
/// An event whose payload is still an untyped `serde_json::Value`.
///
/// `EntityEvents::load_first` and `EntityEvents::load_n` consume values of this
/// type and turn each one into a typed [`PersistedEvent`] by deserializing
/// `event`.
7pub struct GenericEvent<Id> {
    /// Id used by the loaders to group consecutive events into one entity.
8    pub entity_id: Id,
    /// Sequence number of the event; the loaders cast it to `usize`.
9    pub sequence: i32,
    /// Event payload, not yet deserialized into the concrete event type.
10    pub event: serde_json::Value,
    /// Timestamp the loaders copy into `PersistedEvent::recorded_at`.
11    pub recorded_at: DateTime<Utc>,
12}
13
14pub struct PersistedEvent<E: EsEvent> {
15    pub entity_id: <E as EsEvent>::EntityId,
16    pub recorded_at: DateTime<Utc>,
17    pub sequence: usize,
18    pub event: E,
19}
20
21impl<E: Clone + EsEvent> Clone for PersistedEvent<E> {
22    fn clone(&self) -> Self {
23        PersistedEvent {
24            entity_id: self.entity_id.clone(),
25            recorded_at: self.recorded_at,
26            sequence: self.sequence,
27            event: self.event.clone(),
28        }
29    }
30}
31
32pub struct EntityEvents<T: EsEvent> {
33    pub entity_id: <T as EsEvent>::EntityId,
34    persisted_events: Vec<PersistedEvent<T>>,
35    new_events: Vec<T>,
36}
37
38impl<T: Clone + EsEvent> Clone for EntityEvents<T> {
39    fn clone(&self) -> Self {
40        Self {
41            entity_id: self.entity_id.clone(),
42            persisted_events: self.persisted_events.clone(),
43            new_events: self.new_events.clone(),
44        }
45    }
46}
47
48impl<T> EntityEvents<T>
49where
50    T: EsEvent,
51{
52    pub fn init(id: <T as EsEvent>::EntityId, initial_events: impl IntoIterator<Item = T>) -> Self {
53        Self {
54            entity_id: id,
55            persisted_events: Vec::new(),
56            new_events: initial_events.into_iter().collect(),
57        }
58    }
59
60    pub fn id(&self) -> &<T as EsEvent>::EntityId {
61        &self.entity_id
62    }
63
64    pub fn entity_first_persisted_at(&self) -> Option<DateTime<Utc>> {
65        self.persisted_events.first().map(|e| e.recorded_at)
66    }
67
68    pub fn entity_last_modified_at(&self) -> Option<DateTime<Utc>> {
69        self.persisted_events.last().map(|e| e.recorded_at)
70    }
71
72    pub fn push(&mut self, event: T) {
73        self.new_events.push(event);
74    }
75
76    pub fn extend(&mut self, events: impl IntoIterator<Item = T>) {
77        self.new_events.extend(events);
78    }
79
80    pub fn mark_new_events_persisted_at(
81        &mut self,
82        recorded_at: chrono::DateTime<chrono::Utc>,
83    ) -> usize {
84        let n = self.new_events.len();
85        let offset = self.persisted_events.len() + 1;
86        self.persisted_events
87            .extend(
88                self.new_events
89                    .drain(..)
90                    .enumerate()
91                    .map(|(i, event)| PersistedEvent {
92                        entity_id: self.entity_id.clone(),
93                        recorded_at,
94                        sequence: i + offset,
95                        event,
96                    }),
97            );
98        n
99    }
100
101    pub fn serialize_new_events(&self) -> Vec<serde_json::Value> {
102        self.new_events
103            .iter()
104            .map(|event| serde_json::to_value(event).expect("Failed to serialize event"))
105            .collect()
106    }
107
108    pub fn any_new(&self) -> bool {
109        !self.new_events.is_empty()
110    }
111
112    pub fn len_persisted(&self) -> usize {
113        self.persisted_events.len()
114    }
115
116    pub fn iter_persisted(&self) -> impl DoubleEndedIterator<Item = &PersistedEvent<T>> {
117        self.persisted_events.iter()
118    }
119
120    pub fn last_persisted(&self, n: usize) -> LastPersisted<T> {
121        let start = self.persisted_events.len() - n;
122        self.persisted_events[start..].iter()
123    }
124
125    pub fn iter_all(&self) -> impl DoubleEndedIterator<Item = &T> {
126        self.persisted_events
127            .iter()
128            .map(|e| &e.event)
129            .chain(self.new_events.iter())
130    }
131
132    pub fn load_first<E: EsEntity<Event = T>>(
133        events: impl IntoIterator<Item = GenericEvent<<T as EsEvent>::EntityId>>,
134    ) -> Result<E, EsEntityError> {
135        let mut current_id = None;
136        let mut current = None;
137        for e in events {
138            if current_id.is_none() {
139                current_id = Some(e.entity_id.clone());
140                current = Some(Self {
141                    entity_id: e.entity_id.clone(),
142                    persisted_events: Vec::new(),
143                    new_events: Vec::new(),
144                });
145            }
146            if current_id.as_ref() != Some(&e.entity_id) {
147                break;
148            }
149            let cur = current.as_mut().expect("Could not get current");
150            cur.persisted_events.push(PersistedEvent {
151                entity_id: e.entity_id,
152                recorded_at: e.recorded_at,
153                sequence: e.sequence as usize,
154                event: serde_json::from_value(e.event)?,
155            });
156        }
157        if let Some(current) = current {
158            E::try_from_events(current)
159        } else {
160            Err(EsEntityError::NotFound)
161        }
162    }
163
164    pub fn load_n<E: EsEntity<Event = T>>(
165        events: impl IntoIterator<Item = GenericEvent<<T as EsEvent>::EntityId>>,
166        n: usize,
167    ) -> Result<(Vec<E>, bool), EsEntityError> {
168        let mut ret: Vec<E> = Vec::new();
169        let mut current_id = None;
170        let mut current = None;
171        for e in events {
172            if current_id.as_ref() != Some(&e.entity_id) {
173                if let Some(current) = current.take() {
174                    ret.push(E::try_from_events(current)?);
175                    if ret.len() == n {
176                        return Ok((ret, true));
177                    }
178                }
179
180                current_id = Some(e.entity_id.clone());
181                current = Some(Self {
182                    entity_id: e.entity_id.clone(),
183                    persisted_events: Vec::new(),
184                    new_events: Vec::new(),
185                });
186            }
187            let cur = current.as_mut().expect("Could not get current");
188            cur.persisted_events.push(PersistedEvent {
189                entity_id: e.entity_id,
190                recorded_at: e.recorded_at,
191                sequence: e.sequence as usize,
192                event: serde_json::from_value(e.event)?,
193            });
194        }
195        if let Some(current) = current.take() {
196            ret.push(E::try_from_events(current)?);
197        }
198        Ok((ret, false))
199    }
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use uuid::Uuid;

    #[derive(Debug, serde::Serialize, serde::Deserialize)]
    enum DummyEntityEvent {
        Created(String),
    }

    impl EsEvent for DummyEntityEvent {
        type EntityId = Uuid;
    }

    /// Minimal entity whose name comes from its first persisted `Created` event.
    struct DummyEntity {
        name: String,

        events: EntityEvents<DummyEntityEvent>,
    }

    impl EsEntity for DummyEntity {
        type Event = DummyEntityEvent;
        type New = NewDummyEntity;

        fn events_mut(&mut self) -> &mut EntityEvents<DummyEntityEvent> {
            &mut self.events
        }
        fn events(&self) -> &EntityEvents<DummyEntityEvent> {
            &self.events
        }
    }

    impl TryFromEvents<DummyEntityEvent> for DummyEntity {
        fn try_from_events(events: EntityEvents<DummyEntityEvent>) -> Result<Self, EsEntityError> {
            let name = events
                .iter_persisted()
                .map(|e| match &e.event {
                    DummyEntityEvent::Created(name) => name.clone(),
                })
                .next()
                .expect("Could not find name");
            Ok(Self { name, events })
        }
    }

    struct NewDummyEntity {}

    impl IntoEvents<DummyEntityEvent> for NewDummyEntity {
        fn into_events(self) -> EntityEvents<DummyEntityEvent> {
            EntityEvents::init(
                Uuid::new_v4(),
                vec![DummyEntityEvent::Created("".to_owned())],
            )
        }
    }

    /// Builds a raw `Created(name)` event for `entity_id` at `sequence`.
    fn created(entity_id: Uuid, sequence: i32, name: &str) -> GenericEvent<Uuid> {
        GenericEvent {
            entity_id,
            sequence,
            event: serde_json::to_value(DummyEntityEvent::Created(name.to_owned()))
                .expect("Could not serialize"),
            recorded_at: chrono::Utc::now(),
        }
    }

    #[test]
    fn load_zero_events() {
        let generic_events = vec![];
        let res = EntityEvents::load_first::<DummyEntity>(generic_events);
        assert!(matches!(res, Err(EsEntityError::NotFound)));
    }

    #[test]
    fn load_first() {
        let generic_events = vec![created(Uuid::new_v4(), 1, "dummy-name")];
        let entity: DummyEntity = EntityEvents::load_first(generic_events).expect("Could not load");
        assert_eq!(entity.name, "dummy-name");
    }

    #[test]
    fn load_first_stops_at_next_entity() {
        let first_id = Uuid::new_v4();
        let generic_events = vec![
            created(first_id, 1, "dummy-name"),
            created(Uuid::new_v4(), 1, "other-name"),
        ];
        let entity: DummyEntity = EntityEvents::load_first(generic_events).expect("Could not load");
        assert_eq!(entity.name, "dummy-name");
        assert_eq!(entity.events.id(), &first_id);
        assert_eq!(entity.events.len_persisted(), 1);
    }

    #[test]
    fn load_n() {
        let generic_events = vec![
            created(Uuid::new_v4(), 1, "dummy-name"),
            created(Uuid::new_v4(), 1, "other-name"),
        ];
        let (entity, more): (Vec<DummyEntity>, _) =
            EntityEvents::load_n(generic_events, 2).expect("Could not load");
        assert!(!more);
        assert_eq!(entity.len(), 2);
        assert_eq!(entity[0].name, "dummy-name");
        assert_eq!(entity[1].name, "other-name");
    }

    #[test]
    fn load_n_reports_more() {
        let generic_events = vec![
            created(Uuid::new_v4(), 1, "dummy-name"),
            created(Uuid::new_v4(), 1, "other-name"),
        ];
        let (entity, more): (Vec<DummyEntity>, _) =
            EntityEvents::load_n(generic_events, 1).expect("Could not load");
        assert!(more);
        assert_eq!(entity.len(), 1);
        assert_eq!(entity[0].name, "dummy-name");
    }

    #[test]
    fn load_n_groups_events_by_entity() {
        let first_id = Uuid::new_v4();
        let generic_events = vec![
            created(first_id, 1, "dummy-name"),
            created(first_id, 2, "renamed"),
            created(Uuid::new_v4(), 1, "other-name"),
        ];
        let (entity, more): (Vec<DummyEntity>, _) =
            EntityEvents::load_n(generic_events, 2).expect("Could not load");
        assert!(!more);
        assert_eq!(entity.len(), 2);
        assert_eq!(entity[0].events.len_persisted(), 2);
        assert_eq!(entity[1].events.len_persisted(), 1);
    }
}