use std::fmt::Debug;
use std::ops::Deref;
use futures::future::BoxFuture;
#[cfg(feature = "serde")]
use serde::Serialize;
use crate::versioning::Versioned;
/// Shorthand for the identifier type ([`Aggregate::Id`]) of the aggregate `A`.
pub type AggregateId<A> = <A as Aggregate>::Id;
/// Describes a domain aggregate: a state that evolves by applying events and
/// that answers commands by producing new events.
pub trait Aggregate {
/// Identifier of an aggregate instance; it only needs to be comparable for equality.
type Id: Eq;
/// State of the aggregate. It must have a [`Default`] value, which
/// `AggregateRootBuilder::build` uses as the starting state.
type State: Default;
/// Events that can be applied to the state.
type Event;
/// Commands the aggregate can handle.
type Command;
/// Error type returned by both [`Aggregate::apply`] and [`Aggregate::handle`].
type Error;
/// Applies `event` to `state`, consuming both, and returns the resulting
/// state or an error.
fn apply(state: Self::State, event: Self::Event) -> Result<Self::State, Self::Error>;
/// Handles `command` for the aggregate instance identified by `id`, given
/// its current `state`.
///
/// The returned future borrows `self` for `'a`; `id` and `state` are
/// borrowed for `'s`, which must outlive `'a`. The future resolves to an
/// optional list of events, or to an error.
///
/// This method is only available when `Self: Sized`.
fn handle<'a, 's: 'a>(
&'a self,
id: &'s Self::Id,
state: &'s Self::State,
command: Self::Command,
) -> BoxFuture<'a, Result<Option<Vec<Self::Event>>, Self::Error>>
where
Self: Sized;
}
/// Convenience methods layered on top of [`Aggregate`].
pub trait AggregateExt: Aggregate {
    /// Applies every event yielded by `events` to `state`, in iteration
    /// order, through [`Aggregate::apply`].
    ///
    /// Returns the final state, or the first error reported by `apply`; once
    /// an error occurs no further events are pulled from the iterator.
    #[inline]
    fn fold<I>(state: Self::State, events: I) -> Result<Self::State, Self::Error>
    where
        I: Iterator<Item = Self::Event>,
    {
        let mut current = state;
        for event in events {
            current = Self::apply(current, event)?;
        }
        Ok(current)
    }
}
/// Blanket implementation: every [`Aggregate`] gets the [`AggregateExt`] methods.
impl<T> AggregateExt for T where T: Aggregate {}
/// Builder of [`AggregateRoot`] instances, holding the [`Aggregate`] value
/// that is cloned into every root it creates.
#[derive(Clone)]
pub struct AggregateRootBuilder<T>
where
T: Aggregate,
{
/// Aggregate value handed (as a clone) to each built [`AggregateRoot`].
aggregate: T,
}
impl<T> From<T> for AggregateRootBuilder<T>
where
T: Aggregate,
{
#[inline]
fn from(aggregate: T) -> Self {
Self { aggregate }
}
}
impl<T> AggregateRootBuilder<T>
where
    T: Aggregate + Clone,
{
    /// Builds an [`AggregateRoot`] for `id` at version `0`, starting from the
    /// default value of the aggregate state.
    #[inline]
    pub fn build(&self, id: T::Id) -> AggregateRoot<T> {
        let initial_state = T::State::default();
        self.build_with_state(id, 0, initial_state)
    }

    /// Builds an [`AggregateRoot`] for `id` with the given `version` and
    /// `state`.
    ///
    /// The root receives a clone of the builder's aggregate value and starts
    /// with no events waiting to be committed.
    #[inline]
    pub fn build_with_state(&self, id: T::Id, version: u32, state: T::State) -> AggregateRoot<T> {
        let aggregate = self.aggregate.clone();

        AggregateRoot {
            aggregate,
            id,
            version,
            state,
            to_commit: None,
        }
    }
}
/// An [`Aggregate`] value bundled with the identifier, version and current
/// state of one instance, plus the events produced by `handle` that have not
/// yet been taken out through `take_events_to_commit`.
///
/// With the `serde` feature enabled the type also derives `Serialize`.
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(Serialize))]
pub struct AggregateRoot<T>
where
T: Aggregate + 'static,
{
/// Identifier of this aggregate instance.
id: T::Id,
/// Version reported through the `Versioned` implementation.
version: u32,
/// Current state; flattened into the parent when serialized.
#[cfg_attr(feature = "serde", serde(flatten))]
state: T::State,
/// Aggregate value used to handle commands; skipped when serializing.
#[cfg_attr(feature = "serde", serde(skip_serializing))]
aggregate: T,
/// Events accumulated by `handle` and not yet taken; `None` when there are
/// none pending. Skipped when serializing.
#[cfg_attr(feature = "serde", serde(skip_serializing))]
to_commit: Option<Vec<T::Event>>,
}
impl<T> PartialEq for AggregateRoot<T>
where
    T: Aggregate,
{
    /// Two roots are equal when their identifiers are equal; version, state
    /// and pending events are not compared.
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        self.id().eq(other.id())
    }
}
impl<T> Versioned for AggregateRoot<T>
where
    T: Aggregate,
{
    /// Returns the version currently stored in this root.
    #[inline]
    fn version(&self) -> u32 {
        let Self { version, .. } = self;
        *version
    }
}
impl<T> AggregateRoot<T>
where
    T: Aggregate,
{
    /// Returns a reference to the identifier of this aggregate instance.
    #[inline]
    pub fn id(&self) -> &T::Id {
        &self.id
    }

    /// Returns a reference to the current state.
    #[inline]
    pub fn state(&self) -> &T::State {
        &self.state
    }

    /// Moves the pending events out of the root, leaving `None` behind.
    ///
    /// Returns `None` when there are no pending events, which includes any
    /// call that directly follows a previous one.
    #[inline]
    pub(crate) fn take_events_to_commit(&mut self) -> Option<Vec<T::Event>> {
        // `Option::take` is the idiomatic spelling of `mem::replace(.., None)`.
        self.to_commit.take()
    }

    /// Consumes the root and returns it with its version set to `version`;
    /// every other field is left untouched.
    #[inline]
    pub(crate) fn with_version(mut self, version: u32) -> Self {
        self.version = version;
        self
    }
}
impl<T> Deref for AggregateRoot<T>
where
    T: Aggregate,
{
    type Target = T::State;

    /// Dereferences to the current state, so the state's methods can be
    /// called directly on the root.
    fn deref(&self) -> &Self::Target {
        &self.state
    }
}
impl<T> AggregateRoot<T>
where
    T: Aggregate,
    T::Event: Clone,
    T::State: Clone,
    T::Command: Debug,
{
    /// Handles `command` with the wrapped aggregate and, when events are
    /// produced, applies them to the state and queues them for committing.
    ///
    /// The command is passed to [`Aggregate::handle`] together with the
    /// current id and state. If it yields `Some(events)`, the events are
    /// folded onto a clone of the current state and then appended to the
    /// pending events (creating the list if there was none). If it yields
    /// `None`, the root is left untouched.
    ///
    /// Returns `self` to allow chaining further calls.
    ///
    /// # Errors
    ///
    /// Returns the error produced either by [`Aggregate::handle`] or while
    /// applying the new events. In both cases neither the state nor the
    /// pending events are modified.
    #[cfg_attr(
        feature = "with-tracing",
        tracing::instrument(level = "debug", name = "AggregateRoot::handle", skip(self))
    )]
    pub async fn handle(&mut self, command: T::Command) -> Result<&mut Self, T::Error> {
        let events = self
            .aggregate
            .handle(self.id(), self.state(), command)
            .await?;

        if let Some(events) = events {
            // Fold over per-item clones instead of cloning the whole `Vec`
            // first: same element clones, no throwaway allocation. The state
            // is folded from a clone so that a failing `apply` leaves the
            // current state intact.
            self.state = T::fold(self.state.clone(), events.iter().cloned())?;

            // Only reached once the fold succeeded; the original events are
            // moved into the pending list.
            self.to_commit.get_or_insert_with(Vec::new).extend(events);
        }

        Ok(self)
    }
}