1use chrono::{DateTime, Utc};
4
5use super::{error::EsEntityError, traits::*};
6
/// Borrowing iterator over a contiguous run of [`PersistedEvent`]s.
///
/// This is the concrete iterator type returned by
/// [`EntityEvents::last_persisted`]; it is a plain slice iterator, so it is
/// double-ended and knows its exact length.
pub type LastPersisted<'a, E> = std::slice::Iter<'a, PersistedEvent<E>>;
9
/// An event whose payload is still untyped JSON.
///
/// [`EntityEvents::load_first`] and [`EntityEvents::load_n`] consume values of
/// this type and deserialize `event` into the concrete event type.
/// NOTE(review): presumably this mirrors a row of the events table — confirm
/// against the code that constructs it.
pub struct GenericEvent<Id> {
    /// Identifier of the entity this event belongs to.
    pub entity_id: Id,
    /// Sequence number of the event; converted to `usize` when loaded.
    pub sequence: i32,
    /// Event payload as raw JSON, not yet deserialized.
    pub event: serde_json::Value,
    /// Context data stored alongside the event, if any.
    pub context: Option<crate::ContextData>,
    /// Timestamp at which the event was recorded.
    pub recorded_at: DateTime<Utc>,
}
22
/// A strongly typed event that has already been persisted.
///
/// Instances are created either by deserializing a [`GenericEvent`] while
/// loading, or by [`EntityEvents::mark_new_events_persisted_at`] when pending
/// events are promoted to persisted ones.
pub struct PersistedEvent<E: EsEvent> {
    /// Identifier of the entity this event belongs to.
    pub entity_id: <E as EsEvent>::EntityId,
    /// Timestamp at which the event was recorded.
    pub recorded_at: DateTime<Utc>,
    /// Position of the event within its entity's stream. Events promoted by
    /// `mark_new_events_persisted_at` continue numbering from the current
    /// persisted count plus one.
    pub sequence: usize,
    /// The typed event payload.
    pub event: E,
    /// Context data stored alongside the event, if any.
    pub context: Option<crate::ContextData>,
}
41
42impl<E: Clone + EsEvent> Clone for PersistedEvent<E> {
43 fn clone(&self) -> Self {
44 PersistedEvent {
45 entity_id: self.entity_id.clone(),
46 recorded_at: self.recorded_at,
47 sequence: self.sequence,
48 event: self.event.clone(),
49 context: self.context.clone(),
50 }
51 }
52}
53
/// A not-yet-persisted event paired with the context data captured for it.
///
/// `context` is `Some` only when the event type opts in via
/// `EsEvent::event_context()` (see [`EntityEvents::push`]).
pub struct EventWithContext<E: EsEvent> {
    /// The typed event payload.
    pub event: E,
    /// Context data captured when the event was queued, if enabled.
    pub context: Option<crate::ContextData>,
}
58
59impl<E: Clone + EsEvent> Clone for EventWithContext<E> {
60 fn clone(&self) -> Self {
61 EventWithContext {
62 event: self.event.clone(),
63 context: self.context.clone(),
64 }
65 }
66}
67
/// The event history of a single entity.
///
/// Events are kept in two lists: those already persisted and those queued
/// since the entity was loaded or created. New events are added with
/// [`EntityEvents::push`] / [`EntityEvents::extend`] and moved to the
/// persisted list by [`EntityEvents::mark_new_events_persisted_at`].
pub struct EntityEvents<T: EsEvent> {
    /// Identifier of the entity that owns these events.
    pub entity_id: <T as EsEvent>::EntityId,
    /// Events that have already been persisted, in the order they were loaded
    /// or promoted.
    persisted_events: Vec<PersistedEvent<T>>,
    /// Events queued but not yet persisted, in insertion order.
    new_events: Vec<EventWithContext<T>>,
}
80
81impl<T: Clone + EsEvent> Clone for EntityEvents<T> {
82 fn clone(&self) -> Self {
83 Self {
84 entity_id: self.entity_id.clone(),
85 persisted_events: self.persisted_events.clone(),
86 new_events: self.new_events.clone(),
87 }
88 }
89}
90
91impl<T> EntityEvents<T>
92where
93 T: EsEvent,
94{
95 pub fn init(id: <T as EsEvent>::EntityId, initial_events: impl IntoIterator<Item = T>) -> Self {
97 let context = if <T as EsEvent>::event_context() {
98 Some(crate::EventContext::data_for_storing())
99 } else {
100 None
101 };
102 let new_events = initial_events
103 .into_iter()
104 .map(|event| EventWithContext {
105 event,
106 context: context.clone(),
107 })
108 .collect();
109 Self {
110 entity_id: id,
111 persisted_events: Vec::new(),
112 new_events,
113 }
114 }
115
116 pub fn id(&self) -> &<T as EsEvent>::EntityId {
118 &self.entity_id
119 }
120
121 pub fn entity_first_persisted_at(&self) -> Option<DateTime<Utc>> {
123 self.persisted_events.first().map(|e| e.recorded_at)
124 }
125
126 pub fn entity_last_modified_at(&self) -> Option<DateTime<Utc>> {
128 self.persisted_events.last().map(|e| e.recorded_at)
129 }
130
131 pub fn push(&mut self, event: T) {
133 let context = if <T as EsEvent>::event_context() {
134 Some(crate::EventContext::data_for_storing())
135 } else {
136 None
137 };
138 self.new_events.push(EventWithContext { event, context });
139 }
140
141 pub fn extend(&mut self, events: impl IntoIterator<Item = T>) {
143 let context = if <T as EsEvent>::event_context() {
144 Some(crate::EventContext::data_for_storing())
145 } else {
146 None
147 };
148 self.new_events
149 .extend(events.into_iter().map(|event| EventWithContext {
150 event,
151 context: context.clone(),
152 }));
153 }
154
155 pub fn any_new(&self) -> bool {
157 !self.new_events.is_empty()
158 }
159
160 pub fn len_persisted(&self) -> usize {
162 self.persisted_events.len()
163 }
164
165 pub fn iter_persisted(&self) -> impl DoubleEndedIterator<Item = &PersistedEvent<T>> {
167 self.persisted_events.iter()
168 }
169
170 pub fn last_persisted(&self, n: usize) -> LastPersisted<'_, T> {
172 let start = self.persisted_events.len() - n;
173 self.persisted_events[start..].iter()
174 }
175
176 pub fn iter_all(&self) -> impl DoubleEndedIterator<Item = &T> {
178 self.persisted_events
179 .iter()
180 .map(|e| &e.event)
181 .chain(self.new_events.iter().map(|e| &e.event))
182 }
183
184 pub fn load_first<E: EsEntity<Event = T>>(
186 events: impl IntoIterator<Item = GenericEvent<<T as EsEvent>::EntityId>>,
187 ) -> Result<E, EsEntityError> {
188 let mut current_id = None;
189 let mut current = None;
190 for e in events {
191 if current_id.is_none() {
192 current_id = Some(e.entity_id.clone());
193 current = Some(Self {
194 entity_id: e.entity_id.clone(),
195 persisted_events: Vec::new(),
196 new_events: Vec::new(),
197 });
198 }
199 if current_id.as_ref() != Some(&e.entity_id) {
200 break;
201 }
202 let cur = current.as_mut().expect("Could not get current");
203 cur.persisted_events.push(PersistedEvent {
204 entity_id: e.entity_id,
205 recorded_at: e.recorded_at,
206 sequence: e.sequence as usize,
207 event: serde_json::from_value(e.event)?,
208 context: e.context,
209 });
210 }
211 if let Some(current) = current {
212 E::try_from_events(current)
213 } else {
214 Err(EsEntityError::NotFound)
215 }
216 }
217
218 pub fn load_n<E: EsEntity<Event = T>>(
223 events: impl IntoIterator<Item = GenericEvent<<T as EsEvent>::EntityId>>,
224 n: usize,
225 ) -> Result<(Vec<E>, bool), EsEntityError> {
226 let mut ret: Vec<E> = Vec::new();
227 let mut current_id = None;
228 let mut current = None;
229 for e in events {
230 if current_id.as_ref() != Some(&e.entity_id) {
231 if let Some(current) = current.take() {
232 ret.push(E::try_from_events(current)?);
233 if ret.len() == n {
234 return Ok((ret, true));
235 }
236 }
237
238 current_id = Some(e.entity_id.clone());
239 current = Some(Self {
240 entity_id: e.entity_id.clone(),
241 persisted_events: Vec::new(),
242 new_events: Vec::new(),
243 });
244 }
245 let cur = current.as_mut().expect("Could not get current");
246 cur.persisted_events.push(PersistedEvent {
247 entity_id: e.entity_id,
248 recorded_at: e.recorded_at,
249 sequence: e.sequence as usize,
250 event: serde_json::from_value(e.event)?,
251 context: e.context,
252 });
253 }
254 if let Some(current) = current.take() {
255 ret.push(E::try_from_events(current)?);
256 }
257 Ok((ret, false))
258 }
259
260 #[doc(hidden)]
261 pub fn mark_new_events_persisted_at(
262 &mut self,
263 recorded_at: chrono::DateTime<chrono::Utc>,
264 ) -> usize {
265 let n = self.new_events.len();
266 let offset = self.persisted_events.len() + 1;
267 self.persisted_events
268 .extend(
269 self.new_events
270 .drain(..)
271 .enumerate()
272 .map(|(i, event)| PersistedEvent {
273 entity_id: self.entity_id.clone(),
274 recorded_at,
275 sequence: i + offset,
276 event: event.event,
277 context: event.context,
278 }),
279 );
280 n
281 }
282
283 #[doc(hidden)]
284 pub fn serialize_new_events(&self) -> Vec<serde_json::Value> {
285 self.new_events
286 .iter()
287 .map(|event| serde_json::to_value(&event.event).expect("Failed to serialize event"))
288 .collect()
289 }
290
291 #[doc(hidden)]
292 pub fn serialize_new_event_contexts(&self) -> Option<Vec<crate::ContextData>> {
293 if <T as EsEvent>::event_context() {
294 let contexts = self
295 .new_events
296 .iter()
297 .map(|event| event.context.clone().expect("Missing context"))
298 .collect();
299
300 Some(contexts)
301 } else {
302 None
303 }
304 }
305}
306
#[cfg(test)]
mod tests {
    use super::*;
    use uuid::Uuid;

