1use std::future::Future;
2use std::marker::PhantomData;
3
4use crate::decider::{Decider, EventComputation, StateComputation};
5use crate::saga::{ActionComputation, Saga};
6use crate::Identifier;
7
/// Asynchronous repository contract for versioned events.
///
/// Generic over the command type `C`, the event type `E`, a `Version` type and an `Error` type.
/// This definition is compiled when the `not-send-futures` feature is disabled; every future it
/// returns must be `Send`.
#[cfg(not(feature = "not-send-futures"))]
pub trait EventRepository<C, E, Version, Error> {
    /// Returns a future that resolves to `(event, version)` pairs for `command`, or to an `Error`.
    ///
    /// NOTE(review): presumably the events already stored for the stream that `command`
    /// targets — confirm against implementors.
    fn fetch_events(
        &self,
        command: &C,
    ) -> impl Future<Output = Result<Vec<(E, Version)>, Error>> + Send;
    /// Takes a slice of events and returns a future that resolves to `(event, version)` pairs,
    /// or to an `Error`.
    ///
    /// NOTE(review): presumably persists `events` and reports the versions assigned to them —
    /// confirm against implementors.
    fn save(&self, events: &[E]) -> impl Future<Output = Result<Vec<(E, Version)>, Error>> + Send;

    /// Returns a future that resolves to an optional `Version` for `event`, or to an `Error`.
    ///
    /// NOTE(review): presumably the latest known version of the stream `event` belongs to,
    /// with `None` when there is none — confirm against implementors.
    fn version_provider(
        &self,
        event: &E,
    ) -> impl Future<Output = Result<Option<Version>, Error>> + Send;
}
38
/// Asynchronous repository contract for versioned events.
///
/// Generic over the command type `C`, the event type `E`, a `Version` type and an `Error` type.
/// This definition is compiled when the `not-send-futures` feature is enabled; the futures it
/// returns are not required to be `Send`.
#[cfg(feature = "not-send-futures")]
pub trait EventRepository<C, E, Version, Error> {
    /// Returns a future that resolves to `(event, version)` pairs for `command`, or to an `Error`.
    ///
    /// NOTE(review): presumably the events already stored for the stream that `command`
    /// targets — confirm against implementors.
    fn fetch_events(&self, command: &C) -> impl Future<Output = Result<Vec<(E, Version)>, Error>>;
    /// Takes a slice of events and returns a future that resolves to `(event, version)` pairs,
    /// or to an `Error`.
    ///
    /// NOTE(review): presumably persists `events` and reports the versions assigned to them —
    /// confirm against implementors.
    fn save(&self, events: &[E]) -> impl Future<Output = Result<Vec<(E, Version)>, Error>>;

