fmodel_rust/
aggregate.rs

1use std::future::Future;
2use std::marker::PhantomData;
3
4use crate::decider::{Decider, EventComputation, StateComputation};
5use crate::saga::{ActionComputation, Saga};
6use crate::Identifier;
7
8/// Event Repository trait
9///
10/// Generic parameters:
11///
12/// - `C` - Command
13/// - `E` - Event
14/// - `Version` - Version/Offset/Sequence number
15/// - `Error` - Error
16#[cfg(not(feature = "not-send-futures"))]
17pub trait EventRepository<C, E, Version, Error> {
18    /// Fetches current events, based on the command.
19    /// Desugared `async fn fetch_events(&self, command: &C) -> Result<Vec<(E, Version)>, Error>;` to a normal `fn` that returns `impl Future`, and adds bound `Send`.
20    /// You can freely move between the `async fn` and `-> impl Future` spelling in your traits and impls. This is true even when one form has a Send bound.
21    fn fetch_events(
22        &self,
23        command: &C,
24    ) -> impl Future<Output = Result<Vec<(E, Version)>, Error>> + Send;
25    /// Saves events.
26    /// Desugared `async fn save(&self, events: &[E], latest_version: &Option<Version>) -> Result<Vec<(E, Version)>, Error>;` to a normal `fn` that returns `impl Future`, and adds bound `Send`
27    /// You can freely move between the `async fn` and `-> impl Future` spelling in your traits and impls. This is true even when one form has a Send bound.
28    fn save(&self, events: &[E]) -> impl Future<Output = Result<Vec<(E, Version)>, Error>> + Send;
29
30    /// Version provider. It is used to provide the version/sequence of the stream to wich this event belongs to. Optimistic locking is useing this version to check if the event is already saved.
31    /// Desugared `async fn version_provider(&self, event: &E) -> Result<Option<Version>, Error>;` to a normal `fn` that returns `impl Future`, and adds bound `Send`
32    /// You can freely move between the `async fn` and `-> impl Future` spelling in your traits and impls. This is true even when one form has a Send bound.
33    fn version_provider(
34        &self,
35        event: &E,
36    ) -> impl Future<Output = Result<Option<Version>, Error>> + Send;
37}
38
39/// Event Repository trait
40///
41/// Generic parameters:
42///
43/// - `C` - Command
44/// - `E` - Event
45/// - `Version` - Version/Offset/Sequence number
46/// - `Error` - Error
47#[cfg(feature = "not-send-futures")]
48pub trait EventRepository<C, E, Version, Error> {
49    /// Fetches current events, based on the command.
50    /// Desugared `async fn fetch_events(&self, command: &C) -> Result<Vec<(E, Version)>, Error>;` to a normal `fn` that returns `impl Future`.
51    /// You can freely move between the `async fn` and `-> impl Future` spelling in your traits and impls.
52    fn fetch_events(&self, command: &C) -> impl Future<Output = Result<Vec<(E, Version)>, Error>>;
53    /// Saves events.
54    /// Desugared `async fn save(&self, events: &[E], latest_version: &Option<Version>) -> Result<Vec<(E, Version)>, Error>;` to a normal `fn` that returns `impl Future`
55    /// You can freely move between the `async fn` and `-> impl Future` spelling in your traits and impls.
56    fn save(&self, events: &[E]) -> impl Future<Output = Result<Vec<(E, Version)>, Error>>;
57
58    /// Version provider. It is used to provide the version/sequence of the stream to wich this event belongs to. Optimistic locking is useing this version to check if the event is already saved.
59    /// Desugared `async fn version_provider(&self, event: &E) -> Result<Option<Version>, Error>;` to a normal `fn` that returns `impl Future`
60    /// You can freely move between the `async fn` and `-> impl Future` spelling in your traits and impls.
61    fn version_provider(&self, event: &E) -> impl Future<Output = Result<Option<Version>, Error>>;
62}
63
64/// Event Sourced Aggregate.
65///
66/// It is using a `Decider` / [EventComputation] to compute new events based on the current events and the command.
67/// It is using a [EventRepository] to fetch the current events and to save the new events.
68///
69/// Generic parameters:
70///
71/// - `C` - Command
72/// - `S` - State
73/// - `E` - Event
74/// - `Repository` - Event repository
75/// - `Decider` - Event computation
76/// - `Version` - Version/Offset/Sequence number
77/// - `Error` - Error
78pub struct EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
79where
80    Repository: EventRepository<C, E, Version, Error>,
81    Decider: EventComputation<C, S, E, Error>,
82{
83    repository: Repository,
84    decider: Decider,
85    _marker: PhantomData<(C, S, E, Version, Error)>,
86}
87
88impl<C, S, E, Repository, Decider, Version, Error> EventComputation<C, S, E, Error>
89    for EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
90where
91    Repository: EventRepository<C, E, Version, Error>,
92    Decider: EventComputation<C, S, E, Error>,
93{
94    /// Computes new events based on the current events and the command.
95    fn compute_new_events(&self, current_events: &[E], command: &C) -> Result<Vec<E>, Error> {
96        self.decider.compute_new_events(current_events, command)
97    }
98}
99
100#[cfg(not(feature = "not-send-futures"))]
101impl<C, S, E, Repository, Decider, Version, Error> EventRepository<C, E, Version, Error>
102    for EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
103where
104    Repository: EventRepository<C, E, Version, Error> + Sync,
105    Decider: EventComputation<C, S, E, Error> + Sync,
106    C: Sync,
107    S: Sync,
108    E: Sync,
109    Version: Sync,
110    Error: Sync,
111{
112    /// Fetches current events, based on the command.
113    async fn fetch_events(&self, command: &C) -> Result<Vec<(E, Version)>, Error> {
114        self.repository.fetch_events(command).await
115    }
116    /// Saves events.
117    async fn save(&self, events: &[E]) -> Result<Vec<(E, Version)>, Error> {
118        self.repository.save(events).await
119    }
120    /// Version provider. It is used to provide the version/sequence of the event. Optimistic locking is useing this version to check if the event is already saved.
121    async fn version_provider(&self, event: &E) -> Result<Option<Version>, Error> {
122        self.repository.version_provider(event).await
123    }
124}
125
126#[cfg(feature = "not-send-futures")]
127impl<C, S, E, Repository, Decider, Version, Error> EventRepository<C, E, Version, Error>
128    for EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
129where
130    Repository: EventRepository<C, E, Version, Error>,
131    Decider: EventComputation<C, S, E, Error>,
132{
133    /// Fetches current events, based on the command.
134    async fn fetch_events(&self, command: &C) -> Result<Vec<(E, Version)>, Error> {
135        self.repository.fetch_events(command).await
136    }
137    /// Saves events.
138    async fn save(&self, events: &[E]) -> Result<Vec<(E, Version)>, Error> {
139        self.repository.save(events).await
140    }
141    /// Version provider. It is used to provide the version/sequence of the event. Optimistic locking is useing this version to check if the event is already saved.
142    async fn version_provider(&self, event: &E) -> Result<Option<Version>, Error> {
143        self.repository.version_provider(event).await
144    }
145}
146
147#[cfg(not(feature = "not-send-futures"))]
148impl<C, S, E, Repository, Decider, Version, Error>
149    EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
150where
151    Repository: EventRepository<C, E, Version, Error> + Sync,
152    Decider: EventComputation<C, S, E, Error> + Sync,
153    C: Sync,
154    S: Sync,
155    E: Sync,
156    Version: Sync,
157    Error: Sync,
158{
159    /// Creates a new instance of [EventSourcedAggregate].
160    pub fn new(repository: Repository, decider: Decider) -> Self {
161        EventSourcedAggregate {
162            repository,
163            decider,
164            _marker: PhantomData,
165        }
166    }
167    /// Handles the command by fetching the events from the repository, computing new events based on the current events and the command, and saving the new events to the repository.
168    pub async fn handle(&self, command: &C) -> Result<Vec<(E, Version)>, Error> {
169        let events: Vec<(E, Version)> = self.fetch_events(command).await?;
170        let mut current_events: Vec<E> = vec![];
171        for (event, _) in events {
172            current_events.push(event);
173        }
174        let new_events = self.compute_new_events(&current_events, command)?;
175        let saved_events = self.save(&new_events).await?;
176        Ok(saved_events)
177    }
178}
179
180#[cfg(feature = "not-send-futures")]
181impl<C, S, E, Repository, Decider, Version, Error>
182    EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
183where
184    Repository: EventRepository<C, E, Version, Error>,
185    Decider: EventComputation<C, S, E, Error>,
186{
187    /// Creates a new instance of [EventSourcedAggregate].
188    pub fn new(repository: Repository, decider: Decider) -> Self {
189        EventSourcedAggregate {
190            repository,
191            decider,
192            _marker: PhantomData,
193        }
194    }
195    /// Handles the command by fetching the events from the repository, computing new events based on the current events and the command, and saving the new events to the repository.
196    pub async fn handle(&self, command: &C) -> Result<Vec<(E, Version)>, Error> {
197        let events: Vec<(E, Version)> = self.fetch_events(command).await?;
198        let mut current_events: Vec<E> = vec![];
199        for (event, _) in events {
200            current_events.push(event);
201        }
202        let new_events = self.compute_new_events(&current_events, command)?;
203        let saved_events = self.save(&new_events).await?;
204        Ok(saved_events)
205    }
206}
207
208/// State Repository trait
209///
210/// Generic parameters:
211///
212/// - `C` - Command
213/// - `S` - State
214/// - `Version` - Version
215/// - `Error` - Error
216#[cfg(not(feature = "not-send-futures"))]
217pub trait StateRepository<C, S, Version, Error> {
218    /// Fetches current state, based on the command.
219    /// Desugared `async fn fetch_state(&self, command: &C) -> Result<Option<(S, Version)>, Error>;` to a normal `fn` that returns `impl Future` and adds bound `Send`
220    /// You can freely move between the `async fn` and `-> impl Future` spelling in your traits and impls. This is true even when one form has a Send bound.
221    fn fetch_state(
222        &self,
223        command: &C,
224    ) -> impl Future<Output = Result<Option<(S, Version)>, Error>> + Send;
225    /// Saves state.
226    /// Desugared `async fn save(&self, state: &S, version: &Option<Version>) -> Result<(S, Version), Error>;` to a normal `fn` that returns `impl Future` and adds bound `Send`
227    /// You can freely move between the `async fn` and `-> impl Future` spelling in your traits and impls. This is true even when one form has a Send bound.
228    fn save(
229        &self,
230        state: &S,
231        version: &Option<Version>,
232    ) -> impl Future<Output = Result<(S, Version), Error>> + Send;
233}
234
235/// State Repository trait
236///
237/// Generic parameters:
238///
239/// - `C` - Command
240/// - `S` - State
241/// - `Version` - Version
242/// - `Error` - Error
243#[cfg(feature = "not-send-futures")]
244pub trait StateRepository<C, S, Version, Error> {
245    /// Fetches current state, based on the command.
246    /// Desugared `async fn fetch_state(&self, command: &C) -> Result<Option<(S, Version)>, Error>;` to a normal `fn` that returns `impl Future`
247    /// You can freely move between the `async fn` and `-> impl Future` spelling in your traits and impls.
248    fn fetch_state(&self, command: &C)
249        -> impl Future<Output = Result<Option<(S, Version)>, Error>>;
250    /// Saves state.
251    /// Desugared `async fn save(&self, state: &S, version: &Option<Version>) -> Result<(S, Version), Error>;` to a normal `fn` that returns `impl Future`
252    /// You can freely move between the `async fn` and `-> impl Future` spelling in your traits and impls.
253    fn save(
254        &self,
255        state: &S,
256        version: &Option<Version>,
257    ) -> impl Future<Output = Result<(S, Version), Error>>;
258}
259
260/// State Stored Aggregate.
261///
262/// It is using a `Decider` / [StateComputation] to compute new state based on the current state and the command.
263/// It is using a [StateRepository] to fetch the current state and to save the new state.
264///
265/// Generic parameters:
266///
267/// - `C` - Command
268/// - `S` - State
269/// - `E` - Event
270/// - `Repository` - State repository
271/// - `Decider` - State computation
272/// - `Version` - Version
273/// - `Error` - Error
274pub struct StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
275where
276    Repository: StateRepository<C, S, Version, Error>,
277    Decider: StateComputation<C, S, E, Error>,
278{
279    repository: Repository,
280    decider: Decider,
281    _marker: PhantomData<(C, S, E, Version, Error)>,
282}
283
284impl<C, S, E, Repository, Decider, Version, Error> StateComputation<C, S, E, Error>
285    for StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
286where
287    Repository: StateRepository<C, S, Version, Error>,
288    Decider: StateComputation<C, S, E, Error>,
289{
290    /// Computes new state based on the current state and the command.
291    fn compute_new_state(&self, current_state: Option<S>, command: &C) -> Result<S, Error> {
292        self.decider.compute_new_state(current_state, command)
293    }
294}
295
296#[cfg(not(feature = "not-send-futures"))]
297impl<C, S, E, Repository, Decider, Version, Error> StateRepository<C, S, Version, Error>
298    for StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
299where
300    Repository: StateRepository<C, S, Version, Error> + Sync,
301    Decider: StateComputation<C, S, E, Error> + Sync,
302    C: Sync,
303    S: Sync,
304    E: Sync,
305    Version: Sync,
306    Error: Sync,
307{
308    /// Fetches current state, based on the command.
309    async fn fetch_state(&self, command: &C) -> Result<Option<(S, Version)>, Error> {
310        self.repository.fetch_state(command).await
311    }
312    /// Saves state.
313    async fn save(&self, state: &S, version: &Option<Version>) -> Result<(S, Version), Error> {
314        self.repository.save(state, version).await
315    }
316}
317
318#[cfg(feature = "not-send-futures")]
319impl<C, S, E, Repository, Decider, Version, Error> StateRepository<C, S, Version, Error>
320    for StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
321where
322    Repository: StateRepository<C, S, Version, Error>,
323    Decider: StateComputation<C, S, E, Error>,
324{
325    /// Fetches current state, based on the command.
326    async fn fetch_state(&self, command: &C) -> Result<Option<(S, Version)>, Error> {
327        self.repository.fetch_state(command).await
328    }
329    /// Saves state.
330    async fn save(&self, state: &S, version: &Option<Version>) -> Result<(S, Version), Error> {
331        self.repository.save(state, version).await
332    }
333}
334
335#[cfg(not(feature = "not-send-futures"))]
336impl<C, S, E, Repository, Decider, Version, Error>
337    StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
338where
339    Repository: StateRepository<C, S, Version, Error> + Sync,
340    Decider: StateComputation<C, S, E, Error> + Sync,
341    C: Sync,
342    S: Sync,
343    E: Sync,
344    Version: Sync,
345    Error: Sync,
346{
347    /// Creates a new instance of [StateStoredAggregate].
348    pub fn new(repository: Repository, decider: Decider) -> Self {
349        StateStoredAggregate {
350            repository,
351            decider,
352            _marker: PhantomData,
353        }
354    }
355    /// Handles the command by fetching the state from the repository, computing new state based on the current state and the command, and saving the new state to the repository.
356    pub async fn handle(&self, command: &C) -> Result<(S, Version), Error> {
357        let state_version = self.fetch_state(command).await?;
358        match state_version {
359            None => {
360                let new_state = self.compute_new_state(None, command)?;
361                let saved_state = self.save(&new_state, &None).await?;
362                Ok(saved_state)
363            }
364            Some((state, version)) => {
365                let new_state = self.compute_new_state(Some(state), command)?;
366                let saved_state = self.save(&new_state, &Some(version)).await?;
367                Ok(saved_state)
368            }
369        }
370    }
371}
372
373#[cfg(feature = "not-send-futures")]
374impl<C, S, E, Repository, Decider, Version, Error>
375    StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
376where
377    Repository: StateRepository<C, S, Version, Error>,
378    Decider: StateComputation<C, S, E, Error>,
379{
380    /// Creates a new instance of [StateStoredAggregate].
381    pub fn new(repository: Repository, decider: Decider) -> Self {
382        StateStoredAggregate {
383            repository,
384            decider,
385            _marker: PhantomData,
386        }
387    }
388    /// Handles the command by fetching the state from the repository, computing new state based on the current state and the command, and saving the new state to the repository.
389    pub async fn handle(&self, command: &C) -> Result<(S, Version), Error> {
390        let state_version = self.fetch_state(command).await?;
391        match state_version {
392            None => {
393                let new_state = self.compute_new_state(None, command)?;
394                let saved_state = self.save(&new_state, &None).await?;
395                Ok(saved_state)
396            }
397            Some((state, version)) => {
398                let new_state = self.compute_new_state(Some(state), command)?;
399                let saved_state = self.save(&new_state, &Some(version)).await?;
400                Ok(saved_state)
401            }
402        }
403    }
404}
405
406/// Orchestrating Event Sourced Aggregate.
407/// It is using a [Decider] and [Saga] to compute new events based on the current events and the command.
408/// If the `decider` is combined out of many deciders via `combine` function, a `saga` could be used to react on new events and send new commands to the `decider` recursively, in single transaction.
409/// It is using a [EventRepository] to fetch the current events and to save the new events.
410/// Generic parameters:
411/// - `C` - Command
412/// - `S` - State
413/// - `E` - Event
414/// - `Repository` - Event repository
415/// - `Version` - Version/Offset/Sequence number
416/// - `Error` - Error
417pub struct EventSourcedOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
418where
419    Repository: EventRepository<C, E, Version, Error>,
420{
421    repository: Repository,
422    decider: Decider<'a, C, S, E, Error>,
423    saga: Saga<'a, E, C>,
424    _marker: PhantomData<(C, S, E, Version, Error)>,
425}
426
427#[cfg(not(feature = "not-send-futures"))]
428impl<C, S, E, Repository, Version, Error> EventRepository<C, E, Version, Error>
429    for EventSourcedOrchestratingAggregate<'_, C, S, E, Repository, Version, Error>
430where
431    Repository: EventRepository<C, E, Version, Error> + Sync,
432    C: Sync,
433    S: Sync,
434    E: Sync,
435    Version: Sync,
436    Error: Sync,
437{
438    /// Fetches current events, based on the command.
439    async fn fetch_events(&self, command: &C) -> Result<Vec<(E, Version)>, Error> {
440        self.repository.fetch_events(command).await
441    }
442    /// Saves events.
443    async fn save(&self, events: &[E]) -> Result<Vec<(E, Version)>, Error> {
444        self.repository.save(events).await
445    }
446    /// Version provider. It is used to provide the version/sequence of the event. Optimistic locking is useing this version to check if the event is already saved.
447    async fn version_provider(&self, event: &E) -> Result<Option<Version>, Error> {
448        self.repository.version_provider(event).await
449    }
450}
451
452#[cfg(feature = "not-send-futures")]
453impl<C, S, E, Repository, Version, Error> EventRepository<C, E, Version, Error>
454    for EventSourcedOrchestratingAggregate<'_, C, S, E, Repository, Version, Error>
455where
456    Repository: EventRepository<C, E, Version, Error>,
457{
458    /// Fetches current events, based on the command.
459    async fn fetch_events(&self, command: &C) -> Result<Vec<(E, Version)>, Error> {
460        self.repository.fetch_events(command).await
461    }
462    /// Saves events.
463    async fn save(&self, events: &[E]) -> Result<Vec<(E, Version)>, Error> {
464        self.repository.save(events).await
465    }
466    /// Version provider. It is used to provide the version/sequence of the event. Optimistic locking is useing this version to check if the event is already saved.
467    async fn version_provider(&self, event: &E) -> Result<Option<Version>, Error> {
468        self.repository.version_provider(event).await
469    }
470}
471
472#[cfg(not(feature = "not-send-futures"))]
473impl<'a, C, S, E, Repository, Version, Error>
474    EventSourcedOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
475where
476    Repository: EventRepository<C, E, Version, Error> + Sync,
477    C: Sync,
478    S: Sync,
479    E: Sync + Clone,
480    Version: Sync,
481    Error: Sync,
482{
483    /// Creates a new instance of [EventSourcedAggregate].
484    pub fn new(
485        repository: Repository,
486        decider: Decider<'a, C, S, E, Error>,
487        saga: Saga<'a, E, C>,
488    ) -> Self {
489        EventSourcedOrchestratingAggregate {
490            repository,
491            decider,
492            saga,
493            _marker: PhantomData,
494        }
495    }
496    /// Handles the command by fetching the events from the repository, computing new events based on the current events and the command, and saving the new events to the repository.
497    pub async fn handle(&self, command: &C) -> Result<Vec<(E, Version)>, Error>
498    where
499        E: Identifier,
500        C: Identifier,
501    {
502        let events: Vec<(E, Version)> = self.fetch_events(command).await?;
503        let mut current_events: Vec<E> = vec![];
504        for (event, _) in events {
505            current_events.push(event);
506        }
507        let new_events = self
508            .compute_new_events_dynamically(&current_events, command)
509            .await?;
510        let saved_events = self.save(&new_events).await?;
511        Ok(saved_events)
512    }
513    /// Computes new events based on the current events and the command.
514    /// It is using a [Decider] and [Saga] to compute new events based on the current events and the command.
515    /// If the `decider` is combined out of many deciders via `combine` function, a `saga` could be used to react on new events and send new commands to the `decider` recursively, in single transaction.
516    /// It is using a [EventRepository] to fetch the current events for the command that is computed by the `saga`.
517    async fn compute_new_events_dynamically(
518        &self,
519        current_events: &[E],
520        command: &C,
521    ) -> Result<Vec<E>, Error>
522    where
523        E: Identifier,
524        C: Identifier,
525    {
526        let current_state: S = current_events
527            .iter()
528            .fold((self.decider.initial_state)(), |state, event| {
529                (self.decider.evolve)(&state, event)
530            });
531
532        let initial_events = (self.decider.decide)(command, &current_state)?;
533
534        let commands: Vec<C> = initial_events
535            .iter()
536            .flat_map(|event: &E| self.saga.compute_new_actions(event))
537            .collect();
538
539        // Collect all events including recursively computed new events.
540        let mut all_events = initial_events.clone();
541
542        for command in commands.iter() {
543            let previous_events = [
544                self.repository
545                    .fetch_events(command)
546                    .await?
547                    .iter()
548                    .map(|(e, _)| e.clone())
549                    .collect::<Vec<E>>(),
550                initial_events
551                    .clone()
552                    .into_iter()
553                    .filter(|e| e.identifier() == command.identifier())
554                    .collect::<Vec<E>>(),
555            ]
556            .concat();
557
558            // Recursively compute new events and extend the accumulated events list.
559            // By wrapping the recursive call in a Box, we ensure that the future type is not self-referential.
560            let new_events =
561                Box::pin(self.compute_new_events_dynamically(&previous_events, command)).await?;
562            all_events.extend(new_events);
563        }
564
565        Ok(all_events)
566    }
567}
568
569#[cfg(feature = "not-send-futures")]
570impl<'a, C, S, E, Repository, Version, Error>
571    EventSourcedOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
572where
573    Repository: EventRepository<C, E, Version, Error>,
574    E: Clone,
575{
576    /// Creates a new instance of [EventSourcedAggregate].
577    pub fn new(
578        repository: Repository,
579        decider: Decider<'a, C, S, E, Error>,
580        saga: Saga<'a, E, C>,
581    ) -> Self {
582        EventSourcedOrchestratingAggregate {
583            repository,
584            decider,
585            saga,
586            _marker: PhantomData,
587        }
588    }
589    /// Handles the command by fetching the events from the repository, computing new events based on the current events and the command, and saving the new events to the repository.
590    pub async fn handle(&self, command: &C) -> Result<Vec<(E, Version)>, Error>
591    where
592        E: Identifier,
593        C: Identifier,
594    {
595        let events: Vec<(E, Version)> = self.fetch_events(command).await?;
596        let mut current_events: Vec<E> = vec![];
597        for (event, _) in events {
598            current_events.push(event);
599        }
600        let new_events = self
601            .compute_new_events_dynamically(&current_events, command)
602            .await?;
603        let saved_events = self.save(&new_events).await?;
604        Ok(saved_events)
605    }
606    /// Computes new events based on the current events and the command.
607    /// It is using a [Decider] and [Saga] to compute new events based on the current events and the command.
608    /// If the `decider` is combined out of many deciders via `combine` function, a `saga` could be used to react on new events and send new commands to the `decider` recursively, in single transaction.
609    /// It is using a [EventRepository] to fetch the current events for the command that is computed by the `saga`.
610    async fn compute_new_events_dynamically(
611        &self,
612        current_events: &[E],
613        command: &C,
614    ) -> Result<Vec<E>, Error>
615    where
616        E: Identifier,
617        C: Identifier,
618    {
619        let current_state: S = current_events
620            .iter()
621            .fold((self.decider.initial_state)(), |state, event| {
622                (self.decider.evolve)(&state, event)
623            });
624
625        let initial_events = (self.decider.decide)(command, &current_state)?;
626
627        let commands: Vec<C> = initial_events
628            .iter()
629            .flat_map(|event: &E| self.saga.compute_new_actions(event))
630            .collect();
631
632        // Collect all events including recursively computed new events.
633        let mut all_events = initial_events.clone();
634
635        for command in commands.iter() {
636            let previous_events = [
637                self.repository
638                    .fetch_events(command)
639                    .await?
640                    .iter()
641                    .map(|(e, _)| e.clone())
642                    .collect::<Vec<E>>(),
643                initial_events
644                    .clone()
645                    .into_iter()
646                    .filter(|e| e.identifier() == command.identifier())
647                    .collect::<Vec<E>>(),
648            ]
649            .concat();
650
651            // Recursively compute new events and extend the accumulated events list.
652            // By wrapping the recursive call in a Box, we ensure that the future type is not self-referential.
653            let new_events =
654                Box::pin(self.compute_new_events_dynamically(&previous_events, command)).await?;
655            all_events.extend(new_events);
656        }
657
658        Ok(all_events)
659    }
660}
661
662/// Orchestrating State Stored Aggregate.
663///
664/// It is using a [Decider] and [Saga] to compute new state based on the current state and the command.
665/// If the `decider` is combined out of many deciders via `combine` function, a `saga` could be used to react on new events and send new commands to the `decider` recursively, in single transaction.
666/// It is using a [StateRepository] to fetch the current state and to save the new state.
667///
668/// Generic parameters:
669///
670/// - `C` - Command
671/// - `S` - State
672/// - `E` - Event
673/// - `Repository` - State repository
674/// - `Version` - Version
675/// - `Error` - Error
676pub struct StateStoredOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
677where
678    Repository: StateRepository<C, S, Version, Error>,
679{
680    repository: Repository,
681    decider: Decider<'a, C, S, E, Error>,
682    saga: Saga<'a, E, C>,
683    _marker: PhantomData<(C, S, E, Version, Error)>,
684}
685
686impl<C, S, E, Repository, Version, Error> StateComputation<C, S, E, Error>
687    for StateStoredOrchestratingAggregate<'_, C, S, E, Repository, Version, Error>
688where
689    Repository: StateRepository<C, S, Version, Error>,
690    S: Clone,
691{
692    /// Computes new state based on the current state and the command.
693    fn compute_new_state(&self, current_state: Option<S>, command: &C) -> Result<S, Error> {
694        let effective_current_state =
695            current_state.unwrap_or_else(|| (self.decider.initial_state)());
696        let events = (self.decider.decide)(command, &effective_current_state)?;
697        let mut new_state = events.iter().fold(effective_current_state, |state, event| {
698            (self.decider.evolve)(&state, event)
699        });
700        let commands = events
701            .iter()
702            .flat_map(|event: &E| self.saga.compute_new_actions(event))
703            .collect::<Vec<C>>();
704        for action in commands {
705            new_state = self.compute_new_state(Some(new_state.clone()), &action)?;
706        }
707        Ok(new_state)
708    }
709}
710
711#[cfg(not(feature = "not-send-futures"))]
712impl<C, S, E, Repository, Version, Error> StateRepository<C, S, Version, Error>
713    for StateStoredOrchestratingAggregate<'_, C, S, E, Repository, Version, Error>
714where
715    Repository: StateRepository<C, S, Version, Error> + Sync,
716    C: Sync,
717    S: Sync,
718    E: Sync,
719    Version: Sync,
720    Error: Sync,
721{
722    /// Fetches current state, based on the command.
723    async fn fetch_state(&self, command: &C) -> Result<Option<(S, Version)>, Error> {
724        self.repository.fetch_state(command).await
725    }
726    /// Saves state.
727    async fn save(&self, state: &S, version: &Option<Version>) -> Result<(S, Version), Error> {
728        self.repository.save(state, version).await
729    }
730}
731
732#[cfg(feature = "not-send-futures")]
733impl<C, S, E, Repository, Version, Error> StateRepository<C, S, Version, Error>
734    for StateStoredOrchestratingAggregate<'_, C, S, E, Repository, Version, Error>
735where
736    Repository: StateRepository<C, S, Version, Error>,
737{
738    /// Fetches current state, based on the command.
739    async fn fetch_state(&self, command: &C) -> Result<Option<(S, Version)>, Error> {
740        self.repository.fetch_state(command).await
741    }
742    /// Saves state.
743    async fn save(&self, state: &S, version: &Option<Version>) -> Result<(S, Version), Error> {
744        self.repository.save(state, version).await
745    }
746}
747
748#[cfg(not(feature = "not-send-futures"))]
749impl<'a, C, S, E, Repository, Version, Error>
750    StateStoredOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
751where
752    Repository: StateRepository<C, S, Version, Error> + Sync,
753    C: Sync,
754    S: Sync + Clone,
755    E: Sync,
756    Version: Sync,
757    Error: Sync,
758{
759    /// Creates a new instance of [StateStoredAggregate].
760    pub fn new(
761        repository: Repository,
762        decider: Decider<'a, C, S, E, Error>,
763        saga: Saga<'a, E, C>,
764    ) -> Self {
765        StateStoredOrchestratingAggregate {
766            repository,
767            decider,
768            saga,
769            _marker: PhantomData,
770        }
771    }
772    /// Handles the command by fetching the state from the repository, computing new state based on the current state and the command, and saving the new state to the repository.
773    pub async fn handle(&self, command: &C) -> Result<(S, Version), Error> {
774        let state_version = self.fetch_state(command).await?;
775        match state_version {
776            None => {
777                let new_state = self.compute_new_state(None, command)?;
778                let saved_state = self.save(&new_state, &None).await?;
779                Ok(saved_state)
780            }
781            Some((state, version)) => {
782                let new_state = self.compute_new_state(Some(state), command)?;
783                let saved_state = self.save(&new_state, &Some(version)).await?;
784                Ok(saved_state)
785            }
786        }
787    }
788}
789
790#[cfg(feature = "not-send-futures")]
791impl<'a, C, S, E, Repository, Version, Error>
792    StateStoredOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
793where
794    Repository: StateRepository<C, S, Version, Error>,
795    S: Clone,
796{
797    /// Creates a new instance of [StateStoredAggregate].
798    pub fn new(
799        repository: Repository,
800        decider: Decider<'a, C, S, E, Error>,
801        saga: Saga<'a, E, C>,
802    ) -> Self {
803        StateStoredOrchestratingAggregate {
804            repository,
805            decider,
806            saga,
807            _marker: PhantomData,
808        }
809    }
810    /// Handles the command by fetching the state from the repository, computing new state based on the current state and the command, and saving the new state to the repository.
811    pub async fn handle(&self, command: &C) -> Result<(S, Version), Error> {
812        let state_version = self.fetch_state(command).await?;
813        match state_version {
814            None => {
815                let new_state = self.compute_new_state(None, command)?;
816                let saved_state = self.save(&new_state, &None).await?;
817                Ok(saved_state)
818            }
819            Some((state, version)) => {
820                let new_state = self.compute_new_state(Some(state), command)?;
821                let saved_state = self.save(&new_state, &Some(version)).await?;
822                Ok(saved_state)
823            }
824        }
825    }
826}