timesource 0.1.3

Event sourcing with TimescaleDb
Documentation
//! Foundation traits for creating Domain abstractions
//! using [the `Aggregate` pattern](https://martinfowler.com/bliki/DDD_Aggregate.html).

use std::fmt::Debug;
use std::marker::Sized;

use chrono::{DateTime, Utc};
use timesource_core::TimesourceEventPayload;
use uuid::Uuid;

/// An event payload paired with the UTC timestamp assigned to it.
///
/// [`AggregateRoot`] keeps values of this type in its list of uncommitted
/// events until `commit` clears that list.
#[derive(Debug)]
pub struct UncommittedEvent<T> {
    /// Timestamp of the event. The `From<T>` impl sets it to `Utc::now()` at
    /// conversion time.
    pub utc: DateTime<Utc>,
    /// The event payload itself.
    pub data: T,
}

impl<T> From<T> for UncommittedEvent<T> {
    /// Wraps `data` into an [`UncommittedEvent`], stamping it with the
    /// current UTC time.
    fn from(data: T) -> Self {
        let utc = Utc::now();
        Self { utc, data }
    }
}

/// An [`Aggregate`] owns a domain entity [`State`](Aggregate::State) and acts
/// as its _transaction boundary_.
///
/// **State mutations** are requested through [`Command`](Aggregate::Command)s.
/// Handling a command yields a list of Domain [`Event`](Aggregate::Event)s,
/// and applying those events produces the next state.
pub trait Aggregate {
    /// The Domain Entity data structure managed by this Aggregate.
    type State;

    /// A specific, domain-related change to the Aggregate
    /// [`State`](Aggregate::State).
    type Event: TimesourceEventPayload + 'static + Send + Sync + std::fmt::Debug;

    /// Failure raised while applying [`Event`](Aggregate::Event)s (see
    /// [`apply_first`](Aggregate::apply_first) and
    /// [`apply_next`](Aggregate::apply_next)) or while handling
    /// [`Command`](Aggregate::Command)s.
    type Error: std::fmt::Debug + std::error::Error + Send + Sync + 'static;

    /// The operations available on this Aggregate. Commands model business
    /// use-cases or [`State`](Aggregate::State) mutations.
    type Command;

    /// Builds the initial state of the aggregate root from its first event.
    ///
    /// * `root_id` - id of the aggregate root.
    /// * `event` - event data.
    /// * `utc` - `DateTime<Utc>` of the event.
    /// * `event_id` - id of the event. It will always be `Some(u64)` on
    ///   reads, but `None` when committing.
    fn apply_first(
        root_id: Uuid,
        event: &Self::Event,
        utc: DateTime<Utc>,
        event_id: Option<u64>,
    ) -> Result<Self::State, Self::Error>;

    /// Folds one more [`Event`](Aggregate::Event) into an existing state.
    ///
    /// * `state` - state folded so far; mutated in place.
    /// * `event` - event data.
    /// * `utc` - `DateTime<Utc>` of the event.
    /// * `event_id` - id of the event. It will always be `Some(u64)` on
    ///   reads, but `None` when committing.
    fn apply_next(
        state: &mut Self::State,
        event: &Self::Event,
        utc: DateTime<Utc>,
        event_id: Option<u64>,
    ) -> Result<(), Self::Error>;

    /// Turns the constructor Command into a `Vec<Self::Event>`.
    ///
    /// [`AggregateRoot::handle`] calls this while the root has no state yet.
    fn handle_first(command: Self::Command) -> Result<Vec<Self::Event>, Self::Error>;

    /// Turns a Command into a `Vec<Self::Event>`, given the current state.
    ///
    /// [`AggregateRoot::handle`] calls this once the root has a state.
    fn handle_next(
        state: &Self::State,
        command: Self::Command,
    ) -> Result<Vec<Self::Event>, Self::Error>;

    /// Creates an empty [`AggregateRoot`] identified by a freshly generated
    /// v4 UUID.
    fn root() -> AggregateRoot<Self>
    where
        Self: Sized,
    {
        let fresh_id = Uuid::new_v4();
        Self::root_with_id(fresh_id)
    }

    /// Creates an empty [`AggregateRoot`] with the given `id`: no state, no
    /// timestamps and no uncommitted events.
    fn root_with_id(id: Uuid) -> AggregateRoot<Self>
    where
        Self: Sized,
    {
        AggregateRoot {
            id,
            state: None,
            state_utc: None,
            last_commit_utc: None,
            uncommitted_events: Vec::new(),
        }
    }

    /// Creates an [`AggregateRoot`] around an already known `state`.
    ///
    /// `utc` is recorded both as the state timestamp and as the last commit
    /// timestamp; the list of uncommitted events starts empty.
    fn root_with_state(id: Uuid, utc: DateTime<Utc>, state: Self::State) -> AggregateRoot<Self>
    where
        Self: Sized,
    {
        AggregateRoot {
            id,
            state: Some(state),
            state_utc: Some(utc),
            last_commit_utc: Some(utc),
            uncommitted_events: Vec::new(),
        }
    }
}

