fmodel_rust/
aggregate.rs

1use std::future::Future;
2use std::marker::PhantomData;
3
4use crate::decider::{Decider, EventComputation, StateComputation};
5use crate::saga::{ActionComputation, Saga};
6use crate::Identifier;
7
/// Event Repository trait.
///
/// Storage abstraction used by the event-sourced aggregates in this module: it fetches the
/// events relevant to a command, saves new events, and reports the version of the stream an
/// event belongs to.
///
/// Generic parameters:
///
/// - `C` - Command
/// - `E` - Event
/// - `Version` - Version/Offset/Sequence number
/// - `Error` - Error
pub trait EventRepository<C, E, Version, Error> {
    /// Fetches current events, based on the command.
    ///
    /// Returns the events paired with their versions, or `Error` on failure.
    ///
    /// Desugared `async fn fetch_events(&self, command: &C) -> Result<Vec<(E, Version)>, Error>;`
    /// to a normal `fn` that returns `impl Future`, and adds bound `Send`.
    /// You can freely move between the `async fn` and `-> impl Future` spelling in your traits
    /// and impls. This is true even when one form has a `Send` bound.
    fn fetch_events(
        &self,
        command: &C,
    ) -> impl Future<Output = Result<Vec<(E, Version)>, Error>> + Send;

    /// Saves events.
    ///
    /// Returns the saved events paired with their versions, or `Error` on failure.
    ///
    /// Desugared `async fn save(&self, events: &[E]) -> Result<Vec<(E, Version)>, Error>;`
    /// to a normal `fn` that returns `impl Future`, and adds bound `Send`.
    /// You can freely move between the `async fn` and `-> impl Future` spelling in your traits
    /// and impls. This is true even when one form has a `Send` bound.
    fn save(&self, events: &[E]) -> impl Future<Output = Result<Vec<(E, Version)>, Error>> + Send;

