1use std::future::Future;
2use std::marker::PhantomData;
3
4use crate::decider::{Decider, EventComputation, StateComputation};
5use crate::saga::{ActionComputation, Saga};
6use crate::Identifier;
7
/// Storage port for events, as used by the event-sourced aggregates in this module.
///
/// Type parameters:
/// - `C` - command type used to select which events to fetch
/// - `E` - event type
/// - `Version` - version value paired with every stored event
/// - `Error` - error type returned by every operation
///
/// Each method is written as a plain `fn` returning `impl Future + Send`, so
/// implementors may use `async fn` while callers still get a `Send` future.
pub trait EventRepository<C, E, Version, Error> {
    /// Fetches the events relevant to `command`, each paired with its version.
    fn fetch_events(
        &self,
        command: &C,
    ) -> impl Future<Output = Result<Vec<(E, Version)>, Error>> + Send;

    /// Saves `events` and resolves to a list of events paired with versions.
    fn save(
        &self,
        events: &[E],
    ) -> impl Future<Output = Result<Vec<(E, Version)>, Error>> + Send;

    /// Resolves to the version associated with `event`, or `None` if there is none.
    ///
    /// NOTE(review): presumably the latest version of the stream `event` belongs
    /// to, used for optimistic locking on save - confirm against implementors.
    fn version_provider(
        &self,
        event: &E,
    ) -> impl Future<Output = Result<Option<Version>, Error>> + Send;
}
37
38pub struct EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
53where
54 Repository: EventRepository<C, E, Version, Error>,
55 Decider: EventComputation<C, S, E, Error>,
56{
57 repository: Repository,
58 decider: Decider,
59 _marker: PhantomData<(C, S, E, Version, Error)>,
60}
61
62impl<C, S, E, Repository, Decider, Version, Error> EventComputation<C, S, E, Error>
63 for EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
64where
65 Repository: EventRepository<C, E, Version, Error>,
66 Decider: EventComputation<C, S, E, Error>,
67{
68 fn compute_new_events(&self, current_events: &[E], command: &C) -> Result<Vec<E>, Error> {
70 self.decider.compute_new_events(current_events, command)
71 }
72}
73
74impl<C, S, E, Repository, Decider, Version, Error> EventRepository<C, E, Version, Error>
75 for EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
76where
77 Repository: EventRepository<C, E, Version, Error> + Sync,
78 Decider: EventComputation<C, S, E, Error> + Sync,
79 C: Sync,
80 S: Sync,
81 E: Sync,
82 Version: Sync,
83 Error: Sync,
84{
85 async fn fetch_events(&self, command: &C) -> Result<Vec<(E, Version)>, Error> {
87 self.repository.fetch_events(command).await
88 }
89 async fn save(&self, events: &[E]) -> Result<Vec<(E, Version)>, Error> {
91 self.repository.save(events).await
92 }
93 async fn version_provider(&self, event: &E) -> Result<Option<Version>, Error> {
95 self.repository.version_provider(event).await
96 }
97}
98
99impl<C, S, E, Repository, Decider, Version, Error>
100 EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
101where
102 Repository: EventRepository<C, E, Version, Error> + Sync,
103 Decider: EventComputation<C, S, E, Error> + Sync,
104 C: Sync,
105 S: Sync,
106 E: Sync,
107 Version: Sync,
108 Error: Sync,
109{
110 pub fn new(repository: Repository, decider: Decider) -> Self {
112 EventSourcedAggregate {
113 repository,
114 decider,
115 _marker: PhantomData,
116 }
117 }
118 pub async fn handle(&self, command: &C) -> Result<Vec<(E, Version)>, Error> {
120 let events: Vec<(E, Version)> = self.fetch_events(command).await?;
121 let mut current_events: Vec<E> = vec![];
122 for (event, _) in events {
123 current_events.push(event);
124 }
125 let new_events = self.compute_new_events(¤t_events, command)?;
126 let saved_events = self.save(&new_events).await?;
127 Ok(saved_events)
128 }
129}
130
/// Storage port for state, as used by the state-stored aggregates in this module.
///
/// Type parameters:
/// - `C` - command type used to select which state to fetch
/// - `S` - state type
/// - `Version` - version value paired with the stored state
/// - `Error` - error type returned by every operation
///
/// Each method is written as a plain `fn` returning `impl Future + Send`, so
/// implementors may use `async fn` while callers still get a `Send` future.
pub trait StateRepository<C, S, Version, Error> {
    /// Fetches the state relevant to `command` together with its version, or
    /// `None` when there is no state.
    fn fetch_state(
        &self,
        command: &C,
    ) -> impl Future<Output = Result<Option<(S, Version)>, Error>> + Send;