    /// Returns a future that resolves to an optional `Version` for `event`, or to an `Error`.
    ///
    /// NOTE(review): presumably the latest known version of the stream `event` belongs to,
    /// with `None` when there is none — confirm against implementors.
    fn version_provider(&self, event: &E) -> impl Future<Output = Result<Option<Version>, Error>>;
}
63
/// Aggregate that pairs an event repository with a decider.
///
/// `Repository` must implement [`EventRepository`] and `Decider` must implement
/// `EventComputation`; `C`, `S`, `E`, `Version` and `Error` are the types those two traits are
/// instantiated with.
pub struct EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
where
    Repository: EventRepository<C, E, Version, Error>,
    Decider: EventComputation<C, S, E, Error>,
{
    /// Storage backend; the `EventRepository` impl of this type forwards to it.
    repository: Repository,
    /// Decision logic; the `EventComputation` impl of this type forwards to it.
    decider: Decider,
    /// Zero-sized marker that mentions the type parameters not stored in any field.
    _marker: PhantomData<(C, S, E, Version, Error)>,
}
87
88impl<C, S, E, Repository, Decider, Version, Error> EventComputation<C, S, E, Error>
89 for EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
90where
91 Repository: EventRepository<C, E, Version, Error>,
92 Decider: EventComputation<C, S, E, Error>,
93{
94 fn compute_new_events(&self, current_events: &[E], command: &C) -> Result<Vec<E>, Error> {
96 self.decider.compute_new_events(current_events, command)
97 }
98}
99
100#[cfg(not(feature = "not-send-futures"))]
101impl<C, S, E, Repository, Decider, Version, Error> EventRepository<C, E, Version, Error>
102 for EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
103where
104 Repository: EventRepository<C, E, Version, Error> + Sync,
105 Decider: EventComputation<C, S, E, Error> + Sync,
106 C: Sync,
107 S: Sync,
108 E: Sync,
109 Version: Sync,
110 Error: Sync,
111{
112 async fn fetch_events(&self, command: &C) -> Result<Vec<(E, Version)>, Error> {
114 self.repository.fetch_events(command).await
115 }
116 async fn save(&self, events: &[E]) -> Result<Vec<(E, Version)>, Error> {
118 self.repository.save(events).await
119 }
120 async fn version_provider(&self, event: &E) -> Result<Option<Version>, Error> {
122 self.repository.version_provider(event).await
123 }
124}
125
/// Lets the aggregate act as an [`EventRepository`] by forwarding to the wrapped repository.
///
/// Compiled when the `not-send-futures` feature is enabled, so no `Sync` bounds are needed.
#[cfg(feature = "not-send-futures")]
impl<C, S, E, Repository, Decider, Version, Error> EventRepository<C, E, Version, Error>
    for EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
where
    Repository: EventRepository<C, E, Version, Error>,
    Decider: EventComputation<C, S, E, Error>,
{
    /// Forwards to the wrapped repository's `fetch_events` and returns its result unchanged.
    async fn fetch_events(&self, command: &C) -> Result<Vec<(E, Version)>, Error> {
        let repository = &self.repository;
        repository.fetch_events(command).await
    }

    /// Forwards to the wrapped repository's `save` and returns its result unchanged.
    async fn save(&self, events: &[E]) -> Result<Vec<(E, Version)>, Error> {
        let repository = &self.repository;
        repository.save(events).await
    }

    /// Forwards to the wrapped repository's `version_provider` and returns its result unchanged.
    async fn version_provider(&self, event: &E) -> Result<Option<Version>, Error> {
        let repository = &self.repository;
        repository.version_provider(event).await
    }
}
146
147#[cfg(not(feature = "not-send-futures"))]
148impl<C, S, E, Repository, Decider, Version, Error>
149 EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
150where
151 Repository: EventRepository<C, E, Version, Error> + Sync,
152 Decider: EventComputation<C, S, E, Error> + Sync,
153 C: Sync,
154 S: Sync,
155 E: Sync,
156 Version: Sync,
157 Error: Sync,
158{
159 pub fn new(repository: Repository, decider: Decider) -> Self {
161 EventSourcedAggregate {
162 repository,
163 decider,
164 _marker: PhantomData,
165 }
166 }
167 pub async fn handle(&self, command: &C) -> Result<Vec<(E, Version)>, Error> {
169 let events: Vec<(E, Version)> = self.fetch_events(command).await?;
170 let mut current_events: Vec<E> = vec![];
171 for (event, _) in events {
172 current_events.push(event);
173 }
174 let new_events = self.compute_new_events(¤t_events, command)?;
175 let saved_events = self.save(&new_events).await?;
176 Ok(saved_events)
177 }
178}
179
/// Constructor and command handling for [`EventSourcedAggregate`].
///
/// Compiled when the `not-send-futures` feature is enabled.
#[cfg(feature = "not-send-futures")]
impl<C, S, E, Repository, Decider, Version, Error>
    EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
