// Aggregates: event-sourced and state-stored command handlers, plus the repository traits they depend on.
use std::marker::PhantomData;
use async_trait::async_trait;
use crate::decider::{EventComputation, StateComputation};
/// Asynchronous repository for fetching and saving versioned events.
///
/// Type parameters:
///
/// - `C` - Command
/// - `E` - Event
/// - `Version` - Version/Offset/Sequence number
/// - `Error` - Error
#[async_trait]
pub trait EventRepository<C, E, Version, Error> {
    /// Fetches the current events selected by `command`.
    ///
    /// On success, returns every event paired with its `Version`.
    async fn fetch_events(&self, command: &C) -> Result<Vec<(E, Version)>, Error>;

    /// Saves the given `events`.
    ///
    /// `latest_version` is the version known to the caller before saving;
    /// [EventSourcedAggregate::handle] passes the version of the last fetched
    /// event, or `None` when nothing was fetched.
    ///
    /// On success, returns the saved events, each paired with its `Version`.
    async fn save(
        &self,
        events: &[E],
        latest_version: &Option<Version>,
    ) -> Result<Vec<(E, Version)>, Error>;
}
/// Event Sourced Aggregate.
///
/// Combines a `Decider` / [EventComputation], which computes new events from
/// the current events and a command, with an [EventRepository], which fetches
/// the current events and saves the new ones.
///
/// Type parameters:
///
/// - `C` - Command
/// - `S` - State
/// - `E` - Event
/// - `Repository` - Event repository
/// - `Decider` - Event computation
/// - `Version` - Version/Offset/Sequence number
/// - `Error` - Error
pub struct EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
where
    Repository: EventRepository<C, E, Version, Error>,
    Decider: EventComputation<C, S, E>,
{
    /// Source of the current events and sink for the newly computed ones.
    repository: Repository,
    /// Computes new events from the current events and a command.
    decider: Decider,
    /// Zero-sized marker: `C`, `S`, `E`, `Version` and `Error` appear only in
    /// the trait bounds, so they must be mentioned in a field to be accepted.
    _marker: PhantomData<(C, S, E, Version, Error)>,
}
impl<C, S, E, Repository, Decider, Version, Error>
    EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
where
    Repository: EventRepository<C, E, Version, Error>,
    Decider: EventComputation<C, S, E>,
{
    /// Creates a new instance of [EventSourcedAggregate] from the given
    /// `repository` and `decider`.
    pub fn new(repository: Repository, decider: Decider) -> Self {
        EventSourcedAggregate {
            repository,
            decider,
            _marker: PhantomData,
        }
    }

    /// Handles the command by fetching the events from the repository,
    /// computing new events based on the current events and the command, and
    /// saving the new events to the repository.
    ///
    /// Returns the saved events, each paired with its version, as reported by
    /// the repository.
    ///
    /// # Errors
    ///
    /// Returns the repository's `Error` if either fetching the current events
    /// or saving the new events fails.
    pub async fn handle(&self, command: &C) -> Result<Vec<(E, Version)>, Error> {
        let events: Vec<(E, Version)> = self.repository.fetch_events(command).await?;

        // Split the fetched pairs: keep the bare events for the decider and
        // remember only the version of the last fetched event (`None` when
        // nothing was fetched).
        let mut version: Option<Version> = None;
        let mut current_events: Vec<E> = Vec::with_capacity(events.len());
        for (event, ver) in events {
            version = Some(ver);
            current_events.push(event);
        }

        let new_events = self.decider.compute_new_events(&current_events, command);
        self.repository.save(&new_events, &version).await
    }
}
/// Asynchronous repository for fetching and saving versioned state.
///
/// Type parameters:
///
/// - `C` - Command
/// - `S` - State
/// - `Version` - Version
/// - `Error` - Error
#[async_trait]
pub trait StateRepository<C, S, Version, Error> {
    /// Fetches the current state selected by `command`.
    ///
    /// On success, returns the state together with its `Version`, or `None`
    /// when the repository has no state to return for this command.
    async fn fetch_state(&self, command: &C) -> Result<Option<(S, Version)>, Error>;

    /// Saves the given `state`.
    ///
    /// `version` is the version known to the caller before saving;
    /// [StateStoredAggregate::handle] passes the version of the fetched
    /// state, or `None` when no state was fetched.
    ///
    /// On success, returns the saved state paired with its `Version`.
    async fn save(&self, state: &S, version: &Option<Version>) -> Result<(S, Version), Error>;
}
/// State Stored Aggregate.
///
/// Combines a `Decider` / [StateComputation], which computes the new state
/// from the current state and a command, with a [StateRepository], which
/// fetches the current state and saves the new one.
///
/// Type parameters:
///
/// - `C` - Command
/// - `S` - State
/// - `E` - Event
/// - `Repository` - State repository
/// - `Decider` - State computation
/// - `Version` - Version
/// - `Error` - Error
pub struct StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
where
    Repository: StateRepository<C, S, Version, Error>,
    Decider: StateComputation<C, S, E>,
{
    /// Source of the current state and sink for the newly computed one.
    repository: Repository,
    /// Computes the new state from the current state and a command.
    decider: Decider,
    /// Zero-sized marker: `C`, `S`, `E`, `Version` and `Error` appear only in
    /// the trait bounds, so they must be mentioned in a field to be accepted.
    _marker: PhantomData<(C, S, E, Version, Error)>,
}
impl<C, S, E, Repository, Decider, Version, Error>
    StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
where
    Repository: StateRepository<C, S, Version, Error>,
    Decider: StateComputation<C, S, E>,
{
    /// Builds a [StateStoredAggregate] from the given `repository` and
    /// `decider`.
    pub fn new(repository: Repository, decider: Decider) -> Self {
        Self {
            repository,
            decider,
            _marker: PhantomData,
        }
    }

    /// Handles the command: fetches the current state from the repository,
    /// lets the decider compute the new state from it and the command, then
    /// saves the new state back to the repository.
    ///
    /// Returns the saved state paired with its version, as reported by the
    /// repository.
    ///
    /// # Errors
    ///
    /// Returns the repository's `Error` if either fetching the current state
    /// or saving the new state fails.
    pub async fn handle(&self, command: &C) -> Result<(S, Version), Error> {
        // Split the optional `(state, version)` pair so that a single
        // compute/save path serves both the "nothing stored yet" case and the
        // "existing state" case.
        let (current_state, current_version) =
            match self.repository.fetch_state(command).await? {
                Some((state, version)) => (Some(state), Some(version)),
                None => (None, None),
            };

        let next_state = self.decider.compute_new_state(current_state, command);
        self.repository.save(&next_state, &current_version).await
    }
}