timesource 0.1.3

Event sourcing with TimescaleDb
Documentation
//! Contains the [Repository pattern] implementation for [`AggregateRoot`]
//! instances.
//!
//! Check out [`Repository`] for more information.
//!
//! [Repository pattern]: https://docs.microsoft.com/en-us/dotnet/architecture/microservices/microservice-ddd-cqrs-patterns/infrastructure-persistence-layer-design#the-repository-pattern

use std::marker::PhantomData;

use chrono::Utc;
use futures::stream::TryStreamExt;
use uuid::Uuid;

use crate::aggregate::{Aggregate, AggregateRoot};
use crate::error::{Error, Result};
use crate::store::{CommitOrder, EventStore};

#[derive(Debug, Clone)]
pub struct Repository<A, S>
where
    A: Aggregate + 'static,
    S: EventStore<Event = A::Event>,
{
    aggregate: PhantomData<A>,
    store: S,
}

impl<A, S> Repository<A, S>
where
    A: Aggregate,
    S: EventStore<Event = A::Event>,
{
    pub fn new(store: S) -> Self {
        Repository {
            aggregate: PhantomData,
            store,
        }
    }

    pub async fn get(&self, aggregate_id: Uuid) -> Result<AggregateRoot<A>> {
        let (utc, state) = self
            .store
            .aggregate_stream(aggregate_id)
            .await
            .try_fold(
                (Utc::now(), None),
                |(_, mut fold_state), event| async move {
                    let utc = event.utc();
                    let aggregate_id = event.aggregate_id();
                    let id = event.id();
                    let data = &event.into_data();
                    let state = match fold_state {
                        None => match A::apply_first(aggregate_id, data, utc, Some(id)) {
                            Ok(state) => Some(state),
                            Err(error) => return Err(Error::Aggregate(Box::new(error))),
                        },
                        Some(ref mut state) => {
                            if let Err(error) = A::apply_next(state, data, utc, Some(id)) {
                                return Err(Error::Aggregate(Box::new(error)));
                            };

                            fold_state
                        }
                    };

                    Ok((utc, state))
                },
            )
            .await?;

        match state {
            Some(state) => Ok(A::root_with_state(aggregate_id, utc, state)),
            None => Err(Error::AggregateRootNotFound),
        }
    }

    pub async fn commit_orderly(&self, root: &mut AggregateRoot<A>) -> Result<Vec<u64>> {
        let events = root.uncommitted_events();

        if !events.is_empty() {
            let order = match root.last_commit_utc() {
                Some(utc) => CommitOrder::Following(utc),
                None => CommitOrder::First,
            };

            let ids = self.store.commit(root.id(), order, events).await?;
            root.commit();
            Ok(ids)
        } else {
            Ok(vec![])
        }
    }

    pub async fn commit_unorderly(&self, root: &mut AggregateRoot<A>) -> Result<Vec<u64>> {
        let events = root.uncommitted_events();

        if !events.is_empty() {
            let ids = self
                .store
                .commit(root.id(), CommitOrder::None, events)
                .await?;
            root.commit();
            Ok(ids)
        } else {
            Ok(vec![])
        }
    }

    pub async fn remove(&mut self, aggregate_id: Uuid) -> Result<()> {
        self.store.remove(aggregate_id).await
    }
}