    /// Minimal event type used to exercise the loading helpers.
    #[derive(Debug, serde::Serialize, serde::Deserialize)]
    enum DummyEntityEvent {
        Created(String),
    }

    impl EsEvent for DummyEntityEvent {
        type EntityId = Uuid;
        fn event_context() -> bool {
            true
        }
    }

    /// Entity hydrated from `DummyEntityEvent`s; `name` comes from `Created`.
    struct DummyEntity {
        name: String,

        events: EntityEvents<DummyEntityEvent>,
    }

    impl EsEntity for DummyEntity {
        type Event = DummyEntityEvent;
        type New = NewDummyEntity;

        fn events_mut(&mut self) -> &mut EntityEvents<DummyEntityEvent> {
            &mut self.events
        }
        fn events(&self) -> &EntityEvents<DummyEntityEvent> {
            &self.events
        }
    }

    impl TryFromEvents<DummyEntityEvent> for DummyEntity {
        fn try_from_events(events: EntityEvents<DummyEntityEvent>) -> Result<Self, EsEntityError> {
            // The name is taken from the first persisted event.
            let first = events
                .iter_persisted()
                .next()
                .expect("Could not find name");
            let DummyEntityEvent::Created(name) = &first.event;
            let name = name.clone();
            Ok(Self { name, events })
        }
    }

