1use chrono::{DateTime, Utc};
2
3use super::{error::EsEntityError, traits::*};
4
5pub type LastPersisted<'a, E> = std::slice::Iter<'a, PersistedEvent<E>>;
6
7pub struct GenericEvent<Id> {
8 pub entity_id: Id,
9 pub sequence: i32,
10 pub event: serde_json::Value,
11 pub recorded_at: DateTime<Utc>,
12}
13
14pub struct PersistedEvent<E: EsEvent> {
15 pub entity_id: <E as EsEvent>::EntityId,
16 pub recorded_at: DateTime<Utc>,
17 pub sequence: usize,
18 pub event: E,
19}
20
21impl<E: Clone + EsEvent> Clone for PersistedEvent<E> {
22 fn clone(&self) -> Self {
23 PersistedEvent {
24 entity_id: self.entity_id.clone(),
25 recorded_at: self.recorded_at,
26 sequence: self.sequence,
27 event: self.event.clone(),
28 }
29 }
30}
31
32pub struct EntityEvents<T: EsEvent> {
33 pub entity_id: <T as EsEvent>::EntityId,
34 persisted_events: Vec<PersistedEvent<T>>,
35 new_events: Vec<T>,
36}
37
38impl<T: Clone + EsEvent> Clone for EntityEvents<T> {
39 fn clone(&self) -> Self {
40 Self {
41 entity_id: self.entity_id.clone(),
42 persisted_events: self.persisted_events.clone(),
43 new_events: self.new_events.clone(),
44 }
45 }
46}
47
48impl<T> EntityEvents<T>
49where
50 T: EsEvent,
51{
52 pub fn init(id: <T as EsEvent>::EntityId, initial_events: impl IntoIterator<Item = T>) -> Self {
53 Self {
54 entity_id: id,
55 persisted_events: Vec::new(),
56 new_events: initial_events.into_iter().collect(),
57 }
58 }
59
60 pub fn id(&self) -> &<T as EsEvent>::EntityId {
61 &self.entity_id
62 }
63
64 pub fn entity_first_persisted_at(&self) -> Option<DateTime<Utc>> {
65 self.persisted_events.first().map(|e| e.recorded_at)
66 }
67
68 pub fn entity_last_modified_at(&self) -> Option<DateTime<Utc>> {
69 self.persisted_events.last().map(|e| e.recorded_at)
70 }
71
72 pub fn push(&mut self, event: T) {
73 self.new_events.push(event);
74 }
75
76 pub fn mark_new_events_persisted_at(
77 &mut self,
78 recorded_at: chrono::DateTime<chrono::Utc>,
79 ) -> usize {
80 let n = self.new_events.len();
81 let offset = self.persisted_events.len() + 1;
82 self.persisted_events
83 .extend(
84 self.new_events
85 .drain(..)
86 .enumerate()
87 .map(|(i, event)| PersistedEvent {
88 entity_id: self.entity_id.clone(),
89 recorded_at,
90 sequence: i + offset,
91 event,
92 }),
93 );
94 n
95 }
96
97 pub fn serialize_new_events(&self) -> Vec<serde_json::Value> {
98 self.new_events
99 .iter()
100 .map(|event| serde_json::to_value(event).expect("Failed to serialize event"))
101 .collect()
102 }
103
104 pub fn any_new(&self) -> bool {
105 !self.new_events.is_empty()
106 }
107
108 pub fn len_persisted(&self) -> usize {
109 self.persisted_events.len()
110 }
111
112 pub fn iter_persisted(&self) -> impl DoubleEndedIterator<Item = &PersistedEvent<T>> {
113 self.persisted_events.iter()
114 }
115
116 pub fn last_persisted(&self, n: usize) -> LastPersisted<T> {
117 let start = self.persisted_events.len() - n;
118 self.persisted_events[start..].iter()
119 }
120
121 pub fn iter_all(&self) -> impl DoubleEndedIterator<Item = &T> {
122 self.persisted_events
123 .iter()
124 .map(|e| &e.event)
125 .chain(self.new_events.iter())
126 }
127
128 pub fn load_first<E: EsEntity<Event = T>>(
129 events: impl IntoIterator<Item = GenericEvent<<T as EsEvent>::EntityId>>,
130 ) -> Result<E, EsEntityError> {
131 let mut current_id = None;
132 let mut current = None;
133 for e in events {
134 if current_id.is_none() {
135 current_id = Some(e.entity_id.clone());
136 current = Some(Self {
137 entity_id: e.entity_id.clone(),
138 persisted_events: Vec::new(),
139 new_events: Vec::new(),
140 });
141 }
142 if current_id.as_ref() != Some(&e.entity_id) {
143 break;
144 }
145 let cur = current.as_mut().expect("Could not get current");
146 cur.persisted_events.push(PersistedEvent {
147 entity_id: e.entity_id,
148 recorded_at: e.recorded_at,
149 sequence: e.sequence as usize,
150 event: serde_json::from_value(e.event)?,
151 });
152 }
153 if let Some(current) = current {
154 E::try_from_events(current)
155 } else {
156 Err(EsEntityError::NotFound)
157 }
158 }
159
160 pub fn load_n<E: EsEntity<Event = T>>(
161 events: impl IntoIterator<Item = GenericEvent<<T as EsEvent>::EntityId>>,
162 n: usize,
163 ) -> Result<(Vec<E>, bool), EsEntityError> {
164 let mut ret: Vec<E> = Vec::new();
165 let mut current_id = None;
166 let mut current = None;
167 for e in events {
168 if current_id.as_ref() != Some(&e.entity_id) {
169 if let Some(current) = current.take() {
170 ret.push(E::try_from_events(current)?);
171 if ret.len() == n {
172 return Ok((ret, true));
173 }
174 }
175
176 current_id = Some(e.entity_id.clone());
177 current = Some(Self {
178 entity_id: e.entity_id.clone(),
179 persisted_events: Vec::new(),
180 new_events: Vec::new(),
181 });
182 }
183 let cur = current.as_mut().expect("Could not get current");
184 cur.persisted_events.push(PersistedEvent {
185 entity_id: e.entity_id,
186 recorded_at: e.recorded_at,
187 sequence: e.sequence as usize,
188 event: serde_json::from_value(e.event)?,
189 });
190 }
191 if let Some(current) = current.take() {
192 ret.push(E::try_from_events(current)?);
193 }
194 Ok((ret, false))
195 }
196}
197
198#[cfg(test)]
199mod tests {
200 use super::*;
201 use uuid::Uuid;
202
203 #[derive(Debug, serde::Serialize, serde::Deserialize)]
204 enum DummyEntityEvent {
205 Created(String),
206 }
207
208 impl EsEvent for DummyEntityEvent {
209 type EntityId = Uuid;
210 }
211
212 struct DummyEntity {
213 name: String,
214
215 events: EntityEvents<DummyEntityEvent>,
216 }
217
218 impl EsEntity for DummyEntity {
219 type Event = DummyEntityEvent;
220 type New = NewDummyEntity;
221
222 fn events_mut(&mut self) -> &mut EntityEvents<DummyEntityEvent> {
223 &mut self.events
224 }
225 fn events(&self) -> &EntityEvents<DummyEntityEvent> {
226 &self.events
227 }
228 }
229
230 impl TryFromEvents<DummyEntityEvent> for DummyEntity {
231 fn try_from_events(events: EntityEvents<DummyEntityEvent>) -> Result<Self, EsEntityError> {
232 let name = events
233 .iter_persisted()
234 .map(|e| match &e.event {
235 DummyEntityEvent::Created(name) => name.clone(),
236 })
237 .next()
238 .expect("Could not find name");
239 Ok(Self { name, events })
240 }
241 }
242
243 struct NewDummyEntity {}
244
245 impl IntoEvents<DummyEntityEvent> for NewDummyEntity {
246 fn into_events(self) -> EntityEvents<DummyEntityEvent> {
247 EntityEvents::init(
248 Uuid::new_v4(),
249 vec![DummyEntityEvent::Created("".to_owned())],
250 )
251 }
252 }
253
254 #[test]
255 fn load_zero_events() {
256 let generic_events = vec![];
257 let res = EntityEvents::load_first::<DummyEntity>(generic_events);
258 assert!(matches!(res, Err(EsEntityError::NotFound)));
259 }
260
261 #[test]
262 fn load_first() {
263 let generic_events = vec![GenericEvent {
264 entity_id: uuid::Uuid::new_v4(),
265 sequence: 1,
266 event: serde_json::to_value(DummyEntityEvent::Created("dummy-name".to_owned()))
267 .expect("Could not serialize"),
268 recorded_at: chrono::Utc::now(),
269 }];
270 let entity: DummyEntity = EntityEvents::load_first(generic_events).expect("Could not load");
271 assert!(entity.name == "dummy-name");
272 }
273
274 #[test]
275 fn load_n() {
276 let generic_events = vec![
277 GenericEvent {
278 entity_id: uuid::Uuid::new_v4(),
279 sequence: 1,
280 event: serde_json::to_value(DummyEntityEvent::Created("dummy-name".to_owned()))
281 .expect("Could not serialize"),
282 recorded_at: chrono::Utc::now(),
283 },
284 GenericEvent {
285 entity_id: uuid::Uuid::new_v4(),
286 sequence: 1,
287 event: serde_json::to_value(DummyEntityEvent::Created("other-name".to_owned()))
288 .expect("Could not serialize"),
289 recorded_at: chrono::Utc::now(),
290 },
291 ];
292 let (entity, more): (Vec<DummyEntity>, _) =
293 EntityEvents::load_n(generic_events, 2).expect("Could not load");
294 assert!(!more);
295 assert_eq!(entity.len(), 2);
296 }
297}