where
    Repository: EventRepository<C, E, Version, Error>,
    Decider: EventComputation<C, S, E, Error>,
{
    /// Builds an aggregate from a repository and a decider.
    pub fn new(repository: Repository, decider: Decider) -> Self {
        Self {
            repository,
            decider,
            _marker: PhantomData,
        }
    }

    /// Handles `command` and returns the `(event, version)` pairs reported by `save`.
    ///
    /// Steps:
    /// 1. fetch the `(event, version)` pairs for `command` and drop the versions;
    /// 2. compute the new events from those events and `command`;
    /// 3. save the new events.
    ///
    /// # Errors
    ///
    /// Returns the first `Error` produced by fetching, computing or saving; later steps are
    /// skipped.
    pub async fn handle(&self, command: &C) -> Result<Vec<(E, Version)>, Error> {
        // Only the events are needed by the decider; versions are discarded here.
        let current_events: Vec<E> = self
            .fetch_events(command)
            .await?
            .into_iter()
            .map(|(event, _version)| event)
            .collect();
        let new_events = self.compute_new_events(&current_events, command)?;
        self.save(&new_events).await
    }
}
207
/// Asynchronous repository contract for versioned state.
///
/// Generic over the command type `C`, the state type `S`, a `Version` type and an `Error` type.
/// This definition is compiled when the `not-send-futures` feature is disabled; every future it
/// returns must be `Send`.
#[cfg(not(feature = "not-send-futures"))]
pub trait StateRepository<C, S, Version, Error> {
    /// Returns a future that resolves to an optional `(state, version)` pair for `command`,
    /// or to an `Error`.
    ///
    /// NOTE(review): presumably `None` means no state has been stored yet for the entity that
    /// `command` targets — confirm against implementors.
    fn fetch_state(
        &self,
        command: &C,
    ) -> impl Future<Output = Result<Option<(S, Version)>, Error>> + Send;
    /// Takes a state and an optional version and returns a future that resolves to a
    /// `(state, version)` pair, or to an `Error`.
    ///
    /// NOTE(review): presumably persists `state`, with `version` being the version that was
    /// previously fetched (`None` for a first save) — confirm against implementors.
    fn save(
        &self,
        state: &S,
        version: &Option<Version>,
    ) -> impl Future<Output = Result<(S, Version), Error>> + Send;
}
234
/// Asynchronous repository contract for versioned state.
///
/// Generic over the command type `C`, the state type `S`, a `Version` type and an `Error` type.
/// This definition is compiled when the `not-send-futures` feature is enabled; the futures it
/// returns are not required to be `Send`.
#[cfg(feature = "not-send-futures")]
pub trait StateRepository<C, S, Version, Error> {
    /// Returns a future that resolves to an optional `(state, version)` pair for `command`,
    /// or to an `Error`.
    ///
    /// NOTE(review): presumably `None` means no state has been stored yet for the entity that
    /// `command` targets — confirm against implementors.
    fn fetch_state(&self, command: &C)
        -> impl Future<Output = Result<Option<(S, Version)>, Error>>;
    /// Takes a state and an optional version and returns a future that resolves to a
    /// `(state, version)` pair, or to an `Error`.
    ///
    /// NOTE(review): presumably persists `state`, with `version` being the version that was
    /// previously fetched (`None` for a first save) — confirm against implementors.
    fn save(
        &self,
        state: &S,
        version: &Option<Version>,
    ) -> impl Future<Output = Result<(S, Version), Error>>;
}
259
/// Aggregate that pairs a state repository with a decider.
///
/// `Repository` must implement [`StateRepository`] and `Decider` must implement
/// `StateComputation`; `C`, `S`, `E`, `Version` and `Error` are the types those two traits are
/// instantiated with.
pub struct StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
where
    Repository: StateRepository<C, S, Version, Error>,
    Decider: StateComputation<C, S, E, Error>,
{
    /// Storage backend; the `StateRepository` impl of this type forwards to it.
    repository: Repository,
    /// Decision logic; the `StateComputation` impl of this type forwards to it.
    decider: Decider,
    /// Zero-sized marker that mentions the type parameters not stored in any field.
    _marker: PhantomData<(C, S, E, Version, Error)>,
}
283
284impl<C, S, E, Repository, Decider, Version, Error> StateComputation<C, S, E, Error>
285 for StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
286where
287 Repository: StateRepository<C, S, Version, Error>,
288 Decider: StateComputation<C, S, E, Error>,
289{
290 fn compute_new_state(&self, current_state: Option<S>, command: &C) -> Result<S, Error> {
292 self.decider.compute_new_state(current_state, command)
293 }
294}
295
296#[cfg(not(feature = "not-send-futures"))]
297impl<C, S, E, Repository, Decider, Version, Error> StateRepository<C, S, Version, Error>
298 for StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
299where
300 Repository: StateRepository<C, S, Version, Error> + Sync,
301 Decider: StateComputation<C, S, E, Error> + Sync,
302 C: Sync,
303 S: Sync,
304 E: Sync,
305 Version: Sync,
306 Error: Sync,
307{
308 async fn fetch_state(&self, command: &C) -> Result<Option<(S, Version)>, Error> {
310 self.repository.fetch_state(command).await
311 }
312 async fn save(&self, state: &S, version: &Option<Version>) -> Result<(S, Version), Error> {
314 self.repository.save(state, version).await
315 }
316}
317
/// Lets the aggregate act as a [`StateRepository`] by forwarding to the wrapped repository.
///
/// Compiled when the `not-send-futures` feature is enabled, so no `Sync` bounds are needed.
#[cfg(feature = "not-send-futures")]
impl<C, S, E, Repository, Decider, Version, Error> StateRepository<C, S, Version, Error>
    for StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
