use std::marker::PhantomData;
use chrono::Utc;
use futures::stream::TryStreamExt;
use uuid::Uuid;
use crate::aggregate::{Aggregate, AggregateRoot};
use crate::error::{Error, Result};
use crate::store::{CommitOrder, EventStore};
/// Repository for aggregates of type `A`, backed by an event store `S`
/// whose event type is `A::Event`.
#[derive(Debug, Clone)]
pub struct Repository<A: Aggregate + 'static, S: EventStore<Event = A::Event>> {
    /// Zero-sized marker tying the repository to `A`; no `A` value is stored.
    aggregate: PhantomData<A>,
    /// Event store that every repository operation is forwarded to.
    store: S,
}
impl<A, S> Repository<A, S>
where
A: Aggregate,
S: EventStore<Event = A::Event>,
{
pub fn new(store: S) -> Self {
Repository {
aggregate: PhantomData,
store,
}
}
pub async fn get(&self, aggregate_id: Uuid) -> Result<AggregateRoot<A>> {
let (utc, state) = self
.store
.aggregate_stream(aggregate_id)
.await
.try_fold(
(Utc::now(), None),
|(_, mut fold_state), event| async move {
let utc = event.utc();
let aggregate_id = event.aggregate_id();
let id = event.id();
let data = &event.into_data();
let state = match fold_state {
None => match A::apply_first(aggregate_id, data, utc, Some(id)) {
Ok(state) => Some(state),
Err(error) => return Err(Error::Aggregate(Box::new(error))),
},
Some(ref mut state) => {
if let Err(error) = A::apply_next(state, data, utc, Some(id)) {
return Err(Error::Aggregate(Box::new(error)));
};
fold_state
}
};
Ok((utc, state))
},
)
.await?;
match state {
Some(state) => Ok(A::root_with_state(aggregate_id, utc, state)),
None => Err(Error::AggregateRootNotFound),
}
}
pub async fn commit_orderly(&self, root: &mut AggregateRoot<A>) -> Result<Vec<u64>> {
let events = root.uncommitted_events();
if !events.is_empty() {
let order = match root.last_commit_utc() {
Some(utc) => CommitOrder::Following(utc),
None => CommitOrder::First,
};
let ids = self.store.commit(root.id(), order, events).await?;
root.commit();
Ok(ids)
} else {
Ok(vec![])
}
}
pub async fn commit_unorderly(&self, root: &mut AggregateRoot<A>) -> Result<Vec<u64>> {
let events = root.uncommitted_events();
if !events.is_empty() {
let ids = self
.store
.commit(root.id(), CommitOrder::None, events)
.await?;
root.commit();
Ok(ids)
} else {
Ok(vec![])
}
}
pub async fn remove(&mut self, aggregate_id: Uuid) -> Result<()> {
self.store.remove(aggregate_id).await
}
}