es_entity/
events.rs

1use chrono::{DateTime, Utc};
2
3use super::{error::EsEntityError, traits::*};
4
5pub type LastPersisted<'a, E> = std::slice::Iter<'a, PersistedEvent<E>>;
6
7pub struct GenericEvent<Id> {
8    pub entity_id: Id,
9    pub sequence: i32,
10    pub event: serde_json::Value,
11    pub recorded_at: DateTime<Utc>,
12}
13
14pub struct PersistedEvent<E: EsEvent> {
15    pub entity_id: <E as EsEvent>::EntityId,
16    pub recorded_at: DateTime<Utc>,
17    pub sequence: usize,
18    pub event: E,
19}
20
21impl<E: Clone + EsEvent> Clone for PersistedEvent<E> {
22    fn clone(&self) -> Self {
23        PersistedEvent {
24            entity_id: self.entity_id.clone(),
25            recorded_at: self.recorded_at,
26            sequence: self.sequence,
27            event: self.event.clone(),
28        }
29    }
30}
31
32pub struct EntityEvents<T: EsEvent> {
33    pub entity_id: <T as EsEvent>::EntityId,
34    persisted_events: Vec<PersistedEvent<T>>,
35    new_events: Vec<T>,
36}
37
38impl<T: Clone + EsEvent> Clone for EntityEvents<T> {
39    fn clone(&self) -> Self {
40        Self {
41            entity_id: self.entity_id.clone(),
42            persisted_events: self.persisted_events.clone(),
43            new_events: self.new_events.clone(),
44        }
45    }
46}
47
48impl<T> EntityEvents<T>
49where
50    T: EsEvent,
51{
52    pub fn init(id: <T as EsEvent>::EntityId, initial_events: impl IntoIterator<Item = T>) -> Self {
53        Self {
54            entity_id: id,
55            persisted_events: Vec::new(),
56            new_events: initial_events.into_iter().collect(),
57        }
58    }
59
60    pub fn id(&self) -> &<T as EsEvent>::EntityId {
61        &self.entity_id
62    }
63
64    pub fn entity_first_persisted_at(&self) -> Option<DateTime<Utc>> {
65        self.persisted_events.first().map(|e| e.recorded_at)
66    }
67
68    pub fn entity_last_modified_at(&self) -> Option<DateTime<Utc>> {
69        self.persisted_events.last().map(|e| e.recorded_at)
70    }
71
72    pub fn push(&mut self, event: T) {
73        self.new_events.push(event);
74    }
75
76    pub fn mark_new_events_persisted_at(
77        &mut self,
78        recorded_at: chrono::DateTime<chrono::Utc>,
79    ) -> usize {
80        let n = self.new_events.len();
81        let offset = self.persisted_events.len() + 1;
82        self.persisted_events
83            .extend(
84                self.new_events
85                    .drain(..)
86                    .enumerate()
87                    .map(|(i, event)| PersistedEvent {
88                        entity_id: self.entity_id.clone(),
89                        recorded_at,
90                        sequence: i + offset,
91                        event,
92                    }),
93            );
94        n
95    }
96
97    pub fn serialize_new_events(&self) -> Vec<serde_json::Value> {
98        self.new_events
99            .iter()
100            .map(|event| serde_json::to_value(event).expect("Failed to serialize event"))
101            .collect()
102    }
103
104    pub fn any_new(&self) -> bool {
105        !self.new_events.is_empty()
106    }
107
108    pub fn len_persisted(&self) -> usize {
109        self.persisted_events.len()
110    }
111
112    pub fn iter_persisted(&self) -> impl DoubleEndedIterator<Item = &PersistedEvent<T>> {
113        self.persisted_events.iter()
114    }
115
116    pub fn last_persisted(&self, n: usize) -> LastPersisted<T> {
117        let start = self.persisted_events.len() - n;
118        self.persisted_events[start..].iter()
119    }
120
121    pub fn iter_all(&self) -> impl DoubleEndedIterator<Item = &T> {
122        self.persisted_events
123            .iter()
124            .map(|e| &e.event)
125            .chain(self.new_events.iter())
126    }
127
128    pub fn load_first<E: EsEntity<Event = T>>(
129        events: impl IntoIterator<Item = GenericEvent<<T as EsEvent>::EntityId>>,
130    ) -> Result<E, EsEntityError> {
131        let mut current_id = None;
132        let mut current = None;
133        for e in events {
134            if current_id.is_none() {
135                current_id = Some(e.entity_id.clone());
136                current = Some(Self {
137                    entity_id: e.entity_id.clone(),
138                    persisted_events: Vec::new(),
139                    new_events: Vec::new(),
140                });
141            }
142            if current_id.as_ref() != Some(&e.entity_id) {
143                break;
144            }
145            let cur = current.as_mut().expect("Could not get current");
146            cur.persisted_events.push(PersistedEvent {
147                entity_id: e.entity_id,
148                recorded_at: e.recorded_at,
149                sequence: e.sequence as usize,
150                event: serde_json::from_value(e.event)?,
151            });
152        }
153        if let Some(current) = current {
154            E::try_from_events(current)
155        } else {
156            Err(EsEntityError::NotFound)
157        }
158    }
159
160    pub fn load_n<E: EsEntity<Event = T>>(
161        events: impl IntoIterator<Item = GenericEvent<<T as EsEvent>::EntityId>>,
162        n: usize,
163    ) -> Result<(Vec<E>, bool), EsEntityError> {
164        let mut ret: Vec<E> = Vec::new();
165        let mut current_id = None;
166        let mut current = None;
167        for e in events {
168            if current_id.as_ref() != Some(&e.entity_id) {
169                if let Some(current) = current.take() {
170                    ret.push(E::try_from_events(current)?);
171                    if ret.len() == n {
172                        return Ok((ret, true));
173                    }
174                }
175
176                current_id = Some(e.entity_id.clone());
177                current = Some(Self {
178                    entity_id: e.entity_id.clone(),
179                    persisted_events: Vec::new(),
180                    new_events: Vec::new(),
181                });
182            }
183            let cur = current.as_mut().expect("Could not get current");
184            cur.persisted_events.push(PersistedEvent {
185                entity_id: e.entity_id,
186                recorded_at: e.recorded_at,
187                sequence: e.sequence as usize,
188                event: serde_json::from_value(e.event)?,
189            });
190        }
191        if let Some(current) = current.take() {
192            ret.push(E::try_from_events(current)?);
193        }
194        Ok((ret, false))
195    }
196}
197
198#[cfg(test)]
199mod tests {
200    use super::*;
201    use uuid::Uuid;
202
203    #[derive(Debug, serde::Serialize, serde::Deserialize)]
204    enum DummyEntityEvent {
205        Created(String),
206    }
207
208    impl EsEvent for DummyEntityEvent {
209        type EntityId = Uuid;
210    }
211
212    struct DummyEntity {
213        name: String,
214
215        events: EntityEvents<DummyEntityEvent>,
216    }
217
218    impl EsEntity for DummyEntity {
219        type Event = DummyEntityEvent;
220        type New = NewDummyEntity;
221
222        fn events_mut(&mut self) -> &mut EntityEvents<DummyEntityEvent> {
223            &mut self.events
224        }
225        fn events(&self) -> &EntityEvents<DummyEntityEvent> {
226            &self.events
227        }
228    }
229
230    impl TryFromEvents<DummyEntityEvent> for DummyEntity {
231        fn try_from_events(events: EntityEvents<DummyEntityEvent>) -> Result<Self, EsEntityError> {
232            let name = events
233                .iter_persisted()
234                .map(|e| match &e.event {
235                    DummyEntityEvent::Created(name) => name.clone(),
236                })
237                .next()
238                .expect("Could not find name");
239            Ok(Self { name, events })
240        }
241    }
242
243    struct NewDummyEntity {}
244
245    impl IntoEvents<DummyEntityEvent> for NewDummyEntity {
246        fn into_events(self) -> EntityEvents<DummyEntityEvent> {
247            EntityEvents::init(
248                Uuid::new_v4(),
249                vec![DummyEntityEvent::Created("".to_owned())],
250            )
251        }
252    }
253
254    #[test]
255    fn load_zero_events() {
256        let generic_events = vec![];
257        let res = EntityEvents::load_first::<DummyEntity>(generic_events);
258        assert!(matches!(res, Err(EsEntityError::NotFound)));
259    }
260
261    #[test]
262    fn load_first() {
263        let generic_events = vec![GenericEvent {
264            entity_id: uuid::Uuid::new_v4(),
265            sequence: 1,
266            event: serde_json::to_value(DummyEntityEvent::Created("dummy-name".to_owned()))
267                .expect("Could not serialize"),
268            recorded_at: chrono::Utc::now(),
269        }];
270        let entity: DummyEntity = EntityEvents::load_first(generic_events).expect("Could not load");
271        assert!(entity.name == "dummy-name");
272    }
273
274    #[test]
275    fn load_n() {
276        let generic_events = vec![
277            GenericEvent {
278                entity_id: uuid::Uuid::new_v4(),
279                sequence: 1,
280                event: serde_json::to_value(DummyEntityEvent::Created("dummy-name".to_owned()))
281                    .expect("Could not serialize"),
282                recorded_at: chrono::Utc::now(),
283            },
284            GenericEvent {
285                entity_id: uuid::Uuid::new_v4(),
286                sequence: 1,
287                event: serde_json::to_value(DummyEntityEvent::Created("other-name".to_owned()))
288                    .expect("Could not serialize"),
289                recorded_at: chrono::Utc::now(),
290            },
291        ];
292        let (entity, more): (Vec<DummyEntity>, _) =
293            EntityEvents::load_n(generic_events, 2).expect("Could not load");
294        assert!(!more);
295        assert_eq!(entity.len(), 2);
296    }
297}