use std::fmt::Debug;
use std::marker::Sized;
use chrono::{DateTime, Utc};
use timesource_core::TimesourceEventPayload;
use uuid::Uuid;
/// An event payload paired with a UTC timestamp, not yet committed.
///
/// The `From<T>` conversion builds one from a bare payload and stamps it with the
/// current time; `AggregateRoot` keeps these in its uncommitted-events queue.
#[derive(Debug)]
pub struct UncommittedEvent<T> {
/// UTC timestamp attached to the event.
pub utc: DateTime<Utc>,
/// The event payload itself.
pub data: T,
}
impl<T> From<T> for UncommittedEvent<T> {
fn from(data: T) -> Self {
Self {
utc: Utc::now(),
data,
}
}
}
pub trait Aggregate {
type State;
type Event: TimesourceEventPayload + 'static + Send + Sync + std::fmt::Debug;
type Error: std::fmt::Debug + std::error::Error + Send + Sync + 'static;
type Command;
fn apply_first(
root_id: Uuid,
event: &Self::Event,
utc: DateTime<Utc>,
event_id: Option<u64>,
) -> Result<Self::State, Self::Error>;
fn apply_next(
state: &mut Self::State,
event: &Self::Event,
utc: DateTime<Utc>,
event_id: Option<u64>,
) -> Result<(), Self::Error>;
fn handle_first(command: Self::Command) -> Result<Vec<Self::Event>, Self::Error>;
fn handle_next(
state: &Self::State,
command: Self::Command,
) -> Result<Vec<Self::Event>, Self::Error>;
fn root() -> AggregateRoot<Self>
where
Self: Sized,
{
Self::root_with_id(Uuid::new_v4())
}
fn root_with_id(id: Uuid) -> AggregateRoot<Self>
where
Self: Sized,
{
AggregateRoot {
id,
last_commit_utc: None,
state_utc: None,
state: None,
uncommitted_events: Default::default(),
}
}
fn root_with_state(id: Uuid, utc: DateTime<Utc>, state: Self::State) -> AggregateRoot<Self>
where
Self: Sized,
{
AggregateRoot {
id,
last_commit_utc: Some(utc),
state_utc: Some(utc),
state: Some(state),
uncommitted_events: Default::default(),
}
}
}
/// Holds the identity, current state and pending (uncommitted) events of one aggregate `A`.
#[derive(Debug)]
pub struct AggregateRoot<A: Aggregate> {
/// Identifier of the aggregate; the only field compared by the `PartialEq` impl.
id: Uuid,
/// Copy of `state_utc` taken by `commit()`, or the `utc` given to
/// `Aggregate::root_with_state`; `None` for a root built by `Aggregate::root_with_id`.
last_commit_utc: Option<DateTime<Utc>>,
/// Timestamp associated with `state`: set by `Aggregate::root_with_state` and updated
/// by `handle` from the events it applies.
state_utc: Option<DateTime<Utc>>,
/// Current state, or `None` until a first event has been applied or a state was supplied.
state: Option<A::State>,
/// Events produced by `handle` since the last `commit()`, in the order they were applied.
uncommitted_events: Vec<UncommittedEvent<A::Event>>,
}
/// Two roots are equal when their ids match; state, timestamps and pending events
/// are not compared.
impl<A: Aggregate> PartialEq for AggregateRoot<A> {
    fn eq(&self, other: &Self) -> bool {
        self.id == other.id
    }
}
impl<A: Aggregate> AggregateRoot<A> {
    /// Returns the id of this aggregate root.
    pub fn id(&self) -> Uuid {
        self.id
    }

    /// Borrows the current state, or `None` if the root has no state yet.
    pub fn state(&self) -> Option<&A::State> {
        self.state.as_ref()
    }

    /// Mutably borrows the current state, or `None` if the root has no state yet.
    pub fn state_mut(&mut self) -> Option<&mut A::State> {
        self.state.as_mut()
    }

    /// Consumes the root and returns its state, if any.
    pub fn into_state(self) -> Option<A::State> {
        self.state
    }

    /// Borrows the state together with its timestamp.
    ///
    /// Returns `None` unless both the state and the state timestamp are present.
    pub fn state_timestamp(&self) -> Option<(&A::State, DateTime<Utc>)> {
        let utc = self.state_utc?;
        self.state.as_ref().map(|state| (state, utc))
    }

    /// Consumes the root and returns the state together with its timestamp.
    ///
    /// Returns `None` unless both the state and the state timestamp are present.
    pub fn into_state_timestamp(self) -> Option<(A::State, DateTime<Utc>)> {
        let utc = self.state_utc?;
        self.state.map(|state| (state, utc))
    }

    /// Returns the events produced by [`handle`](Self::handle) since the last commit.
    pub fn uncommitted_events(&self) -> &Vec<UncommittedEvent<A::Event>> {
        &self.uncommitted_events
    }

    /// Returns the state timestamp recorded at the last commit, if any.
    pub fn last_commit_utc(&self) -> Option<DateTime<Utc>> {
        self.last_commit_utc
    }

    /// Handles `command`, applies the resulting events to the state and queues them
    /// as uncommitted.
    ///
    /// The command goes to [`Aggregate::handle_next`] when a state exists and to
    /// [`Aggregate::handle_first`] otherwise. Every produced event is wrapped in an
    /// [`UncommittedEvent`] (which timestamps it) and applied in order: the first
    /// event on a state-less root goes through [`Aggregate::apply_first`], all
    /// others through [`Aggregate::apply_next`]. Events are applied with
    /// `event_id = None`.
    ///
    /// On success the state timestamp becomes the timestamp of the last produced
    /// event. A command that produces no events leaves the state timestamp
    /// untouched, so an existing state keeps its timestamp.
    ///
    /// # Errors
    ///
    /// Returns the aggregate's error if the command handler or any `apply_*` call
    /// fails. If an `apply_*` call fails part-way through, the events applied before
    /// the failure have already changed the state, but none of the events from this
    /// command are added to the uncommitted queue and the state timestamp is not
    /// updated.
    pub fn handle(&mut self, command: A::Command) -> Result<&mut AggregateRoot<A>, A::Error> {
        let produced = match self.state.as_ref() {
            Some(state) => A::handle_next(state, command)?,
            None => A::handle_first(command)?,
        };
        let next_events: Vec<_> = produced.into_iter().map(UncommittedEvent::from).collect();

        for event in &next_events {
            if let Some(state) = self.state.as_mut() {
                A::apply_next(state, &event.data, event.utc, None)?;
            } else {
                self.state = Some(A::apply_first(self.id, &event.data, event.utc, None)?);
            }
        }

        // Only move the state timestamp when something was applied; overwriting it
        // with `None` for an empty batch would drop the timestamp of an existing state.
        if let Some(last) = next_events.last() {
            self.state_utc = Some(last.utc);
        }
        self.uncommitted_events.extend(next_events);
        Ok(self)
    }

    /// Marks all pending events as committed: clears the uncommitted queue and
    /// records the current state timestamp as the last-commit timestamp.
    pub(crate) fn commit(&mut self) {
        self.uncommitted_events.clear();
        self.last_commit_utc = self.state_utc;
    }
}