where
    Repository: StateRepository<C, S, Version, Error>,
    Decider: StateComputation<C, S, E, Error>,
{
    /// Forwards to the wrapped repository's `fetch_state` and returns its result unchanged.
    async fn fetch_state(&self, command: &C) -> Result<Option<(S, Version)>, Error> {
        let repository = &self.repository;
        repository.fetch_state(command).await
    }

    /// Forwards to the wrapped repository's `save` and returns its result unchanged.
    async fn save(&self, state: &S, version: &Option<Version>) -> Result<(S, Version), Error> {
        let repository = &self.repository;
        repository.save(state, version).await
    }
}
334
335#[cfg(not(feature = "not-send-futures"))]
336impl<C, S, E, Repository, Decider, Version, Error>
337 StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
338where
339 Repository: StateRepository<C, S, Version, Error> + Sync,
340 Decider: StateComputation<C, S, E, Error> + Sync,
341 C: Sync,
342 S: Sync,
343 E: Sync,
344 Version: Sync,
345 Error: Sync,
346{
347 pub fn new(repository: Repository, decider: Decider) -> Self {
349 StateStoredAggregate {
350 repository,
351 decider,
352 _marker: PhantomData,
353 }
354 }
355 pub async fn handle(&self, command: &C) -> Result<(S, Version), Error> {
357 let state_version = self.fetch_state(command).await?;
358 match state_version {
359 None => {
360 let new_state = self.compute_new_state(None, command)?;
361 let saved_state = self.save(&new_state, &None).await?;
362 Ok(saved_state)
363 }
364 Some((state, version)) => {
365 let new_state = self.compute_new_state(Some(state), command)?;
366 let saved_state = self.save(&new_state, &Some(version)).await?;
367 Ok(saved_state)
368 }
369 }
370 }
371}
372
/// Constructor and command handling for [`StateStoredAggregate`].
///
/// Compiled when the `not-send-futures` feature is enabled.
#[cfg(feature = "not-send-futures")]
impl<C, S, E, Repository, Decider, Version, Error>
    StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
