1use chrono::{DateTime, Utc};
4
5use super::{error::EntityHydrationError, traits::*};
6
/// Iterator type returned by [`EntityEvents::last_persisted`]: a plain slice
/// iterator yielding references to [`PersistedEvent`]s.
pub type LastPersisted<'a, E> = std::slice::Iter<'a, PersistedEvent<E>>;
9
/// An event record whose payload has not yet been deserialized into a typed event.
///
/// [`EntityEvents::load_first`] and [`EntityEvents::load_n`] consume a stream of
/// these records and turn each one into a [`PersistedEvent`].
pub struct GenericEvent<Id> {
    /// Identifier of the entity this event belongs to.
    pub entity_id: Id,
    /// Position of the event in its entity's history. Signed here; it is cast to
    /// `usize` when the record is converted into a [`PersistedEvent`].
    pub sequence: i32,
    /// Raw JSON payload, deserialized into the typed event on load.
    pub event: serde_json::Value,
    /// Context data recorded together with the event, if any.
    pub context: Option<crate::ContextData>,
    /// Timestamp at which the event was recorded.
    pub recorded_at: DateTime<Utc>,
}
22
/// A typed event that has already been persisted, together with its metadata.
pub struct PersistedEvent<E: EsEvent> {
    /// Identifier of the entity this event belongs to.
    pub entity_id: <E as EsEvent>::EntityId,
    /// Timestamp at which the event was recorded.
    pub recorded_at: DateTime<Utc>,
    /// Position of the event in its entity's history.
    /// `EntityEvents::mark_new_events_persisted_at` assigns these so that the
    /// first event of an entity gets sequence 1.
    pub sequence: usize,
    /// The typed event payload.
    pub event: E,
    /// Context data recorded together with the event, if any.
    pub context: Option<crate::ContextData>,
}
41
42impl<E: Clone + EsEvent> Clone for PersistedEvent<E> {
43 fn clone(&self) -> Self {
44 PersistedEvent {
45 entity_id: self.entity_id.clone(),
46 recorded_at: self.recorded_at,
47 sequence: self.sequence,
48 event: self.event.clone(),
49 context: self.context.clone(),
50 }
51 }
52}
53
/// A not-yet-persisted event paired with the context data captured when it was queued.
pub struct EventWithContext<E: EsEvent> {
    /// The typed event payload.
    pub event: E,
    /// Context captured for the event; `None` when the event type does not
    /// record context (`EsEvent::event_context()` returned `false`).
    pub context: Option<crate::ContextData>,
}
58
59impl<E: Clone + EsEvent> Clone for EventWithContext<E> {
60 fn clone(&self) -> Self {
61 EventWithContext {
62 event: self.event.clone(),
63 context: self.context.clone(),
64 }
65 }
66}
67
/// The event history of a single entity: the events that are already persisted
/// plus the events queued since, which are not persisted yet.
pub struct EntityEvents<T: EsEvent> {
    /// Identifier of the entity these events belong to.
    pub entity_id: <T as EsEvent>::EntityId,
    /// Events already persisted, in stored order.
    persisted_events: Vec<PersistedEvent<T>>,
    /// Events queued via `init`/`push`/`extend`; they move to `persisted_events`
    /// when `mark_new_events_persisted_at` is called.
    new_events: Vec<EventWithContext<T>>,
}
80
81impl<T: Clone + EsEvent> Clone for EntityEvents<T> {
82 fn clone(&self) -> Self {
83 Self {
84 entity_id: self.entity_id.clone(),
85 persisted_events: self.persisted_events.clone(),
86 new_events: self.new_events.clone(),
87 }
88 }
89}
90
91impl<T> EntityEvents<T>
92where
93 T: EsEvent,
94{
95 pub fn init(id: <T as EsEvent>::EntityId, initial_events: impl IntoIterator<Item = T>) -> Self {
97 let context = if <T as EsEvent>::event_context() {
98 Some(crate::EventContext::data_for_storing())
99 } else {
100 None
101 };
102 let new_events = initial_events
103 .into_iter()
104 .map(|event| EventWithContext {
105 event,
106 context: context.clone(),
107 })
108 .collect();
109 Self {
110 entity_id: id,
111 persisted_events: Vec::new(),
112 new_events,
113 }
114 }
115
    /// Returns a reference to the id of the entity these events belong to.
    pub fn id(&self) -> &<T as EsEvent>::EntityId {
        &self.entity_id
    }
120
121 pub fn entity_first_persisted_at(&self) -> Option<DateTime<Utc>> {
123 self.persisted_events.first().map(|e| e.recorded_at)
124 }
125
126 pub fn entity_last_modified_at(&self) -> Option<DateTime<Utc>> {
128 self.persisted_events.last().map(|e| e.recorded_at)
129 }
130
131 pub fn push(&mut self, event: T) {
133 let context = if <T as EsEvent>::event_context() {
134 Some(crate::EventContext::data_for_storing())
135 } else {
136 None
137 };
138 self.new_events.push(EventWithContext { event, context });
139 }
140
141 pub fn extend(&mut self, events: impl IntoIterator<Item = T>) {
143 let context = if <T as EsEvent>::event_context() {
144 Some(crate::EventContext::data_for_storing())
145 } else {
146 None
147 };
148 self.new_events
149 .extend(events.into_iter().map(|event| EventWithContext {
150 event,
151 context: context.clone(),
152 }));
153 }
154
    /// Returns `true` when at least one event is queued but not yet persisted.
    pub fn any_new(&self) -> bool {
        !self.new_events.is_empty()
    }
159
    /// Returns the number of persisted events (queued new events are not counted).
    pub fn len_persisted(&self) -> usize {
        self.persisted_events.len()
    }
164
    /// Iterates over the persisted events in stored order. The iterator is
    /// double-ended, so callers can also walk it in reverse.
    pub fn iter_persisted(&self) -> impl DoubleEndedIterator<Item = &PersistedEvent<T>> {
        self.persisted_events.iter()
    }
169
170 pub fn last_persisted(&self, n: usize) -> LastPersisted<'_, T> {
172 let start = self.persisted_events.len() - n;
173 self.persisted_events[start..].iter()
174 }
175
176 pub fn iter_all(&self) -> impl DoubleEndedIterator<Item = &T> {
178 self.persisted_events
179 .iter()
180 .map(|e| &e.event)
181 .chain(self.new_events.iter().map(|e| &e.event))
182 }
183
184 pub fn load_first<E: EsEntity<Event = T>>(
188 events: impl IntoIterator<Item = GenericEvent<<T as EsEvent>::EntityId>>,
189 ) -> Result<Option<E>, EntityHydrationError> {
190 let mut current_id = None;
191 let mut current = None;
192 for e in events {
193 if current_id.is_none() {
194 current_id = Some(e.entity_id.clone());
195 current = Some(Self {
196 entity_id: e.entity_id.clone(),
197 persisted_events: Vec::new(),
198 new_events: Vec::new(),
199 });
200 }
201 if current_id.as_ref() != Some(&e.entity_id) {
202 break;
203 }
204 let cur = current.as_mut().expect("Could not get current");
205 cur.persisted_events.push(PersistedEvent {
206 entity_id: e.entity_id,
207 recorded_at: e.recorded_at,
208 sequence: e.sequence as usize,
209 event: serde_json::from_value(e.event)?,
210 context: e.context,
211 });
212 }
213 if let Some(current) = current {
214 Ok(Some(E::try_from_events(current)?))
215 } else {
216 Ok(None)
217 }
218 }
219
220 pub fn load_n<E: EsEntity<Event = T>>(
225 events: impl IntoIterator<Item = GenericEvent<<T as EsEvent>::EntityId>>,
226 n: usize,
227 ) -> Result<(Vec<E>, bool), EntityHydrationError> {
228 let mut ret: Vec<E> = Vec::new();
229 let mut current_id = None;
230 let mut current = None;
231 for e in events {
232 if current_id.as_ref() != Some(&e.entity_id) {
233 if let Some(current) = current.take() {
234 ret.push(E::try_from_events(current)?);
235 if ret.len() == n {
236 return Ok((ret, true));
237 }
238 }
239
240 current_id = Some(e.entity_id.clone());
241 current = Some(Self {
242 entity_id: e.entity_id.clone(),
243 persisted_events: Vec::new(),
244 new_events: Vec::new(),
245 });
246 }
247 let cur = current.as_mut().expect("Could not get current");
248 cur.persisted_events.push(PersistedEvent {
249 entity_id: e.entity_id,
250 recorded_at: e.recorded_at,
251 sequence: e.sequence as usize,
252 event: serde_json::from_value(e.event)?,
253 context: e.context,
254 });
255 }
256 if let Some(current) = current.take() {
257 ret.push(E::try_from_events(current)?);
258 }
259 Ok((ret, false))
260 }
261
262 #[doc(hidden)]
263 pub fn mark_new_events_persisted_at(
264 &mut self,
265 recorded_at: chrono::DateTime<chrono::Utc>,
266 ) -> usize {
267 let n = self.new_events.len();
268 let offset = self.persisted_events.len() + 1;
269 self.persisted_events
270 .extend(
271 self.new_events
272 .drain(..)
273 .enumerate()
274 .map(|(i, event)| PersistedEvent {
275 entity_id: self.entity_id.clone(),
276 recorded_at,
277 sequence: i + offset,
278 event: event.event,
279 context: event.context,
280 }),
281 );
282 n
283 }
284
285 #[doc(hidden)]
286 pub fn new_event_types(&self) -> Vec<String> {
287 self.new_events
288 .iter()
289 .map(|event| event.event.event_type().to_string())
290 .collect()
291 }
292
293 #[doc(hidden)]
294 pub fn serialize_new_events(&self) -> Vec<serde_json::Value> {
295 self.new_events
296 .iter()
297 .map(|event| serde_json::to_value(&event.event).expect("Failed to serialize event"))
298 .collect()
299 }
300
301 #[doc(hidden)]
302 pub fn serialize_new_event_contexts(&self) -> Option<Vec<crate::ContextData>> {
303 if <T as EsEvent>::event_context() {
304 let contexts = self
305 .new_events
306 .iter()
307 .map(|event| event.context.clone().expect("Missing context"))
308 .collect();
309
310 Some(contexts)
311 } else {
312 None
313 }
314 }
315}
316
#[cfg(test)]
mod tests {
    use super::*;
    use uuid::Uuid;

