1use chrono::{DateTime, Utc};
4
5use super::{error::EsEntityError, traits::*};
6
7pub type LastPersisted<'a, E> = std::slice::Iter<'a, PersistedEvent<E>>;
9
10pub struct GenericEvent<Id> {
16 pub entity_id: Id,
17 pub sequence: i32,
18 pub event: serde_json::Value,
19 pub recorded_at: DateTime<Utc>,
20}
21
22pub struct PersistedEvent<E: EsEvent> {
28 pub entity_id: <E as EsEvent>::EntityId,
30 pub recorded_at: DateTime<Utc>,
32 pub sequence: usize,
34 pub event: E,
36}
37
38impl<E: Clone + EsEvent> Clone for PersistedEvent<E> {
39 fn clone(&self) -> Self {
40 PersistedEvent {
41 entity_id: self.entity_id.clone(),
42 recorded_at: self.recorded_at,
43 sequence: self.sequence,
44 event: self.event.clone(),
45 }
46 }
47}
48
49pub struct EntityEvents<T: EsEvent> {
54 pub entity_id: <T as EsEvent>::EntityId,
56 persisted_events: Vec<PersistedEvent<T>>,
58 new_events: Vec<T>,
60}
61
62impl<T: Clone + EsEvent> Clone for EntityEvents<T> {
63 fn clone(&self) -> Self {
64 Self {
65 entity_id: self.entity_id.clone(),
66 persisted_events: self.persisted_events.clone(),
67 new_events: self.new_events.clone(),
68 }
69 }
70}
71
72impl<T> EntityEvents<T>
73where
74 T: EsEvent,
75{
76 pub fn init(id: <T as EsEvent>::EntityId, initial_events: impl IntoIterator<Item = T>) -> Self {
78 Self {
79 entity_id: id,
80 persisted_events: Vec::new(),
81 new_events: initial_events.into_iter().collect(),
82 }
83 }
84
85 pub fn id(&self) -> &<T as EsEvent>::EntityId {
87 &self.entity_id
88 }
89
90 pub fn entity_first_persisted_at(&self) -> Option<DateTime<Utc>> {
92 self.persisted_events.first().map(|e| e.recorded_at)
93 }
94
95 pub fn entity_last_modified_at(&self) -> Option<DateTime<Utc>> {
97 self.persisted_events.last().map(|e| e.recorded_at)
98 }
99
100 pub fn push(&mut self, event: T) {
102 self.new_events.push(event);
103 }
104
105 pub fn extend(&mut self, events: impl IntoIterator<Item = T>) {
107 self.new_events.extend(events);
108 }
109
110 pub fn any_new(&self) -> bool {
112 !self.new_events.is_empty()
113 }
114
115 pub fn len_persisted(&self) -> usize {
117 self.persisted_events.len()
118 }
119
120 pub fn iter_persisted(&self) -> impl DoubleEndedIterator<Item = &PersistedEvent<T>> {
122 self.persisted_events.iter()
123 }
124
125 pub fn last_persisted(&self, n: usize) -> LastPersisted<'_, T> {
127 let start = self.persisted_events.len() - n;
128 self.persisted_events[start..].iter()
129 }
130
131 pub fn iter_all(&self) -> impl DoubleEndedIterator<Item = &T> {
133 self.persisted_events
134 .iter()
135 .map(|e| &e.event)
136 .chain(self.new_events.iter())
137 }
138
139 pub fn load_first<E: EsEntity<Event = T>>(
141 events: impl IntoIterator<Item = GenericEvent<<T as EsEvent>::EntityId>>,
142 ) -> Result<E, EsEntityError> {
143 let mut current_id = None;
144 let mut current = None;
145 for e in events {
146 if current_id.is_none() {
147 current_id = Some(e.entity_id.clone());
148 current = Some(Self {
149 entity_id: e.entity_id.clone(),
150 persisted_events: Vec::new(),
151 new_events: Vec::new(),
152 });
153 }
154 if current_id.as_ref() != Some(&e.entity_id) {
155 break;
156 }
157 let cur = current.as_mut().expect("Could not get current");
158 cur.persisted_events.push(PersistedEvent {
159 entity_id: e.entity_id,
160 recorded_at: e.recorded_at,
161 sequence: e.sequence as usize,
162 event: serde_json::from_value(e.event)?,
163 });
164 }
165 if let Some(current) = current {
166 E::try_from_events(current)
167 } else {
168 Err(EsEntityError::NotFound)
169 }
170 }
171
172 pub fn load_n<E: EsEntity<Event = T>>(
177 events: impl IntoIterator<Item = GenericEvent<<T as EsEvent>::EntityId>>,
178 n: usize,
179 ) -> Result<(Vec<E>, bool), EsEntityError> {
180 let mut ret: Vec<E> = Vec::new();
181 let mut current_id = None;
182 let mut current = None;
183 for e in events {
184 if current_id.as_ref() != Some(&e.entity_id) {
185 if let Some(current) = current.take() {
186 ret.push(E::try_from_events(current)?);
187 if ret.len() == n {
188 return Ok((ret, true));
189 }
190 }
191
192 current_id = Some(e.entity_id.clone());
193 current = Some(Self {
194 entity_id: e.entity_id.clone(),
195 persisted_events: Vec::new(),
196 new_events: Vec::new(),
197 });
198 }
199 let cur = current.as_mut().expect("Could not get current");
200 cur.persisted_events.push(PersistedEvent {
201 entity_id: e.entity_id,
202 recorded_at: e.recorded_at,
203 sequence: e.sequence as usize,
204 event: serde_json::from_value(e.event)?,
205 });
206 }
207 if let Some(current) = current.take() {
208 ret.push(E::try_from_events(current)?);
209 }
210 Ok((ret, false))
211 }
212
213 #[doc(hidden)]
214 pub fn mark_new_events_persisted_at(
215 &mut self,
216 recorded_at: chrono::DateTime<chrono::Utc>,
217 ) -> usize {
218 let n = self.new_events.len();
219 let offset = self.persisted_events.len() + 1;
220 self.persisted_events
221 .extend(
222 self.new_events
223 .drain(..)
224 .enumerate()
225 .map(|(i, event)| PersistedEvent {
226 entity_id: self.entity_id.clone(),
227 recorded_at,
228 sequence: i + offset,
229 event,
230 }),
231 );
232 n
233 }
234
235 #[doc(hidden)]
236 pub fn serialize_new_events(&self) -> Vec<serde_json::Value> {
237 self.new_events
238 .iter()
239 .map(|event| serde_json::to_value(event).expect("Failed to serialize event"))
240 .collect()
241 }
242}
243
#[cfg(test)]
mod tests {
    use super::*;
    use uuid::Uuid;