    /// Saves `state` and resolves to a state paired with a version.
    ///
    /// `version` is the version that came with the fetched state, or `None`
    /// when no state was fetched.
    fn save(
        &self,
        state: &S,
        version: &Option<Version>,
    ) -> impl Future<Output = Result<(S, Version), Error>> + Send;
}
156
157pub struct StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
172where
173 Repository: StateRepository<C, S, Version, Error>,
174 Decider: StateComputation<C, S, E, Error>,
175{
176 repository: Repository,
177 decider: Decider,
178 _marker: PhantomData<(C, S, E, Version, Error)>,
179}
180
181impl<C, S, E, Repository, Decider, Version, Error> StateComputation<C, S, E, Error>
182 for StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
183where
184 Repository: StateRepository<C, S, Version, Error>,
185 Decider: StateComputation<C, S, E, Error>,
186{
187 fn compute_new_state(&self, current_state: Option<S>, command: &C) -> Result<S, Error> {
189 self.decider.compute_new_state(current_state, command)
190 }
191}
192
193impl<C, S, E, Repository, Decider, Version, Error> StateRepository<C, S, Version, Error>
194 for StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
195where
196 Repository: StateRepository<C, S, Version, Error> + Sync,
197 Decider: StateComputation<C, S, E, Error> + Sync,
198 C: Sync,
199 S: Sync,
200 E: Sync,
201 Version: Sync,
202 Error: Sync,
203{
204 async fn fetch_state(&self, command: &C) -> Result<Option<(S, Version)>, Error> {
206 self.repository.fetch_state(command).await
207 }
208 async fn save(&self, state: &S, version: &Option<Version>) -> Result<(S, Version), Error> {
210 self.repository.save(state, version).await
211 }
212}
213
214impl<C, S, E, Repository, Decider, Version, Error>
215 StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
216where
217 Repository: StateRepository<C, S, Version, Error> + Sync,
218 Decider: StateComputation<C, S, E, Error> + Sync,
219 C: Sync,
220 S: Sync,
221 E: Sync,
222 Version: Sync,
223 Error: Sync,
224{
225 pub fn new(repository: Repository, decider: Decider) -> Self {
227 StateStoredAggregate {
228 repository,
229 decider,
230 _marker: PhantomData,
231 }
232 }
233 pub async fn handle(&self, command: &C) -> Result<(S, Version), Error> {
235 let state_version = self.fetch_state(command).await?;
236 match state_version {
237 None => {
238 let new_state = self.compute_new_state(None, command)?;
239 let saved_state = self.save(&new_state, &None).await?;
240 Ok(saved_state)
241 }
242 Some((state, version)) => {
243 let new_state = self.compute_new_state(Some(state), command)?;
244 let saved_state = self.save(&new_state, &Some(version)).await?;
245 Ok(saved_state)
246 }
247 }
248 }
249}
250
251pub struct EventSourcedOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
263where
264 Repository: EventRepository<C, E, Version, Error>,
265{
266 repository: Repository,
267 decider: Decider<'a, C, S, E, Error>,
268 saga: Saga<'a, E, C>,
269 _marker: PhantomData<(C, S, E, Version, Error)>,
270}
271
272impl<C, S, E, Repository, Version, Error> EventRepository<C, E, Version, Error>
273 for EventSourcedOrchestratingAggregate<'_, C, S, E, Repository, Version, Error>
274where
275 Repository: EventRepository<C, E, Version, Error> + Sync,
276 C: Sync,
277 S: Sync,
278 E: Sync,
279 Version: Sync,
280 Error: Sync,
281{
282 async fn fetch_events(&self, command: &C) -> Result<Vec<(E, Version)>, Error> {
284 self.repository.fetch_events(command).await
285 }
286 async fn save(&self, events: &[E]) -> Result<Vec<(E, Version)>, Error> {
288 self.repository.save(events).await
289 }
290 async fn version_provider(&self, event: &E) -> Result<Option<Version>, Error> {
292 self.repository.version_provider(event).await
293 }
294}
295
296impl<'a, C, S, E, Repository, Version, Error>
297 EventSourcedOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
298where
299 Repository: EventRepository<C, E, Version, Error> + Sync,
300 C: Sync,
301 S: Sync,
302 E: Sync + Clone,
303 Version: Sync,
304 Error: Sync,
305{
306 pub fn new(
308 repository: Repository,
309 decider: Decider<'a, C, S, E, Error>,
310 saga: Saga<'a, E, C>,
311 ) -> Self {
312 EventSourcedOrchestratingAggregate {
313 repository,
314 decider,
315 saga,
316 _marker: PhantomData,
317 }
318 }
319 pub async fn handle(&self, command: &C) -> Result<Vec<(E, Version)>, Error>
321 where
322 E: Identifier,
323 C: Identifier,
324 {
325 let events: Vec<(E, Version)> = self.fetch_events(command).await?;
326 let mut current_events: Vec<E> = vec![];
327 for (event, _) in events {
328 current_events.push(event);
329 }
330 let new_events = self
331 .compute_new_events_dynamically(¤t_events, command)
332 .await?;
333 let saved_events = self.save(&new_events).await?;
334 Ok(saved_events)
335 }
336 async fn compute_new_events_dynamically(
341 &self,
342 current_events: &[E],
343 command: &C,
344 ) -> Result<Vec<E>, Error>
345 where
346 E: Identifier,
347 C: Identifier,
348 {
349 let current_state: S = current_events
350 .iter()
351 .fold((self.decider.initial_state)(), |state, event| {
352 (self.decider.evolve)(&state, event)
353 });
354
355 let initial_events = (self.decider.decide)(command, ¤t_state)?;
356
357 let commands: Vec<C> = initial_events
358 .iter()
359 .flat_map(|event: &E| self.saga.compute_new_actions(event))
360 .collect();
361
362 let mut all_events = initial_events.clone();
364
365 for command in commands.iter() {
366 let previous_events = [
367 self.repository
368 .fetch_events(command)
369 .await?
370 .iter()
371 .map(|(e, _)| e.clone())
372 .collect::<Vec<E>>(),
373 initial_events
374 .clone()
375 .into_iter()
376 .filter(|e| e.identifier() == command.identifier())
377 .collect::<Vec<E>>(),
378 ]
379 .concat();
380
381 let new_events =
384 Box::pin(self.compute_new_events_dynamically(&previous_events, command)).await?;
385 all_events.extend(new_events);
386 }
387
388 Ok(all_events)
389 }
390}
391
392pub struct StateStoredOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
407where
408 Repository: StateRepository<C, S, Version, Error>,
409{
410 repository: Repository,
411 decider: Decider<'a, C, S, E, Error>,
412 saga: Saga<'a, E, C>,
413 _marker: PhantomData<(C, S, E, Version, Error)>,
414}
415
416impl<C, S, E, Repository, Version, Error> StateComputation<C, S, E, Error>
417 for StateStoredOrchestratingAggregate<'_, C, S, E, Repository, Version, Error>
418where
419 Repository: StateRepository<C, S, Version, Error>,
420 S: Clone,
421{
422 fn compute_new_state(&self, current_state: Option<S>, command: &C) -> Result<S, Error> {
424 let effective_current_state =
425 current_state.unwrap_or_else(|| (self.decider.initial_state)());
426 let events = (self.decider.decide)(command, &effective_current_state)?;
427 let mut new_state = events.iter().fold(effective_current_state, |state, event| {
428 (self.decider.evolve)(&state, event)
429 });
430 let commands = events
431 .iter()
432 .flat_map(|event: &E| self.saga.compute_new_actions(event))
433 .collect::<Vec<C>>();
434 for action in commands {
435 new_state = self.compute_new_state(Some(new_state.clone()), &action)?;
436 }
437 Ok(new_state)
438 }
439}
440
441impl<C, S, E, Repository, Version, Error> StateRepository<C, S, Version, Error>
442 for StateStoredOrchestratingAggregate<'_, C, S, E, Repository, Version, Error>
443where
444 Repository: StateRepository<C, S, Version, Error> + Sync,
445 C: Sync,
446 S: Sync,
447 E: Sync,
448 Version: Sync,
449 Error: Sync,
450{
451 async fn fetch_state(&self, command: &C) -> Result<Option<(S, Version)>, Error> {
453 self.repository.fetch_state(command).await
454 }
455 async fn save(&self, state: &S, version: &Option<Version>) -> Result<(S, Version), Error> {
457 self.repository.save(state, version).await
458 }
459}
460
461impl<'a, C, S, E, Repository, Version, Error>
462 StateStoredOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
463where
464 Repository: StateRepository<C, S, Version, Error> + Sync,
465 C: Sync,
466 S: Sync + Clone,
467 E: Sync,
468 Version: Sync,
469 Error: Sync,
470{
471 pub fn new(
473 repository: Repository,
474 decider: Decider<'a, C, S, E, Error>,
475 saga: Saga<'a, E, C>,
476 ) -> Self {
477 StateStoredOrchestratingAggregate {
478 repository,
479 decider,
480 saga,
481 _marker: PhantomData,
482 }
483 }
484 pub async fn handle(&self, command: &C) -> Result<(S, Version), Error> {
486 let state_version = self.fetch_state(command).await?;
487 match state_version {
488 None => {
489 let new_state = self.compute_new_state(None, command)?;
490 let saved_state = self.save(&new_state, &None).await?;
491 Ok(saved_state)
492 }
493 Some((state, version)) => {
494 let new_state = self.compute_new_state(Some(state), command)?;
495 let saved_state = self.save(&new_state, &Some(version)).await?;
496 Ok(saved_state)
497 }
498 }
499 }
500}