1use chrono::{DateTime, Utc};
4
5use super::{error::EsEntityError, traits::*};
6
7pub type LastPersisted<'a, E> = std::slice::Iter<'a, PersistedEvent<E>>;
8
9pub struct GenericEvent<Id> {
10 pub entity_id: Id,
11 pub sequence: i32,
12 pub event: serde_json::Value,
13 pub recorded_at: DateTime<Utc>,
14}
15
16pub struct PersistedEvent<E: EsEvent> {
17 pub entity_id: <E as EsEvent>::EntityId,
18 pub recorded_at: DateTime<Utc>,
19 pub sequence: usize,
20 pub event: E,
21}
22
23impl<E: Clone + EsEvent> Clone for PersistedEvent<E> {
24 fn clone(&self) -> Self {
25 PersistedEvent {
26 entity_id: self.entity_id.clone(),
27 recorded_at: self.recorded_at,
28 sequence: self.sequence,
29 event: self.event.clone(),
30 }
31 }
32}
33
34pub struct EntityEvents<T: EsEvent> {
35 pub entity_id: <T as EsEvent>::EntityId,
36 persisted_events: Vec<PersistedEvent<T>>,
37 new_events: Vec<T>,
38}
39
40impl<T: Clone + EsEvent> Clone for EntityEvents<T> {
41 fn clone(&self) -> Self {
42 Self {
43 entity_id: self.entity_id.clone(),
44 persisted_events: self.persisted_events.clone(),
45 new_events: self.new_events.clone(),
46 }
47 }
48}
49
50impl<T> EntityEvents<T>
51where
52 T: EsEvent,
53{
54 pub fn init(id: <T as EsEvent>::EntityId, initial_events: impl IntoIterator<Item = T>) -> Self {
55 Self {
56 entity_id: id,
57 persisted_events: Vec::new(),
58 new_events: initial_events.into_iter().collect(),
59 }
60 }
61
62 pub fn id(&self) -> &<T as EsEvent>::EntityId {
63 &self.entity_id
64 }
65
66 pub fn entity_first_persisted_at(&self) -> Option<DateTime<Utc>> {
67 self.persisted_events.first().map(|e| e.recorded_at)
68 }
69
70 pub fn entity_last_modified_at(&self) -> Option<DateTime<Utc>> {
71 self.persisted_events.last().map(|e| e.recorded_at)
72 }
73
74 pub fn push(&mut self, event: T) {
75 self.new_events.push(event);
76 }
77
78 pub fn extend(&mut self, events: impl IntoIterator<Item = T>) {
79 self.new_events.extend(events);
80 }
81
82 pub fn mark_new_events_persisted_at(
83 &mut self,
84 recorded_at: chrono::DateTime<chrono::Utc>,
85 ) -> usize {
86 let n = self.new_events.len();
87 let offset = self.persisted_events.len() + 1;
88 self.persisted_events
89 .extend(
90 self.new_events
91 .drain(..)
92 .enumerate()
93 .map(|(i, event)| PersistedEvent {
94 entity_id: self.entity_id.clone(),
95 recorded_at,
96 sequence: i + offset,
97 event,
98 }),
99 );
100 n
101 }
102
103 pub fn serialize_new_events(&self) -> Vec<serde_json::Value> {
104 self.new_events
105 .iter()
106 .map(|event| serde_json::to_value(event).expect("Failed to serialize event"))
107 .collect()
108 }
109
110 pub fn any_new(&self) -> bool {
111 !self.new_events.is_empty()
112 }
113
114 pub fn len_persisted(&self) -> usize {
115 self.persisted_events.len()
116 }
117
118 pub fn iter_persisted(&self) -> impl DoubleEndedIterator<Item = &PersistedEvent<T>> {
119 self.persisted_events.iter()
120 }
121
122 pub fn last_persisted(&self, n: usize) -> LastPersisted<T> {
123 let start = self.persisted_events.len() - n;
124 self.persisted_events[start..].iter()
125 }
126
127 pub fn iter_all(&self) -> impl DoubleEndedIterator<Item = &T> {
128 self.persisted_events
129 .iter()
130 .map(|e| &e.event)
131 .chain(self.new_events.iter())
132 }
133
134 pub fn load_first<E: EsEntity<Event = T>>(
135 events: impl IntoIterator<Item = GenericEvent<<T as EsEvent>::EntityId>>,
136 ) -> Result<E, EsEntityError> {
137 let mut current_id = None;
138 let mut current = None;
139 for e in events {
140 if current_id.is_none() {
141 current_id = Some(e.entity_id.clone());
142 current = Some(Self {
143 entity_id: e.entity_id.clone(),
144 persisted_events: Vec::new(),
145 new_events: Vec::new(),
146 });
147 }
148 if current_id.as_ref() != Some(&e.entity_id) {
149 break;
150 }
151 let cur = current.as_mut().expect("Could not get current");
152 cur.persisted_events.push(PersistedEvent {
153 entity_id: e.entity_id,
154 recorded_at: e.recorded_at,
155 sequence: e.sequence as usize,
156 event: serde_json::from_value(e.event)?,
157 });
158 }
159 if let Some(current) = current {
160 E::try_from_events(current)
161 } else {
162 Err(EsEntityError::NotFound)
163 }
164 }
165
166 pub fn load_n<E: EsEntity<Event = T>>(
167 events: impl IntoIterator<Item = GenericEvent<<T as EsEvent>::EntityId>>,
168 n: usize,
169 ) -> Result<(Vec<E>, bool), EsEntityError> {
170 let mut ret: Vec<E> = Vec::new();
171 let mut current_id = None;
172 let mut current = None;
173 for e in events {
174 if current_id.as_ref() != Some(&e.entity_id) {
175 if let Some(current) = current.take() {
176 ret.push(E::try_from_events(current)?);
177 if ret.len() == n {
178 return Ok((ret, true));
179 }
180 }
181
182 current_id = Some(e.entity_id.clone());
183 current = Some(Self {
184 entity_id: e.entity_id.clone(),
185 persisted_events: Vec::new(),
186 new_events: Vec::new(),
187 });
188 }
189 let cur = current.as_mut().expect("Could not get current");
190 cur.persisted_events.push(PersistedEvent {
191 entity_id: e.entity_id,
192 recorded_at: e.recorded_at,
193 sequence: e.sequence as usize,
194 event: serde_json::from_value(e.event)?,
195 });
196 }
197 if let Some(current) = current.take() {
198 ret.push(E::try_from_events(current)?);
199 }
200 Ok((ret, false))
201 }
202}
203
204#[cfg(test)]
205mod tests {
206 use super::*;
207 use uuid::Uuid;
208
209 #[derive(Debug, serde::Serialize, serde::Deserialize)]
210 enum DummyEntityEvent {
211 Created(String),
212 }
213
214 impl EsEvent for DummyEntityEvent {
215 type EntityId = Uuid;
216 }
217
218 struct DummyEntity {
219 name: String,
220
221 events: EntityEvents<DummyEntityEvent>,
222 }
223
224 impl EsEntity for DummyEntity {
225 type Event = DummyEntityEvent;
226 type New = NewDummyEntity;
227
228 fn events_mut(&mut self) -> &mut EntityEvents<DummyEntityEvent> {
229 &mut self.events
230 }
231 fn events(&self) -> &EntityEvents<DummyEntityEvent> {
232 &self.events
233 }
234 }
235
236 impl TryFromEvents<DummyEntityEvent> for DummyEntity {
237 fn try_from_events(events: EntityEvents<DummyEntityEvent>) -> Result<Self, EsEntityError> {
238 let name = events
239 .iter_persisted()
240 .map(|e| match &e.event {
241 DummyEntityEvent::Created(name) => name.clone(),
242 })
243 .next()
244 .expect("Could not find name");
245 Ok(Self { name, events })
246 }
247 }
248
249 struct NewDummyEntity {}
250
251 impl IntoEvents<DummyEntityEvent> for NewDummyEntity {
252 fn into_events(self) -> EntityEvents<DummyEntityEvent> {
253 EntityEvents::init(
254 Uuid::new_v4(),
255 vec![DummyEntityEvent::Created("".to_owned())],
256 )
257 }
258 }
259
260 #[test]
261 fn load_zero_events() {
262 let generic_events = vec![];
263 let res = EntityEvents::load_first::<DummyEntity>(generic_events);
264 assert!(matches!(res, Err(EsEntityError::NotFound)));
265 }
266
267 #[test]
268 fn load_first() {
269 let generic_events = vec![GenericEvent {
270 entity_id: uuid::Uuid::new_v4(),
271 sequence: 1,
272 event: serde_json::to_value(DummyEntityEvent::Created("dummy-name".to_owned()))
273 .expect("Could not serialize"),
274 recorded_at: chrono::Utc::now(),
275 }];
276 let entity: DummyEntity = EntityEvents::load_first(generic_events).expect("Could not load");
277 assert!(entity.name == "dummy-name");
278 }
279
280 #[test]
281 fn load_n() {
282 let generic_events = vec![
283 GenericEvent {
284 entity_id: uuid::Uuid::new_v4(),
285 sequence: 1,
286 event: serde_json::to_value(DummyEntityEvent::Created("dummy-name".to_owned()))
287 .expect("Could not serialize"),
288 recorded_at: chrono::Utc::now(),
289 },
290 GenericEvent {
291 entity_id: uuid::Uuid::new_v4(),
292 sequence: 1,
293 event: serde_json::to_value(DummyEntityEvent::Created("other-name".to_owned()))
294 .expect("Could not serialize"),
295 recorded_at: chrono::Utc::now(),
296 },
297 ];
298 let (entity, more): (Vec<DummyEntity>, _) =
299 EntityEvents::load_n(generic_events, 2).expect("Could not load");
300 assert!(!more);
301 assert_eq!(entity.len(), 2);
302 }
303}