    /// Single-variant event, just enough to hydrate a `DummyEntity`.
    #[derive(Debug, serde::Serialize, serde::Deserialize)]
    enum DummyEntityEvent {
        Created(String),
    }

    impl EsEvent for DummyEntityEvent {
        type EntityId = Uuid;

        fn event_context() -> bool {
            true
        }

        fn event_type(&self) -> &'static str {
            match self {
                Self::Created(_) => "created",
            }
        }
    }

    /// Entity rebuilt from `DummyEntityEvent`s; `name` is taken from the first
    /// persisted event.
    struct DummyEntity {
        name: String,

        events: EntityEvents<DummyEntityEvent>,
    }

    impl EsEntity for DummyEntity {
        type Event = DummyEntityEvent;
        type New = NewDummyEntity;

        fn events_mut(&mut self) -> &mut EntityEvents<DummyEntityEvent> {
            &mut self.events
        }

        fn events(&self) -> &EntityEvents<DummyEntityEvent> {
            &self.events
        }
    }

    impl TryFromEvents<DummyEntityEvent> for DummyEntity {
        fn try_from_events(
            events: EntityEvents<DummyEntityEvent>,
        ) -> Result<Self, EntityHydrationError> {
            let first = events
                .iter_persisted()
                .next()
                .expect("Could not find name");
            let DummyEntityEvent::Created(name) = &first.event;
            let name = name.clone();
            Ok(Self { name, events })
        }
    }

    struct NewDummyEntity {}

    impl IntoEvents<DummyEntityEvent> for NewDummyEntity {
        fn into_events(self) -> EntityEvents<DummyEntityEvent> {
            let id = Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap();
            let initial = vec![DummyEntityEvent::Created("".to_owned())];
            EntityEvents::init(id, initial)
        }
    }

    /// Builds the generic (still JSON-encoded) form of a `Created` event with
    /// sequence 1 and no context for the entity with the given id.
    fn created_event(entity_id: &str, name: &str) -> GenericEvent<Uuid> {
        let payload = serde_json::to_value(DummyEntityEvent::Created(name.to_owned()))
            .expect("Could not serialize");
        GenericEvent {
            entity_id: Uuid::parse_str(entity_id).unwrap(),
            sequence: 1,
            event: payload,
            context: None,
            recorded_at: chrono::Utc::now(),
        }
    }

    #[test]
    fn load_zero_events() {
        let no_events = Vec::new();
        let res = EntityEvents::load_first::<DummyEntity>(no_events);
        assert!(matches!(res, Ok(None)));
    }

    #[test]
    fn load_first() {
        let generic_events = vec![created_event(
            "00000000-0000-0000-0000-000000000001",
            "dummy-name",
        )];
        let entity: DummyEntity = EntityEvents::load_first(generic_events)
            .expect("Could not load")
            .expect("No entity found");
        assert_eq!(entity.name, "dummy-name");
    }

    #[test]
    fn load_n() {
        let generic_events = vec![
            created_event("00000000-0000-0000-0000-000000000002", "dummy-name"),
            created_event("00000000-0000-0000-0000-000000000003", "other-name"),
        ];
        let (entities, more): (Vec<DummyEntity>, _) =
            EntityEvents::load_n(generic_events, 2).expect("Could not load");
        assert!(!more);
        assert_eq!(entities.len(), 2);
    }
}