use std::marker::PhantomData;
use async_trait::async_trait;
use crate::decider::{EventComputation, StateComputation};
/// Asynchronous storage interface for versioned events.
///
/// Type parameters: `C` is the command type used to look events up, `E` is the
/// event type, `Version` is the version attached to every stored event and
/// `Error` is the failure type shared by both operations.
#[async_trait]
pub trait EventRepository<C, E, Version, Error> {
/// Fetches the events related to `command`, each paired with its version.
///
/// How a command is mapped to a set of events is decided by the implementor.
async fn fetch_events(&self, command: &C) -> Result<Vec<(E, Version)>, Error>;
/// Saves `events` and returns them paired with their versions.
///
/// `latest_version` is the most recent version known to the caller;
/// `EventSourcedAggregate::handle` passes `None` when the preceding fetch
/// returned no events.
// NOTE(review): presumably `latest_version` is meant for an optimistic
// concurrency check — confirm against the concrete implementations.
async fn save(
&self,
events: &[E],
latest_version: &Option<Version>,
) -> Result<Vec<(E, Version)>, Error>;
}
/// Aggregate that pairs an [`EventRepository`] with an `EventComputation`
/// decider.
///
/// The aggregate implements both traits itself by forwarding to the wrapped
/// components, and its `handle` method chains them: fetch events, compute new
/// events, save them.
pub struct EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
where
Repository: EventRepository<C, E, Version, Error>,
Decider: EventComputation<C, S, E>,
{
/// Storage that `fetch_events` and `save` are forwarded to.
repository: Repository,
/// Component that `compute_new_events` is forwarded to.
decider: Decider,
/// Zero-sized marker so the type parameters that appear in no field are
/// still used by the struct.
_marker: PhantomData<(C, S, E, Version, Error)>,
}
/// Lets the aggregate be used wherever an `EventComputation` is expected by
/// forwarding the computation to the wrapped decider.
impl<C, S, E, Repository, Decider, Version, Error> EventComputation<C, S, E>
    for EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
where
    Repository: EventRepository<C, E, Version, Error>,
    Decider: EventComputation<C, S, E>,
{
    /// Returns the events the wrapped decider produces for `command`, given
    /// the `current_events` already recorded.
    fn compute_new_events(&self, current_events: &[E], command: &C) -> Vec<E> {
        // Fully-qualified call: makes it explicit that this targets the inner
        // decider and is not a recursive call on the aggregate.
        <Decider as EventComputation<C, S, E>>::compute_new_events(
            &self.decider,
            current_events,
            command,
        )
    }
}
#[async_trait]
impl<C, S, E, Repository, Decider, Version, Error> EventRepository<C, E, Version, Error>
for EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
where
Repository: EventRepository<C, E, Version, Error> + Sync,
Decider: EventComputation<C, S, E> + Sync,
C: Sync,
S: Sync,
E: Sync,
Version: Sync,
Error: Sync,
{
async fn fetch_events(&self, command: &C) -> Result<Vec<(E, Version)>, Error> {
self.repository.fetch_events(command).await
}
async fn save(
&self,
events: &[E],
latest_version: &Option<Version>,
) -> Result<Vec<(E, Version)>, Error> {
self.repository.save(events, latest_version).await
}
}
/// Construction and command handling for the event-sourced aggregate.
impl<C, S, E, Repository, Decider, Version, Error>
    EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