    /// Minimal event type used to exercise the loading helpers.
    #[derive(Debug, serde::Serialize, serde::Deserialize)]
    enum DummyEntityEvent {
        Created(String),
    }

    impl EsEvent for DummyEntityEvent {
        type EntityId = Uuid;
    }

    /// Minimal entity that is rebuilt from [`DummyEntityEvent`]s.
    struct DummyEntity {
        name: String,

        events: EntityEvents<DummyEntityEvent>,
    }

    impl EsEntity for DummyEntity {
        type Event = DummyEntityEvent;
        type New = NewDummyEntity;

        fn events_mut(&mut self) -> &mut EntityEvents<DummyEntityEvent> {
            &mut self.events
        }
        fn events(&self) -> &EntityEvents<DummyEntityEvent> {
            &self.events
        }
    }

    impl TryFromEvents<DummyEntityEvent> for DummyEntity {
        fn try_from_events(events: EntityEvents<DummyEntityEvent>) -> Result<Self, EsEntityError> {
            // The name is taken from the first persisted event.
            let first = events
                .iter_persisted()
                .next()
                .expect("Could not find name");
            let DummyEntityEvent::Created(created_name) = &first.event;
            let name = created_name.clone();
            Ok(Self { name, events })
        }
    }

    struct NewDummyEntity {}

    impl IntoEvents<DummyEntityEvent> for NewDummyEntity {
        fn into_events(self) -> EntityEvents<DummyEntityEvent> {
            let initial = vec![DummyEntityEvent::Created("".to_owned())];
            EntityEvents::init(Uuid::now_v7(), initial)
        }
    }

    /// Builds a raw `Created` event with sequence 1 for a freshly generated
    /// entity id.
    fn created_event(name: &str) -> GenericEvent<Uuid> {
        GenericEvent {
            entity_id: uuid::Uuid::now_v7(),
            sequence: 1,
            event: serde_json::to_value(DummyEntityEvent::Created(name.to_owned()))
                .expect("Could not serialize"),
            recorded_at: chrono::Utc::now(),
        }
    }

    #[test]
    fn load_zero_events() {
        let no_events = Vec::new();
        let outcome = EntityEvents::load_first::<DummyEntity>(no_events);
        assert!(matches!(outcome, Err(EsEntityError::NotFound)));
    }

    #[test]
    fn load_first() {
        let raw_events = vec![created_event("dummy-name")];
        let loaded: DummyEntity = EntityEvents::load_first(raw_events).expect("Could not load");
        assert!(loaded.name == "dummy-name");
    }

    #[test]
    fn load_n() {
        // Two events with distinct entity ids, so two entities are expected.
        let raw_events = vec![created_event("dummy-name"), created_event("other-name")];
        let (entities, has_more): (Vec<DummyEntity>, _) =
            EntityEvents::load_n(raw_events, 2).expect("Could not load");
        assert!(!has_more);
        assert_eq!(entities.len(), 2);
    }
}