where
    Repository: StateRepository<C, S, Version, Error>,
    Decider: StateComputation<C, S, E, Error>,
{
    /// Builds an aggregate from a repository and a decider.
    pub fn new(repository: Repository, decider: Decider) -> Self {
        Self {
            repository,
            decider,
            _marker: PhantomData,
        }
    }

    /// Handles `command` and returns the `(state, version)` pair reported by `save`.
    ///
    /// The fetched state (if any) is handed to `compute_new_state`; the fetched version (if
    /// any) is handed to `save` together with the newly computed state. When nothing was
    /// fetched, both are `None`.
    ///
    /// # Errors
    ///
    /// Returns the first `Error` produced by fetching, computing or saving.
    pub async fn handle(&self, command: &C) -> Result<(S, Version), Error> {
        // Split the optional pair so both cases share one code path.
        let (current_state, current_version) = self.fetch_state(command).await?.unzip();
        let next_state = self.compute_new_state(current_state, command)?;
        self.save(&next_state, &current_version).await
    }
}
405
/// Aggregate that combines an event repository with a concrete `Decider` and a `Saga`.
///
/// `Repository` must implement [`EventRepository`]. The saga is typed `Saga<'a, E, C>`: the
/// methods of this type feed it events (`E`) and collect commands (`C`) from it.
pub struct EventSourcedOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
where
    Repository: EventRepository<C, E, Version, Error>,
{
    /// Storage backend; the `EventRepository` impl of this type forwards to it.
    repository: Repository,
    /// Decider whose `initial_state`, `evolve` and `decide` members are invoked by this type.
    decider: Decider<'a, C, S, E, Error>,
    /// Saga queried (via `compute_new_actions`) for the commands each new event triggers.
    saga: Saga<'a, E, C>,
    /// Zero-sized marker that mentions the type parameters, including `Version`, which no
    /// other field uses.
    _marker: PhantomData<(C, S, E, Version, Error)>,
}
426
427#[cfg(not(feature = "not-send-futures"))]
428impl<C, S, E, Repository, Version, Error> EventRepository<C, E, Version, Error>
429 for EventSourcedOrchestratingAggregate<'_, C, S, E, Repository, Version, Error>
430where
431 Repository: EventRepository<C, E, Version, Error> + Sync,
432 C: Sync,
433 S: Sync,
434 E: Sync,
435 Version: Sync,
436 Error: Sync,
437{
438 async fn fetch_events(&self, command: &C) -> Result<Vec<(E, Version)>, Error> {
440 self.repository.fetch_events(command).await
441 }
442 async fn save(&self, events: &[E]) -> Result<Vec<(E, Version)>, Error> {
444 self.repository.save(events).await
445 }
446 async fn version_provider(&self, event: &E) -> Result<Option<Version>, Error> {
448 self.repository.version_provider(event).await
449 }
450}
451
/// Lets the orchestrating aggregate act as an [`EventRepository`] by forwarding to the wrapped
/// repository.
///
/// Compiled when the `not-send-futures` feature is enabled, so no `Sync` bounds are needed.
#[cfg(feature = "not-send-futures")]
impl<C, S, E, Repository, Version, Error> EventRepository<C, E, Version, Error>
    for EventSourcedOrchestratingAggregate<'_, C, S, E, Repository, Version, Error>