    /// Version provider. It is used to provide the version/sequence of the stream to which this
    /// event belongs. Optimistic locking uses this version to check whether the event is
    /// already saved.
    ///
    /// Returns `None` when no version is known for the event's stream.
    ///
    /// Desugared `async fn version_provider(&self, event: &E) -> Result<Option<Version>, Error>;`
    /// to a normal `fn` that returns `impl Future`, and adds bound `Send`.
    /// You can freely move between the `async fn` and `-> impl Future` spelling in your traits
    /// and impls. This is true even when one form has a `Send` bound.
    fn version_provider(
        &self,
        event: &E,
    ) -> impl Future<Output = Result<Option<Version>, Error>> + Send;
}
37
38/// Event Sourced Aggregate.
39///
40/// It is using a `Decider` / [EventComputation] to compute new events based on the current events and the command.
41/// It is using a [EventRepository] to fetch the current events and to save the new events.
42///
43/// Generic parameters:
44///
45/// - `C` - Command
46/// - `S` - State
47/// - `E` - Event
48/// - `Repository` - Event repository
49/// - `Decider` - Event computation
50/// - `Version` - Version/Offset/Sequence number
51/// - `Error` - Error
52pub struct EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
53where
54    Repository: EventRepository<C, E, Version, Error>,
55    Decider: EventComputation<C, S, E, Error>,
56{
57    repository: Repository,
58    decider: Decider,
59    _marker: PhantomData<(C, S, E, Version, Error)>,
60}
61
62impl<C, S, E, Repository, Decider, Version, Error> EventComputation<C, S, E, Error>
63    for EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
64where
65    Repository: EventRepository<C, E, Version, Error>,
66    Decider: EventComputation<C, S, E, Error>,
67{
68    /// Computes new events based on the current events and the command.
69    fn compute_new_events(&self, current_events: &[E], command: &C) -> Result<Vec<E>, Error> {
70        self.decider.compute_new_events(current_events, command)
71    }
72}
73
74impl<C, S, E, Repository, Decider, Version, Error> EventRepository<C, E, Version, Error>
75    for EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
76where
77    Repository: EventRepository<C, E, Version, Error> + Sync,
78    Decider: EventComputation<C, S, E, Error> + Sync,
79    C: Sync,
80    S: Sync,
81    E: Sync,
82    Version: Sync,
83    Error: Sync,
84{
85    /// Fetches current events, based on the command.
86    async fn fetch_events(&self, command: &C) -> Result<Vec<(E, Version)>, Error> {
87        self.repository.fetch_events(command).await
88    }
89    /// Saves events.
90    async fn save(&self, events: &[E]) -> Result<Vec<(E, Version)>, Error> {
91        self.repository.save(events).await
92    }
93    /// Version provider. It is used to provide the version/sequence of the event. Optimistic locking is useing this version to check if the event is already saved.
94    async fn version_provider(&self, event: &E) -> Result<Option<Version>, Error> {
95        self.repository.version_provider(event).await
96    }
97}
98
99impl<C, S, E, Repository, Decider, Version, Error>
100    EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
101where
102    Repository: EventRepository<C, E, Version, Error> + Sync,
103    Decider: EventComputation<C, S, E, Error> + Sync,
104    C: Sync,
105    S: Sync,
106    E: Sync,
107    Version: Sync,
108    Error: Sync,
109{
110    /// Creates a new instance of [EventSourcedAggregate].
111    pub fn new(repository: Repository, decider: Decider) -> Self {
112        EventSourcedAggregate {
113            repository,
114            decider,
115            _marker: PhantomData,
116        }
117    }
118    /// Handles the command by fetching the events from the repository, computing new events based on the current events and the command, and saving the new events to the repository.
119    pub async fn handle(&self, command: &C) -> Result<Vec<(E, Version)>, Error> {
120        let events: Vec<(E, Version)> = self.fetch_events(command).await?;
121        let mut current_events: Vec<E> = vec![];
122        for (event, _) in events {
123            current_events.push(event);
124        }
125        let new_events = self.compute_new_events(&current_events, command)?;
126        let saved_events = self.save(&new_events).await?;
127        Ok(saved_events)
128    }
129}
130
/// State Repository trait.
///
/// Storage abstraction used by the state-stored aggregates in this module: it fetches the
/// current state for a command and saves a new state.
///
/// Generic parameters:
///
/// - `C` - Command
/// - `S` - State
/// - `Version` - Version
/// - `Error` - Error
pub trait StateRepository<C, S, Version, Error> {
    /// Fetches current state, based on the command.
    ///
    /// Yields `None` when there is no stored state, otherwise the state and its version.
    ///
    /// Desugared `async fn fetch_state(&self, command: &C) -> Result<Option<(S, Version)>, Error>;`
    /// to a normal `fn` that returns `impl Future` and adds bound `Send`.
    /// You can freely move between the `async fn` and `-> impl Future` spelling in your traits
    /// and impls. This is true even when one form has a `Send` bound.
    fn fetch_state(
        &self,
        command: &C,
    ) -> impl Future<Output = Result<Option<(S, Version)>, Error>> + Send;