where
    Repository: EventRepository<C, E, Version, Error> + Sync,
    Decider: EventComputation<C, S, E> + Sync,
    C: Sync,
    S: Sync,
    E: Sync,
    Version: Sync,
    Error: Sync,
{
    /// Creates an aggregate from the `repository` it stores events in and the
    /// `decider` it computes new events with.
    pub fn new(repository: Repository, decider: Decider) -> Self {
        EventSourcedAggregate {
            repository,
            decider,
            _marker: PhantomData,
        }
    }

    /// Handles `command`: fetches the current events, computes the new events
    /// and saves them.
    ///
    /// The version handed to `save` is the one attached to the last fetched
    /// event, or `None` when no events were fetched.
    ///
    /// # Errors
    ///
    /// Returns the repository's `Error` if fetching or saving fails; nothing
    /// is saved when the fetch fails.
    pub async fn handle(&self, command: &C) -> Result<Vec<(E, Version)>, Error> {
        let stored: Vec<(E, Version)> = self.fetch_events(command).await?;

        // Split the versioned pairs: the decider only needs the events, the
        // repository only needs the most recent version.
        let mut version: Option<Version> = None;
        let mut current_events: Vec<E> = Vec::with_capacity(stored.len());
        for (event, ver) in stored {
            version = Some(ver);
            current_events.push(event);
        }

        let new_events = self.compute_new_events(&current_events, command);
        self.save(&new_events, &version).await
    }
}
/// Asynchronous storage interface for a versioned state.
///
/// Type parameters: `C` is the command type used to look the state up, `S` is
/// the state type, `Version` is the version attached to the stored state and
/// `Error` is the failure type shared by both operations.
#[async_trait]
pub trait StateRepository<C, S, Version, Error> {
/// Fetches the state related to `command` together with its version, or
/// `None` when the repository has no state to return.
async fn fetch_state(&self, command: &C) -> Result<Option<(S, Version)>, Error>;
/// Saves `state` and returns it paired with a version.
///
/// `version` is the version the caller last fetched;
/// `StateStoredAggregate::handle` passes `None` when no state was fetched.
// NOTE(review): presumably `version` is meant for an optimistic concurrency
// check — confirm against the concrete implementations.
async fn save(&self, state: &S, version: &Option<Version>) -> Result<(S, Version), Error>;
}
/// Aggregate that pairs a [`StateRepository`] with a `StateComputation`
/// decider.
///
/// The aggregate implements both traits itself by forwarding to the wrapped
/// components, and its `handle` method chains them: fetch the state, compute
/// the new state, save it.
pub struct StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
where
Repository: StateRepository<C, S, Version, Error>,
Decider: StateComputation<C, S, E>,
{
/// Storage that `fetch_state` and `save` are forwarded to.
repository: Repository,
/// Component that `compute_new_state` is forwarded to.
decider: Decider,
/// Zero-sized marker so the type parameters that appear in no field are
/// still used by the struct.
_marker: PhantomData<(C, S, E, Version, Error)>,
}
/// Lets the aggregate be used wherever a `StateComputation` is expected by
/// forwarding the computation to the wrapped decider.
impl<C, S, E, Repository, Decider, Version, Error> StateComputation<C, S, E>
    for StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
where
    Repository: StateRepository<C, S, Version, Error>,
    Decider: StateComputation<C, S, E>,
{
    /// Returns the state the wrapped decider produces for `command`, given
    /// the optional `current_state`.
    fn compute_new_state(&self, current_state: Option<S>, command: &C) -> S {
        // Fully-qualified call: makes it explicit that this targets the inner
        // decider and is not a recursive call on the aggregate.
        <Decider as StateComputation<C, S, E>>::compute_new_state(
            &self.decider,
            current_state,
            command,
        )
    }
}
#[async_trait]
impl<C, S, E, Repository, Decider, Version, Error> StateRepository<C, S, Version, Error>
for StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
where
Repository: StateRepository<C, S, Version, Error> + Sync,
Decider: StateComputation<C, S, E> + Sync,
C: Sync,
S: Sync,
E: Sync,
Version: Sync,
Error: Sync,
{
async fn fetch_state(&self, command: &C) -> Result<Option<(S, Version)>, Error> {
self.repository.fetch_state(command).await
}
async fn save(&self, state: &S, version: &Option<Version>) -> Result<(S, Version), Error> {
self.repository.save(state, version).await
}
}
/// Construction and command handling for the state-stored aggregate.
impl<C, S, E, Repository, Decider, Version, Error>
    StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
where
    Repository: StateRepository<C, S, Version, Error> + Sync,
    Decider: StateComputation<C, S, E> + Sync,
    C: Sync,
    S: Sync,
    E: Sync,
    Version: Sync,
    Error: Sync,
{
    /// Creates an aggregate from the `repository` it stores state in and the
    /// `decider` it computes new state with.
    pub fn new(repository: Repository, decider: Decider) -> Self {
        Self {
            repository,
            decider,
            _marker: PhantomData,
        }
    }

    /// Handles `command`: fetches the current state, computes the new state
    /// and saves it.
    ///
    /// When nothing was fetched, both the state given to the decider and the
    /// version given to `save` are `None`.
    ///
    /// # Errors
    ///
    /// Returns the repository's `Error` if fetching or saving fails.
    pub async fn handle(&self, command: &C) -> Result<(S, Version), Error> {
        // Split the optional pair so a single code path serves both the
        // "no state yet" and the "existing state" case.
        let (current_state, current_version) = match self.fetch_state(command).await? {
            Some((state, version)) => (Some(state), Some(version)),
            None => (None, None),
        };

        let next_state = self.compute_new_state(current_state, command);
        self.save(&next_state, &current_version).await
    }
}