where
    Repository: EventRepository<C, E, Version, Error>,
{
    /// Forwards to the wrapped repository's `fetch_events` and returns its result unchanged.
    async fn fetch_events(&self, command: &C) -> Result<Vec<(E, Version)>, Error> {
        let repository = &self.repository;
        repository.fetch_events(command).await
    }

    /// Forwards to the wrapped repository's `save` and returns its result unchanged.
    async fn save(&self, events: &[E]) -> Result<Vec<(E, Version)>, Error> {
        let repository = &self.repository;
        repository.save(events).await
    }

    /// Forwards to the wrapped repository's `version_provider` and returns its result unchanged.
    async fn version_provider(&self, event: &E) -> Result<Option<Version>, Error> {
        let repository = &self.repository;
        repository.version_provider(event).await
    }
}
471
472#[cfg(not(feature = "not-send-futures"))]
473impl<'a, C, S, E, Repository, Version, Error>
474 EventSourcedOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
475where
476 Repository: EventRepository<C, E, Version, Error> + Sync,
477 C: Sync,
478 S: Sync,
479 E: Sync + Clone,
480 Version: Sync,
481 Error: Sync,
482{
483 pub fn new(
485 repository: Repository,
486 decider: Decider<'a, C, S, E, Error>,
487 saga: Saga<'a, E, C>,
488 ) -> Self {
489 EventSourcedOrchestratingAggregate {
490 repository,
491 decider,
492 saga,
493 _marker: PhantomData,
494 }
495 }
496 pub async fn handle(&self, command: &C) -> Result<Vec<(E, Version)>, Error>
498 where
499 E: Identifier,
500 C: Identifier,
501 {
502 let events: Vec<(E, Version)> = self.fetch_events(command).await?;
503 let mut current_events: Vec<E> = vec![];
504 for (event, _) in events {
505 current_events.push(event);
506 }
507 let new_events = self
508 .compute_new_events_dynamically(¤t_events, command)
509 .await?;
510 let saved_events = self.save(&new_events).await?;
511 Ok(saved_events)
512 }
513 async fn compute_new_events_dynamically(
518 &self,
519 current_events: &[E],
520 command: &C,
521 ) -> Result<Vec<E>, Error>
522 where
523 E: Identifier,
524 C: Identifier,
525 {
526 let current_state: S = current_events
527 .iter()
528 .fold((self.decider.initial_state)(), |state, event| {
529 (self.decider.evolve)(&state, event)
530 });
531
532 let initial_events = (self.decider.decide)(command, ¤t_state)?;
533
534 let commands: Vec<C> = initial_events
535 .iter()
536 .flat_map(|event: &E| self.saga.compute_new_actions(event))
537 .collect();
538
539 let mut all_events = initial_events.clone();
541
542 for command in commands.iter() {
543 let previous_events = [
544 self.repository
545 .fetch_events(command)
546 .await?
547 .iter()
548 .map(|(e, _)| e.clone())
549 .collect::<Vec<E>>(),
550 initial_events
551 .clone()
552 .into_iter()
553 .filter(|e| e.identifier() == command.identifier())
554 .collect::<Vec<E>>(),
555 ]
556 .concat();
557
558 let new_events =
561 Box::pin(self.compute_new_events_dynamically(&previous_events, command)).await?;
562 all_events.extend(new_events);
563 }
564
565 Ok(all_events)
566 }
567}
568
/// Constructor and command handling for [`EventSourcedOrchestratingAggregate`].
///
/// Compiled when the `not-send-futures` feature is enabled.
#[cfg(feature = "not-send-futures")]
impl<'a, C, S, E, Repository, Version, Error>
    EventSourcedOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