    /// Saves state.
    ///
    /// `version` is the version that was fetched together with the state, or `None` when no
    /// state was stored before. Yields the saved state and its version.
    ///
    /// Desugared `async fn save(&self, state: &S, version: &Option<Version>) -> Result<(S, Version), Error>;`
    /// to a normal `fn` that returns `impl Future` and adds bound `Send`.
    /// You can freely move between the `async fn` and `-> impl Future` spelling in your traits
    /// and impls. This is true even when one form has a `Send` bound.
    fn save(
        &self,
        state: &S,
        version: &Option<Version>,
    ) -> impl Future<Output = Result<(S, Version), Error>> + Send;
}
156
157/// State Stored Aggregate.
158///
159/// It is using a `Decider` / [StateComputation] to compute new state based on the current state and the command.
160/// It is using a [StateRepository] to fetch the current state and to save the new state.
161///
162/// Generic parameters:
163///
164/// - `C` - Command
165/// - `S` - State
166/// - `E` - Event
167/// - `Repository` - State repository
168/// - `Decider` - State computation
169/// - `Version` - Version
170/// - `Error` - Error
171pub struct StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
172where
173    Repository: StateRepository<C, S, Version, Error>,
174    Decider: StateComputation<C, S, E, Error>,
175{
176    repository: Repository,
177    decider: Decider,
178    _marker: PhantomData<(C, S, E, Version, Error)>,
179}
180
181impl<C, S, E, Repository, Decider, Version, Error> StateComputation<C, S, E, Error>
182    for StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
183where
184    Repository: StateRepository<C, S, Version, Error>,
185    Decider: StateComputation<C, S, E, Error>,
186{
187    /// Computes new state based on the current state and the command.
188    fn compute_new_state(&self, current_state: Option<S>, command: &C) -> Result<S, Error> {
189        self.decider.compute_new_state(current_state, command)
190    }
191}
192
193impl<C, S, E, Repository, Decider, Version, Error> StateRepository<C, S, Version, Error>
194    for StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
195where
196    Repository: StateRepository<C, S, Version, Error> + Sync,
197    Decider: StateComputation<C, S, E, Error> + Sync,
198    C: Sync,
199    S: Sync,
200    E: Sync,
201    Version: Sync,
202    Error: Sync,
203{
204    /// Fetches current state, based on the command.
205    async fn fetch_state(&self, command: &C) -> Result<Option<(S, Version)>, Error> {
206        self.repository.fetch_state(command).await
207    }
208    /// Saves state.
209    async fn save(&self, state: &S, version: &Option<Version>) -> Result<(S, Version), Error> {
210        self.repository.save(state, version).await
211    }
212}
213
214impl<C, S, E, Repository, Decider, Version, Error>
215    StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
216where
217    Repository: StateRepository<C, S, Version, Error> + Sync,
218    Decider: StateComputation<C, S, E, Error> + Sync,
219    C: Sync,
220    S: Sync,
221    E: Sync,
222    Version: Sync,
223    Error: Sync,
224{
225    /// Creates a new instance of [StateStoredAggregate].
226    pub fn new(repository: Repository, decider: Decider) -> Self {
227        StateStoredAggregate {
228            repository,
229            decider,
230            _marker: PhantomData,
231        }
232    }
233    /// Handles the command by fetching the state from the repository, computing new state based on the current state and the command, and saving the new state to the repository.
234    pub async fn handle(&self, command: &C) -> Result<(S, Version), Error> {
235        let state_version = self.fetch_state(command).await?;
236        match state_version {
237            None => {
238                let new_state = self.compute_new_state(None, command)?;
239                let saved_state = self.save(&new_state, &None).await?;
240                Ok(saved_state)
241            }
242            Some((state, version)) => {
243                let new_state = self.compute_new_state(Some(state), command)?;
244                let saved_state = self.save(&new_state, &Some(version)).await?;
245                Ok(saved_state)
246            }
247        }
248    }
249}
250
251/// Orchestrating Event Sourced Aggregate.
252/// It is using a [Decider] and [Saga] to compute new events based on the current events and the command.
253/// If the `decider` is combined out of many deciders via `combine` function, a `saga` could be used to react on new events and send new commands to the `decider` recursively, in single transaction.
254/// It is using a [EventRepository] to fetch the current events and to save the new events.
255/// Generic parameters:
256/// - `C` - Command
257/// - `S` - State
258/// - `E` - Event
259/// - `Repository` - Event repository
260/// - `Version` - Version/Offset/Sequence number
261/// - `Error` - Error
262pub struct EventSourcedOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
263where
264    Repository: EventRepository<C, E, Version, Error>,
265{
266    repository: Repository,
267    decider: Decider<'a, C, S, E, Error>,
268    saga: Saga<'a, E, C>,
269    _marker: PhantomData<(C, S, E, Version, Error)>,
270}
271
272impl<C, S, E, Repository, Version, Error> EventRepository<C, E, Version, Error>
273    for EventSourcedOrchestratingAggregate<'_, C, S, E, Repository, Version, Error>
274where
275    Repository: EventRepository<C, E, Version, Error> + Sync,
276    C: Sync,
277    S: Sync,
278    E: Sync,
279    Version: Sync,
280    Error: Sync,
281{
282    /// Fetches current events, based on the command.
283    async fn fetch_events(&self, command: &C) -> Result<Vec<(E, Version)>, Error> {
284        self.repository.fetch_events(command).await
285    }
286    /// Saves events.
287    async fn save(&self, events: &[E]) -> Result<Vec<(E, Version)>, Error> {
288        self.repository.save(events).await
289    }
290    /// Version provider. It is used to provide the version/sequence of the event. Optimistic locking is useing this version to check if the event is already saved.
291    async fn version_provider(&self, event: &E) -> Result<Option<Version>, Error> {
292        self.repository.version_provider(event).await
293    }
294}
295
296impl<'a, C, S, E, Repository, Version, Error>
297    EventSourcedOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
298where
299    Repository: EventRepository<C, E, Version, Error> + Sync,
300    C: Sync,
301    S: Sync,
302    E: Sync + Clone,
303    Version: Sync,
304    Error: Sync,
305{
306    /// Creates a new instance of [EventSourcedAggregate].
307    pub fn new(
308        repository: Repository,
309        decider: Decider<'a, C, S, E, Error>,
310        saga: Saga<'a, E, C>,
311    ) -> Self {
312        EventSourcedOrchestratingAggregate {
313            repository,
314            decider,
315            saga,
316            _marker: PhantomData,
317        }
318    }
319    /// Handles the command by fetching the events from the repository, computing new events based on the current events and the command, and saving the new events to the repository.
320    pub async fn handle(&self, command: &C) -> Result<Vec<(E, Version)>, Error>
321    where
322        E: Identifier,
323        C: Identifier,
324    {
325        let events: Vec<(E, Version)> = self.fetch_events(command).await?;
326        let mut current_events: Vec<E> = vec![];
327        for (event, _) in events {
328            current_events.push(event);
329        }
330        let new_events = self
331            .compute_new_events_dynamically(&current_events, command)
332            .await?;
333        let saved_events = self.save(&new_events).await?;
334        Ok(saved_events)
335    }
336    /// Computes new events based on the current events and the command.
337    /// It is using a [Decider] and [Saga] to compute new events based on the current events and the command.
338    /// If the `decider` is combined out of many deciders via `combine` function, a `saga` could be used to react on new events and send new commands to the `decider` recursively, in single transaction.
339    /// It is using a [EventRepository] to fetch the current events for the command that is computed by the `saga`.
340    async fn compute_new_events_dynamically(
341        &self,
342        current_events: &[E],
343        command: &C,
344    ) -> Result<Vec<E>, Error>
345    where
346        E: Identifier,
347        C: Identifier,
348    {
349        let current_state: S = current_events
350            .iter()
351            .fold((self.decider.initial_state)(), |state, event| {
352                (self.decider.evolve)(&state, event)
353            });
354
355        let initial_events = (self.decider.decide)(command, &current_state)?;
356
357        let commands: Vec<C> = initial_events
358            .iter()
359            .flat_map(|event: &E| self.saga.compute_new_actions(event))
360            .collect();
361
362        // Collect all events including recursively computed new events.
363        let mut all_events = initial_events.clone();
364
365        for command in commands.iter() {
366            let previous_events = [
367                self.repository
368                    .fetch_events(command)
369                    .await?
370                    .iter()
371                    .map(|(e, _)| e.clone())
372                    .collect::<Vec<E>>(),
373                initial_events
374                    .clone()
375                    .into_iter()
376                    .filter(|e| e.identifier() == command.identifier())
377                    .collect::<Vec<E>>(),
378            ]
379            .concat();
380
381            // Recursively compute new events and extend the accumulated events list.
382            // By wrapping the recursive call in a Box, we ensure that the future type is not self-referential.
383            let new_events =
384                Box::pin(self.compute_new_events_dynamically(&previous_events, command)).await?;
385            all_events.extend(new_events);
386        }
387
388        Ok(all_events)
389    }
390}
391
392/// Orchestrating State Stored Aggregate.
393///
394/// It is using a [Decider] and [Saga] to compute new state based on the current state and the command.
395/// If the `decider` is combined out of many deciders via `combine` function, a `saga` could be used to react on new events and send new commands to the `decider` recursively, in single transaction.
396/// It is using a [StateRepository] to fetch the current state and to save the new state.
397///
398/// Generic parameters:
399///
400/// - `C` - Command
401/// - `S` - State
402/// - `E` - Event
403/// - `Repository` - State repository
404/// - `Version` - Version
405/// - `Error` - Error
406pub struct StateStoredOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
407where
408    Repository: StateRepository<C, S, Version, Error>,
409{
410    repository: Repository,
411    decider: Decider<'a, C, S, E, Error>,
412    saga: Saga<'a, E, C>,
413    _marker: PhantomData<(C, S, E, Version, Error)>,
414}
415
416impl<C, S, E, Repository, Version, Error> StateComputation<C, S, E, Error>
417    for StateStoredOrchestratingAggregate<'_, C, S, E, Repository, Version, Error>
418where
419    Repository: StateRepository<C, S, Version, Error>,
420    S: Clone,
421{
422    /// Computes new state based on the current state and the command.
423    fn compute_new_state(&self, current_state: Option<S>, command: &C) -> Result<S, Error> {
424        let effective_current_state =
425            current_state.unwrap_or_else(|| (self.decider.initial_state)());
426        let events = (self.decider.decide)(command, &effective_current_state)?;
427        let mut new_state = events.iter().fold(effective_current_state, |state, event| {
428            (self.decider.evolve)(&state, event)
429        });
430        let commands = events
431            .iter()
432            .flat_map(|event: &E| self.saga.compute_new_actions(event))
433            .collect::<Vec<C>>();
434        for action in commands {
435            new_state = self.compute_new_state(Some(new_state.clone()), &action)?;
436        }
437        Ok(new_state)
438    }
439}
440
441impl<C, S, E, Repository, Version, Error> StateRepository<C, S, Version, Error>
442    for StateStoredOrchestratingAggregate<'_, C, S, E, Repository, Version, Error>
443where
444    Repository: StateRepository<C, S, Version, Error> + Sync,
445    C: Sync,
446    S: Sync,
447    E: Sync,
448    Version: Sync,
449    Error: Sync,
450{
451    /// Fetches current state, based on the command.
452    async fn fetch_state(&self, command: &C) -> Result<Option<(S, Version)>, Error> {
453        self.repository.fetch_state(command).await
454    }
455    /// Saves state.
456    async fn save(&self, state: &S, version: &Option<Version>) -> Result<(S, Version), Error> {
457        self.repository.save(state, version).await
458    }
459}
460
461impl<'a, C, S, E, Repository, Version, Error>
462    StateStoredOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
463where
464    Repository: StateRepository<C, S, Version, Error> + Sync,
465    C: Sync,
466    S: Sync + Clone,
467    E: Sync,
468    Version: Sync,
469    Error: Sync,
470{
471    /// Creates a new instance of [StateStoredAggregate].
472    pub fn new(
473        repository: Repository,
474        decider: Decider<'a, C, S, E, Error>,
475        saga: Saga<'a, E, C>,
476    ) -> Self {
477        StateStoredOrchestratingAggregate {
478            repository,
479            decider,
480            saga,
481            _marker: PhantomData,
482        }
483    }
484    /// Handles the command by fetching the state from the repository, computing new state based on the current state and the command, and saving the new state to the repository.
485    pub async fn handle(&self, command: &C) -> Result<(S, Version), Error> {
486        let state_version = self.fetch_state(command).await?;
487        match state_version {
488            None => {
489                let new_state = self.compute_new_state(None, command)?;
490                let saved_state = self.save(&new_state, &None).await?;
491                Ok(saved_state)
492            }
493            Some((state, version)) => {
494                let new_state = self.compute_new_state(Some(state), command)?;
495                let saved_state = self.save(&new_state, &Some(version)).await?;
496                Ok(saved_state)
497            }
498        }
499    }
500}