/// Holds one instance of an [`Aggregate`]: its id, its current state (if
/// any) and the events produced by `handle` that have not been committed yet.
#[derive(Debug)]
pub struct AggregateRoot<A: Aggregate> {
    /// Identifier of this aggregate root; the only field compared by `PartialEq`.
    id: Uuid,
    /// Set by `root_with_state`, and overwritten with `state_utc` on `commit`.
    last_commit_utc: Option<DateTime<Utc>>,
    /// Timestamp associated with `state`: set by `root_with_state` and
    /// updated by `handle` from the last event it produced.
    state_utc: Option<DateTime<Utc>>,
    /// Current state; `None` until a first event has been applied or a state
    /// has been supplied through `root_with_state`.
    state: Option<A::State>,
    /// Events appended by `handle`; emptied by `commit`.
    uncommitted_events: Vec<UncommittedEvent<A::Event>>,
}

impl<A: Aggregate> PartialEq for AggregateRoot<A> {
    /// Two roots are equal when their ids match; state, timestamps and
    /// uncommitted events are not compared.
    fn eq(&self, other: &Self) -> bool {
        self.id == other.id
    }
}

impl<A: Aggregate> AggregateRoot<A> {
    /// Returns the id of this aggregate root.
    pub fn id(&self) -> Uuid {
        self.id
    }

    /// Borrows the current state, or `None` if no event has been applied yet.
    pub fn state(&self) -> Option<&A::State> {
        self.state.as_ref()
    }

    /// Mutably borrows the current state, or `None` if there is none yet.
    pub fn state_mut(&mut self) -> Option<&mut A::State> {
        self.state.as_mut()
    }

    /// Consumes the root and returns its state, if any.
    pub fn into_state(self) -> Option<A::State> {
        self.state
    }

    /// Borrows the current state together with its timestamp.
    ///
    /// Returns `None` unless both the state and its timestamp are present.
    pub fn state_timestamp(&self) -> Option<(&A::State, DateTime<Utc>)> {
        self.state.as_ref().zip(self.state_utc)
    }

    /// Consumes the root and returns the state together with its timestamp.
    ///
    /// Returns `None` unless both the state and its timestamp are present.
    pub fn into_state_timestamp(self) -> Option<(A::State, DateTime<Utc>)> {
        self.state.zip(self.state_utc)
    }

    /// Events produced by [`handle`](Self::handle) since the last commit, in
    /// the order they were produced.
    pub fn uncommitted_events(&self) -> &Vec<UncommittedEvent<A::Event>> {
        &self.uncommitted_events
    }

    /// Timestamp recorded by the last commit (or supplied through
    /// `root_with_state`), if any.
    pub fn last_commit_utc(&self) -> Option<DateTime<Utc>> {
        self.last_commit_utc
    }

    /// Handles `command`, applies the resulting events to the state and
    /// queues them as uncommitted events.
    ///
    /// The command is routed to `A::handle_next` when a state exists and to
    /// `A::handle_first` otherwise. Each produced event is timestamped with
    /// the current UTC time and applied in order with `event_id = None`.
    ///
    /// A command that produces no events leaves the root untouched; in
    /// particular the state timestamp is preserved rather than reset.
    ///
    /// # Errors
    ///
    /// Returns the error raised by the command handler or by one of the
    /// `apply_*` calls. If applying fails part-way through a batch, the
    /// events applied before the failure have already modified the state,
    /// but none of the batch is added to the uncommitted events and the
    /// state timestamp is not updated.
    pub fn handle(&mut self, command: A::Command) -> Result<&mut AggregateRoot<A>, A::Error> {
        let produced = match self.state.as_ref() {
            Some(state) => A::handle_next(state, command)?,
            None => A::handle_first(command)?,
        };

        let next_events: Vec<UncommittedEvent<A::Event>> =
            produced.into_iter().map(UncommittedEvent::from).collect();

        for event in &next_events {
            if let Some(state) = self.state.as_mut() {
                A::apply_next(state, &event.data, event.utc, None)?;
            } else {
                // First event ever seen by this root: it creates the state.
                let first_state = A::apply_first(self.id, &event.data, event.utc, None)?;
                self.state = Some(first_state);
            }
        }

        // Only move the state timestamp forward when something was actually
        // applied; an empty batch must not wipe the existing timestamp.
        if let Some(last) = next_events.last() {
            self.state_utc = Some(last.utc);
        }
        self.uncommitted_events.extend(next_events);

        Ok(self)
    }

    /// Marks all pending events as committed: clears the uncommitted list and
    /// records the state timestamp as the last commit timestamp.
    pub(crate) fn commit(&mut self) {
        self.uncommitted_events.clear();
        self.last_commit_utc = self.state_utc;
    }
}