    struct NewDummyEntity {}

    impl IntoEvents<DummyEntityEvent> for NewDummyEntity {
        fn into_events(self) -> EntityEvents<DummyEntityEvent> {
            EntityEvents::init(
                Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap(),
                vec![DummyEntityEvent::Created("".to_owned())],
            )
        }
    }

    /// Builds a raw `Created` event with sequence 1 and no context for the
    /// given entity id and name.
    fn created_row(entity_id: &str, name: &str) -> GenericEvent<Uuid> {
        GenericEvent {
            entity_id: Uuid::parse_str(entity_id).unwrap(),
            sequence: 1,
            event: serde_json::to_value(DummyEntityEvent::Created(name.to_owned()))
                .expect("Could not serialize"),
            context: None,
            recorded_at: chrono::Utc::now(),
        }
    }

    #[test]
    fn load_zero_events() {
        // An empty stream must be reported as "not found".
        let generic_events = Vec::new();
        let res = EntityEvents::load_first::<DummyEntity>(generic_events);
        assert!(matches!(res, Err(EsEntityError::NotFound)));
    }

    #[test]
    fn load_first() {
        let generic_events = vec![created_row(
            "00000000-0000-0000-0000-000000000001",
            "dummy-name",
        )];
        let entity: DummyEntity = EntityEvents::load_first(generic_events).expect("Could not load");
        assert!(entity.name == "dummy-name");
    }

    #[test]
    fn load_n() {
        // Two events with distinct ids must hydrate into two entities.
        let generic_events = vec![
            created_row("00000000-0000-0000-0000-000000000002", "dummy-name"),
            created_row("00000000-0000-0000-0000-000000000003", "other-name"),
        ];
        let (entities, has_more): (Vec<DummyEntity>, _) =
            EntityEvents::load_n(generic_events, 2).expect("Could not load");
        assert!(!has_more);
        assert_eq!(entities.len(), 2);
    }
}