use std::sync::Arc;
use futures::future::BoxFuture;
pub trait Identifiable {
type Id: Eq;
fn id(&self) -> Self::Id;
}
impl<State> Identifiable for Option<State>
where
State: Identifiable,
State::Id: Default,
{
type Id = State::Id;
#[inline]
fn id(&self) -> Self::Id {
self.as_ref().map_or_else(Self::Id::default, |id| id.id())
}
}
pub type AggregateId<A> = <<A as Aggregate>::State as Identifiable>::Id;
pub trait Aggregate {
type State: Identifiable;
type Event;
type Command;
type Error;
fn apply(state: Self::State, event: Self::Event) -> Result<Self::State, Self::Error>;
fn handle<'a, 's: 'a>(
&'a self,
state: &'s Self::State,
command: Self::Command,
) -> BoxFuture<'a, Result<Vec<Self::Event>, Self::Error>>
where
Self: Sized;
}
impl<T> Aggregate for Arc<T>
where
T: Aggregate,
{
type State = T::State;
type Event = T::Event;
type Command = T::Command;
type Error = T::Error;
fn apply(state: Self::State, event: Self::Event) -> Result<Self::State, Self::Error> {
T::apply(state, event)
}
fn handle<'agg, 'st: 'agg>(
&'agg self,
state: &'st Self::State,
command: Self::Command,
) -> BoxFuture<'agg, Result<Vec<Self::Event>, Self::Error>>
where
Self: Sized,
{
T::handle(self, state, command)
}
}
pub trait AggregateExt: Aggregate {
#[inline]
fn root(&self) -> AggregateRoot<Self>
where
Self: Sized + Clone,
Self::State: Default,
{
AggregateRoot::from(self.clone())
}
#[inline]
fn fold<I>(state: Self::State, mut events: I) -> Result<Self::State, Self::Error>
where
I: Iterator<Item = Self::Event>,
{
events.try_fold(state, Self::apply)
}
}
impl<T> AggregateExt for T where T: Aggregate {}
#[derive(Debug)]
pub struct AggregateRoot<T>
where
T: Aggregate + 'static,
{
pub(crate) state: T::State,
aggregate: T,
pub(crate) to_commit: Option<Vec<T::Event>>,
}
impl<T> Identifiable for AggregateRoot<T>
where
T: Aggregate,
{
type Id = AggregateId<T>;
#[inline]
fn id(&self) -> Self::Id {
self.state.id()
}
}
impl<T> PartialEq for AggregateRoot<T>
where
T: Aggregate,
{
#[inline]
fn eq(&self, other: &Self) -> bool {
self.id() == other.id()
}
}
impl<T> From<T> for AggregateRoot<T>
where
T: Aggregate,
T::State: Default,
{
#[inline]
fn from(aggregate: T) -> Self {
Self::new(aggregate, Default::default())
}
}
impl<T> AggregateRoot<T>
where
T: Aggregate,
{
#[inline]
pub fn state(&self) -> &T::State {
&self.state
}
}
impl<T> AggregateRoot<T>
where
T: Aggregate,
{
#[inline]
pub fn new(aggregate: T, state: T::State) -> Self {
Self {
state,
aggregate,
to_commit: None,
}
}
}
impl<T> AggregateRoot<T>
where
T: AggregateExt,
T::Event: Clone,
T::State: Clone,
{
pub async fn handle(&mut self, command: T::Command) -> Result<&mut Self, T::Error> {
let mut events = self.aggregate.handle(self.state(), command).await?;
self.state = T::fold(self.state.clone(), events.clone().into_iter())?;
self.to_commit = Some(match self.to_commit.take() {
None => events,
Some(mut list) => {
list.append(&mut events);
list
}
});
Ok(self)
}
}