1use chrono::{DateTime, Utc};
2
3use super::{error::EsEntityError, traits::*};
4
/// Borrowing iterator over a slice of [`PersistedEvent`]s; this is the type
/// returned by [`EntityEvents::last_persisted`].
pub type LastPersisted<'a, E> = std::slice::Iter<'a, PersistedEvent<E>>;
6
/// A single event of one entity whose payload is still untyped JSON.
///
/// [`EntityEvents::load_first`] and [`EntityEvents::load_n`] consume values of
/// this type and deserialize `event` into the concrete event type.
pub struct GenericEvent<Id> {
    /// Identifier of the entity the event belongs to.
    pub entity_id: Id,
    /// Sequence number of the event.
    // NOTE(review): the loaders cast this to `usize`, so it is presumably
    // never negative — confirm against the producer of these values.
    pub sequence: i32,
    /// Event payload as raw JSON.
    pub event: serde_json::Value,
    /// UTC timestamp carried by the event.
    pub recorded_at: DateTime<Utc>,
}
13
/// A typed event together with the metadata attached to it once it counts as
/// persisted (see [`EntityEvents::mark_new_events_persisted_at`] and the
/// `load_*` constructors, which are the places that build this type).
pub struct PersistedEvent<E: EsEvent> {
    /// Identifier of the entity the event belongs to.
    pub entity_id: <E as EsEvent>::EntityId,
    /// UTC timestamp recorded for the event.
    pub recorded_at: DateTime<Utc>,
    /// Sequence number of the event; `mark_new_events_persisted_at` assigns
    /// these starting at 1.
    pub sequence: usize,
    /// The deserialized, strongly typed event payload.
    pub event: E,
}
20
21impl<E: Clone + EsEvent> Clone for PersistedEvent<E> {
22 fn clone(&self) -> Self {
23 PersistedEvent {
24 entity_id: self.entity_id.clone(),
25 recorded_at: self.recorded_at,
26 sequence: self.sequence,
27 event: self.event.clone(),
28 }
29 }
30}
31
/// All events known for a single entity, split into the ones already marked
/// as persisted and the ones that have only been queued so far.
pub struct EntityEvents<T: EsEvent> {
    /// Identifier of the entity these events belong to.
    pub entity_id: <T as EsEvent>::EntityId,
    /// Events that carry persistence metadata (timestamp and sequence).
    persisted_events: Vec<PersistedEvent<T>>,
    /// Events added via `init`, `push` or `extend` that have not yet been
    /// moved into `persisted_events`.
    new_events: Vec<T>,
}
37
38impl<T: Clone + EsEvent> Clone for EntityEvents<T> {
39 fn clone(&self) -> Self {
40 Self {
41 entity_id: self.entity_id.clone(),
42 persisted_events: self.persisted_events.clone(),
43 new_events: self.new_events.clone(),
44 }
45 }
46}
47
48impl<T> EntityEvents<T>
49where
50 T: EsEvent,
51{
52 pub fn init(id: <T as EsEvent>::EntityId, initial_events: impl IntoIterator<Item = T>) -> Self {
53 Self {
54 entity_id: id,
55 persisted_events: Vec::new(),
56 new_events: initial_events.into_iter().collect(),
57 }
58 }
59
60 pub fn id(&self) -> &<T as EsEvent>::EntityId {
61 &self.entity_id
62 }
63
64 pub fn entity_first_persisted_at(&self) -> Option<DateTime<Utc>> {
65 self.persisted_events.first().map(|e| e.recorded_at)
66 }
67
68 pub fn entity_last_modified_at(&self) -> Option<DateTime<Utc>> {
69 self.persisted_events.last().map(|e| e.recorded_at)
70 }
71
72 pub fn push(&mut self, event: T) {
73 self.new_events.push(event);
74 }
75
76 pub fn extend(&mut self, events: impl IntoIterator<Item = T>) {
77 self.new_events.extend(events);
78 }
79
80 pub fn mark_new_events_persisted_at(
81 &mut self,
82 recorded_at: chrono::DateTime<chrono::Utc>,
83 ) -> usize {
84 let n = self.new_events.len();
85 let offset = self.persisted_events.len() + 1;
86 self.persisted_events
87 .extend(
88 self.new_events
89 .drain(..)
90 .enumerate()
91 .map(|(i, event)| PersistedEvent {
92 entity_id: self.entity_id.clone(),
93 recorded_at,
94 sequence: i + offset,
95 event,
96 }),
97 );
98 n
99 }
100
101 pub fn serialize_new_events(&self) -> Vec<serde_json::Value> {
102 self.new_events
103 .iter()
104 .map(|event| serde_json::to_value(event).expect("Failed to serialize event"))
105 .collect()
106 }
107
108 pub fn any_new(&self) -> bool {
109 !self.new_events.is_empty()
110 }
111
112 pub fn len_persisted(&self) -> usize {
113 self.persisted_events.len()
114 }
115
116 pub fn iter_persisted(&self) -> impl DoubleEndedIterator<Item = &PersistedEvent<T>> {
117 self.persisted_events.iter()
118 }
119
120 pub fn last_persisted(&self, n: usize) -> LastPersisted<T> {
121 let start = self.persisted_events.len() - n;
122 self.persisted_events[start..].iter()
123 }
124
125 pub fn iter_all(&self) -> impl DoubleEndedIterator<Item = &T> {
126 self.persisted_events
127 .iter()
128 .map(|e| &e.event)
129 .chain(self.new_events.iter())
130 }
131
132 pub fn load_first<E: EsEntity<Event = T>>(
133 events: impl IntoIterator<Item = GenericEvent<<T as EsEvent>::EntityId>>,
134 ) -> Result<E, EsEntityError> {
135 let mut current_id = None;
136 let mut current = None;
137 for e in events {
138 if current_id.is_none() {
139 current_id = Some(e.entity_id.clone());
140 current = Some(Self {
141 entity_id: e.entity_id.clone(),
142 persisted_events: Vec::new(),
143 new_events: Vec::new(),
144 });
145 }
146 if current_id.as_ref() != Some(&e.entity_id) {
147 break;
148 }
149 let cur = current.as_mut().expect("Could not get current");
150 cur.persisted_events.push(PersistedEvent {
151 entity_id: e.entity_id,
152 recorded_at: e.recorded_at,
153 sequence: e.sequence as usize,
154 event: serde_json::from_value(e.event)?,
155 });
156 }
157 if let Some(current) = current {
158 E::try_from_events(current)
159 } else {
160 Err(EsEntityError::NotFound)
161 }
162 }
163
164 pub fn load_n<E: EsEntity<Event = T>>(
165 events: impl IntoIterator<Item = GenericEvent<<T as EsEvent>::EntityId>>,
166 n: usize,
167 ) -> Result<(Vec<E>, bool), EsEntityError> {
168 let mut ret: Vec<E> = Vec::new();
169 let mut current_id = None;
170 let mut current = None;
171 for e in events {
172 if current_id.as_ref() != Some(&e.entity_id) {
173 if let Some(current) = current.take() {
174 ret.push(E::try_from_events(current)?);
175 if ret.len() == n {
176 return Ok((ret, true));
177 }
178 }
179
180 current_id = Some(e.entity_id.clone());
181 current = Some(Self {
182 entity_id: e.entity_id.clone(),
183 persisted_events: Vec::new(),
184 new_events: Vec::new(),
185 });
186 }
187 let cur = current.as_mut().expect("Could not get current");
188 cur.persisted_events.push(PersistedEvent {
189 entity_id: e.entity_id,
190 recorded_at: e.recorded_at,
191 sequence: e.sequence as usize,
192 event: serde_json::from_value(e.event)?,
193 });
194 }
195 if let Some(current) = current.take() {
196 ret.push(E::try_from_events(current)?);
197 }
198 Ok((ret, false))
199 }
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use uuid::Uuid;

    /// Minimal event type used to exercise the loaders.
    #[derive(Debug, serde::Serialize, serde::Deserialize)]
    enum DummyEntityEvent {
        Created(String),
    }

    impl EsEvent for DummyEntityEvent {
        type EntityId = Uuid;
    }

    /// Entity whose only state is the name taken from its `Created` event.
    struct DummyEntity {
        name: String,

        events: EntityEvents<DummyEntityEvent>,
    }

    impl EsEntity for DummyEntity {
        type Event = DummyEntityEvent;
        type New = NewDummyEntity;

        fn events_mut(&mut self) -> &mut EntityEvents<DummyEntityEvent> {
            &mut self.events
        }
        fn events(&self) -> &EntityEvents<DummyEntityEvent> {
            &self.events
        }
    }

    impl TryFromEvents<DummyEntityEvent> for DummyEntity {
        fn try_from_events(events: EntityEvents<DummyEntityEvent>) -> Result<Self, EsEntityError> {
            let name = events
                .iter_persisted()
                .map(|e| match &e.event {
                    DummyEntityEvent::Created(name) => name.clone(),
                })
                .next()
                .expect("Could not find name");
            Ok(Self { name, events })
        }
    }

    struct NewDummyEntity {}

    impl IntoEvents<DummyEntityEvent> for NewDummyEntity {
        fn into_events(self) -> EntityEvents<DummyEntityEvent> {
            EntityEvents::init(
                Uuid::new_v4(),
                vec![DummyEntityEvent::Created("".to_owned())],
            )
        }
    }

    /// Builds the raw `Created` event (sequence 1) of a fresh entity named `name`.
    fn created_event(name: &str) -> GenericEvent<Uuid> {
        GenericEvent {
            entity_id: Uuid::new_v4(),
            sequence: 1,
            event: serde_json::to_value(DummyEntityEvent::Created(name.to_owned()))
                .expect("Could not serialize"),
            recorded_at: chrono::Utc::now(),
        }
    }

    #[test]
    fn load_zero_events() {
        let generic_events = vec![];
        let res = EntityEvents::load_first::<DummyEntity>(generic_events);
        assert!(matches!(res, Err(EsEntityError::NotFound)));
    }

    #[test]
    fn load_first() {
        let generic_events = vec![created_event("dummy-name")];
        let entity: DummyEntity = EntityEvents::load_first(generic_events).expect("Could not load");
        assert_eq!(entity.name, "dummy-name");
    }

    #[test]
    fn load_n() {
        let generic_events = vec![created_event("dummy-name"), created_event("other-name")];
        let (entity, more): (Vec<DummyEntity>, _) =
            EntityEvents::load_n(generic_events, 2).expect("Could not load");
        assert!(!more);
        assert_eq!(entity.len(), 2);
    }

    #[test]
    fn load_n_reports_more_when_truncated() {
        let generic_events = vec![created_event("dummy-name"), created_event("other-name")];
        let (entities, more): (Vec<DummyEntity>, _) =
            EntityEvents::load_n(generic_events, 1).expect("Could not load");
        // The second entity exceeds the limit, so it is dropped and flagged.
        assert!(more);
        assert_eq!(entities.len(), 1);
        assert_eq!(entities[0].name, "dummy-name");
    }
}