use std::future::Future;
use std::marker::PhantomData;
use crate::decider::{Decider, EventComputation, StateComputation};
use crate::saga::{ActionComputation, Saga};
pub trait EventRepository<C, E, Version, Error> {
fn fetch_events(
&self,
command: &C,
) -> impl Future<Output = Result<Vec<(E, Version)>, Error>> + Send;
fn save(
&self,
events: &[E],
latest_version: &Option<Version>,
) -> impl Future<Output = Result<Vec<(E, Version)>, Error>> + Send;
}
pub struct EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
where
Repository: EventRepository<C, E, Version, Error>,
Decider: EventComputation<C, S, E>,
{
repository: Repository,
decider: Decider,
_marker: PhantomData<(C, S, E, Version, Error)>,
}
impl<C, S, E, Repository, Decider, Version, Error> EventComputation<C, S, E>
for EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
where
Repository: EventRepository<C, E, Version, Error>,
Decider: EventComputation<C, S, E>,
{
fn compute_new_events(&self, current_events: &[E], command: &C) -> Vec<E> {
self.decider.compute_new_events(current_events, command)
}
}
impl<C, S, E, Repository, Decider, Version, Error> EventRepository<C, E, Version, Error>
for EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
where
Repository: EventRepository<C, E, Version, Error> + Sync,
Decider: EventComputation<C, S, E> + Sync,
C: Sync,
S: Sync,
E: Sync,
Version: Sync,
Error: Sync,
{
async fn fetch_events(&self, command: &C) -> Result<Vec<(E, Version)>, Error> {
self.repository.fetch_events(command).await
}
async fn save(
&self,
events: &[E],
latest_version: &Option<Version>,
) -> Result<Vec<(E, Version)>, Error> {
self.repository.save(events, latest_version).await
}
}
impl<C, S, E, Repository, Decider, Version, Error>
EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
where
Repository: EventRepository<C, E, Version, Error> + Sync,
Decider: EventComputation<C, S, E> + Sync,
C: Sync,
S: Sync,
E: Sync,
Version: Sync,
Error: Sync,
{
pub fn new(repository: Repository, decider: Decider) -> Self {
EventSourcedAggregate {
repository,
decider,
_marker: PhantomData,
}
}
pub async fn handle(&self, command: &C) -> Result<Vec<(E, Version)>, Error> {
let events: Vec<(E, Version)> = self.fetch_events(command).await?;
let mut version: Option<Version> = None;
let mut current_events: Vec<E> = vec![];
for (event, ver) in events {
version = Some(ver);
current_events.push(event);
}
let new_events = self.compute_new_events(¤t_events, command);
let saved_events = self.save(&new_events, &version).await?;
Ok(saved_events)
}
}
pub trait StateRepository<C, S, Version, Error> {
fn fetch_state(
&self,
command: &C,
) -> impl Future<Output = Result<Option<(S, Version)>, Error>> + Send;
fn save(
&self,
state: &S,
version: &Option<Version>,
) -> impl Future<Output = Result<(S, Version), Error>> + Send;
}
pub struct StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
where
Repository: StateRepository<C, S, Version, Error>,
Decider: StateComputation<C, S, E>,
{
repository: Repository,
decider: Decider,
_marker: PhantomData<(C, S, E, Version, Error)>,
}
impl<C, S, E, Repository, Decider, Version, Error> StateComputation<C, S, E>
for StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
where
Repository: StateRepository<C, S, Version, Error>,
Decider: StateComputation<C, S, E>,
{
fn compute_new_state(&self, current_state: Option<S>, command: &C) -> S {
self.decider.compute_new_state(current_state, command)
}
}
impl<C, S, E, Repository, Decider, Version, Error> StateRepository<C, S, Version, Error>
for StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
where
Repository: StateRepository<C, S, Version, Error> + Sync,
Decider: StateComputation<C, S, E> + Sync,
C: Sync,
S: Sync,
E: Sync,
Version: Sync,
Error: Sync,
{
async fn fetch_state(&self, command: &C) -> Result<Option<(S, Version)>, Error> {
self.repository.fetch_state(command).await
}
async fn save(&self, state: &S, version: &Option<Version>) -> Result<(S, Version), Error> {
self.repository.save(state, version).await
}
}
impl<C, S, E, Repository, Decider, Version, Error>
StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
where
Repository: StateRepository<C, S, Version, Error> + Sync,
Decider: StateComputation<C, S, E> + Sync,
C: Sync,
S: Sync,
E: Sync,
Version: Sync,
Error: Sync,
{
pub fn new(repository: Repository, decider: Decider) -> Self {
StateStoredAggregate {
repository,
decider,
_marker: PhantomData,
}
}
pub async fn handle(&self, command: &C) -> Result<(S, Version), Error> {
let state_version = self.fetch_state(command).await?;
match state_version {
None => {
let new_state = self.compute_new_state(None, command);
let saved_state = self.save(&new_state, &None).await?;
Ok(saved_state)
}
Some((state, version)) => {
let new_state = self.compute_new_state(Some(state), command);
let saved_state = self.save(&new_state, &Some(version)).await?;
Ok(saved_state)
}
}
}
}
pub struct StateStoredOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
where
Repository: StateRepository<C, S, Version, Error>,
{
repository: Repository,
decider: Decider<'a, C, S, E>,
saga: Saga<'a, E, C>,
_marker: PhantomData<(C, S, E, Version, Error)>,
}
impl<'a, C, S, E, Repository, Version, Error> StateComputation<C, S, E>
for StateStoredOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
where
Repository: StateRepository<C, S, Version, Error>,
S: Clone,
{
fn compute_new_state(&self, current_state: Option<S>, command: &C) -> S {
let effective_current_state =
current_state.unwrap_or_else(|| (self.decider.initial_state)());
let events = (self.decider.decide)(command, &effective_current_state);
let mut new_state = events.iter().fold(effective_current_state, |state, event| {
(self.decider.evolve)(&state, event)
});
let commands = events
.iter()
.flat_map(|event: &E| self.saga.compute_new_actions(event))
.collect::<Vec<C>>();
commands.iter().for_each(|action| {
new_state = self.compute_new_state(Some(new_state.clone()), action);
});
new_state
}
}
impl<'a, C, S, E, Repository, Version, Error> StateRepository<C, S, Version, Error>
for StateStoredOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
where
Repository: StateRepository<C, S, Version, Error> + Sync,
C: Sync,
S: Sync,
E: Sync,
Version: Sync,
Error: Sync,
{
async fn fetch_state(&self, command: &C) -> Result<Option<(S, Version)>, Error> {
self.repository.fetch_state(command).await
}
async fn save(&self, state: &S, version: &Option<Version>) -> Result<(S, Version), Error> {
self.repository.save(state, version).await
}
}
impl<'a, C, S, E, Repository, Version, Error>
StateStoredOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
where
Repository: StateRepository<C, S, Version, Error> + Sync,
C: Sync,
S: Sync + Clone,
E: Sync,
Version: Sync,
Error: Sync,
{
pub fn new(
repository: Repository,
decider: Decider<'a, C, S, E>,
saga: Saga<'a, E, C>,
) -> Self {
StateStoredOrchestratingAggregate {
repository,
decider,
saga,
_marker: PhantomData,
}
}
pub async fn handle(&self, command: &C) -> Result<(S, Version), Error> {
let state_version = self.fetch_state(command).await?;
match state_version {
None => {
let new_state = self.compute_new_state(None, command);
let saved_state = self.save(&new_state, &None).await?;
Ok(saved_state)
}
Some((state, version)) => {
let new_state = self.compute_new_state(Some(state), command);
let saved_state = self.save(&new_state, &Some(version)).await?;
Ok(saved_state)
}
}
}
}