where
    Repository: EventRepository<C, E, Version, Error>,
    E: Clone,
{
    /// Builds an orchestrating aggregate from a repository, a decider and a saga.
    pub fn new(
        repository: Repository,
        decider: Decider<'a, C, S, E, Error>,
        saga: Saga<'a, E, C>,
    ) -> Self {
        Self {
            repository,
            decider,
            saga,
            _marker: PhantomData,
        }
    }

    /// Handles `command` and returns the `(event, version)` pairs reported by `save`.
    ///
    /// Fetches the events for `command`, drops their versions, computes all new events
    /// (including those produced by saga-triggered follow-up commands) and saves them with a
    /// single `save` call.
    ///
    /// # Errors
    ///
    /// Returns the first `Error` produced by fetching, deciding or saving.
    pub async fn handle(&self, command: &C) -> Result<Vec<(E, Version)>, Error>
    where
        E: Identifier,
        C: Identifier,
    {
        // Only the events are needed to rebuild state; versions are discarded here.
        let current_events: Vec<E> = self
            .fetch_events(command)
            .await?
            .into_iter()
            .map(|(event, _version)| event)
            .collect();
        let new_events = self
            .compute_new_events_dynamically(&current_events, command)
            .await?;
        self.save(&new_events).await
    }

    /// Computes the events produced by `command` plus those produced, recursively, by every
    /// command the saga derives from them.
    ///
    /// 1. Rebuilds the state by folding `current_events` with the decider's `evolve`, starting
    ///    from its `initial_state`.
    /// 2. Asks the decider's `decide` for the events of `command` (the "initial events").
    /// 3. Asks the saga for the follow-up commands of each initial event.
    /// 4. For every follow-up command, fetches its events from the repository, appends the
    ///    initial events whose `identifier()` equals the command's, and recurses.
    ///
    /// The result lists the initial events first, then the events of each follow-up command in
    /// order.
    ///
    /// NOTE(review): events produced by an earlier follow-up command are not fed into later
    /// follow-up commands of the same pass, and nothing here bounds the recursion if the saga
    /// keeps producing commands — confirm both are intended.
    ///
    /// # Errors
    ///
    /// Returns the first `Error` from `decide` or from the repository.
    async fn compute_new_events_dynamically(
        &self,
        current_events: &[E],
        command: &C,
    ) -> Result<Vec<E>, Error>
    where
        E: Identifier,
        C: Identifier,
    {
        let current_state: S = current_events
            .iter()
            .fold((self.decider.initial_state)(), |state, event| {
                (self.decider.evolve)(&state, event)
            });

        let initial_events = (self.decider.decide)(command, &current_state)?;

        let commands: Vec<C> = initial_events
            .iter()
            .flat_map(|event: &E| self.saga.compute_new_actions(event))
            .collect();

        // Collected separately so `initial_events` can be moved into the result at the end
        // instead of being cloned up front.
        let mut follow_up_events: Vec<E> = Vec::new();

        for follow_up_command in &commands {
            // Stored events are owned here, so they are moved rather than cloned.
            let mut previous_events: Vec<E> = self
                .repository
                .fetch_events(follow_up_command)
                .await?
                .into_iter()
                .map(|(event, _version)| event)
                .collect();
            // Clone only the not-yet-stored initial events that match this command.
            previous_events.extend(
                initial_events
                    .iter()
                    .filter(|event| event.identifier() == follow_up_command.identifier())
                    .cloned(),
            );

            // Boxed because an async fn cannot call itself without indirection.
            let new_events = Box::pin(
                self.compute_new_events_dynamically(&previous_events, follow_up_command),
            )
            .await?;
            follow_up_events.extend(new_events);
        }

        let mut all_events = initial_events;
        all_events.extend(follow_up_events);
        Ok(all_events)
    }
}
661
/// Aggregate that combines a state repository with a concrete `Decider` and a `Saga`.
///
/// `Repository` must implement [`StateRepository`]. The saga is typed `Saga<'a, E, C>`: the
/// `StateComputation` impl of this type feeds it events (`E`) and collects commands (`C`).
pub struct StateStoredOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
where
    Repository: StateRepository<C, S, Version, Error>,
{
    /// Storage backend; the `StateRepository` impl of this type forwards to it.
    repository: Repository,
    /// Decider whose `initial_state`, `evolve` and `decide` members are invoked by this type.
    decider: Decider<'a, C, S, E, Error>,
    /// Saga queried (via `compute_new_actions`) for the commands each new event triggers.
    saga: Saga<'a, E, C>,
    /// Zero-sized marker that mentions the type parameters, including `Version`, which no
    /// other field uses.
    _marker: PhantomData<(C, S, E, Version, Error)>,
}
685
686impl<C, S, E, Repository, Version, Error> StateComputation<C, S, E, Error>
687 for StateStoredOrchestratingAggregate<'_, C, S, E, Repository, Version, Error>
688where
689 Repository: StateRepository<C, S, Version, Error>,
690 S: Clone,
691{
692 fn compute_new_state(&self, current_state: Option<S>, command: &C) -> Result<S, Error> {
694 let effective_current_state =
695 current_state.unwrap_or_else(|| (self.decider.initial_state)());
696 let events = (self.decider.decide)(command, &effective_current_state)?;
697 let mut new_state = events.iter().fold(effective_current_state, |state, event| {
698 (self.decider.evolve)(&state, event)
699 });
700 let commands = events
701 .iter()
702 .flat_map(|event: &E| self.saga.compute_new_actions(event))
703 .collect::<Vec<C>>();
704 for action in commands {
705 new_state = self.compute_new_state(Some(new_state.clone()), &action)?;
706 }
707 Ok(new_state)
708 }
709}
710
711#[cfg(not(feature = "not-send-futures"))]
712impl<C, S, E, Repository, Version, Error> StateRepository<C, S, Version, Error>
713 for StateStoredOrchestratingAggregate<'_, C, S, E, Repository, Version, Error>
714where
715 Repository: StateRepository<C, S, Version, Error> + Sync,
716 C: Sync,
717 S: Sync,
718 E: Sync,
719 Version: Sync,
720 Error: Sync,
721{
722 async fn fetch_state(&self, command: &C) -> Result<Option<(S, Version)>, Error> {
724 self.repository.fetch_state(command).await
725 }
726 async fn save(&self, state: &S, version: &Option<Version>) -> Result<(S, Version), Error> {
728 self.repository.save(state, version).await
729 }
730}
731
/// Lets the orchestrating aggregate act as a [`StateRepository`] by forwarding to the wrapped
/// repository.
///
/// Compiled when the `not-send-futures` feature is enabled, so no `Sync` bounds are needed.
#[cfg(feature = "not-send-futures")]
impl<C, S, E, Repository, Version, Error> StateRepository<C, S, Version, Error>
    for StateStoredOrchestratingAggregate<'_, C, S, E, Repository, Version, Error>
where
    Repository: StateRepository<C, S, Version, Error>,
{
    /// Forwards to the wrapped repository's `fetch_state` and returns its result unchanged.
    async fn fetch_state(&self, command: &C) -> Result<Option<(S, Version)>, Error> {
        let repository = &self.repository;
        repository.fetch_state(command).await
    }

    /// Forwards to the wrapped repository's `save` and returns its result unchanged.
    async fn save(&self, state: &S, version: &Option<Version>) -> Result<(S, Version), Error> {
        let repository = &self.repository;
        repository.save(state, version).await
    }
}
747
748#[cfg(not(feature = "not-send-futures"))]
749impl<'a, C, S, E, Repository, Version, Error>
750 StateStoredOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
751where
752 Repository: StateRepository<C, S, Version, Error> + Sync,
753 C: Sync,
754 S: Sync + Clone,
755 E: Sync,
756 Version: Sync,
757 Error: Sync,
758{
759 pub fn new(
761 repository: Repository,
762 decider: Decider<'a, C, S, E, Error>,
763 saga: Saga<'a, E, C>,
764 ) -> Self {
765 StateStoredOrchestratingAggregate {
766 repository,
767 decider,
768 saga,
769 _marker: PhantomData,
770 }
771 }
772 pub async fn handle(&self, command: &C) -> Result<(S, Version), Error> {
774 let state_version = self.fetch_state(command).await?;
775 match state_version {
776 None => {
777 let new_state = self.compute_new_state(None, command)?;
778 let saved_state = self.save(&new_state, &None).await?;
779 Ok(saved_state)
780 }
781 Some((state, version)) => {
782 let new_state = self.compute_new_state(Some(state), command)?;
783 let saved_state = self.save(&new_state, &Some(version)).await?;
784 Ok(saved_state)
785 }
786 }
787 }
788}
789
/// Constructor and command handling for [`StateStoredOrchestratingAggregate`].
///
/// Compiled when the `not-send-futures` feature is enabled.
#[cfg(feature = "not-send-futures")]
impl<'a, C, S, E, Repository, Version, Error>
    StateStoredOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
where
    Repository: StateRepository<C, S, Version, Error>,
    S: Clone,
{
    /// Builds an orchestrating aggregate from a repository, a decider and a saga.
    pub fn new(
        repository: Repository,
        decider: Decider<'a, C, S, E, Error>,
        saga: Saga<'a, E, C>,
    ) -> Self {
        Self {
            repository,
            decider,
            saga,
            _marker: PhantomData,
        }
    }

    /// Handles `command` and returns the `(state, version)` pair reported by `save`.
    ///
    /// The fetched state (if any) is handed to `compute_new_state`; the fetched version (if
    /// any) is handed to `save` together with the newly computed state. When nothing was
    /// fetched, both are `None`.
    ///
    /// # Errors
    ///
    /// Returns the first `Error` produced by fetching, computing or saving.
    pub async fn handle(&self, command: &C) -> Result<(S, Version), Error> {
        // Split the optional pair so both cases share one code path.
        let (current_state, current_version) = self.fetch_state(command).await?.unzip();
        let next_state = self.compute_new_state(current_state, command)?;
        self.save(&next_state, &current_version).await
    }
}