use std::error::Error as StdError;
use std::fmt::Debug;
use futures::future;
use futures::stream::TryStreamExt;
use thiserror::Error as ThisError;
use crate::aggregate::{Aggregate, AggregateId, AggregateRoot, Identifiable};
use crate::store::EventStore;
/// Errors returned by [`Repository`] operations.
///
/// The enum is generic over the aggregate type `A` and the event store type
/// `S` so that the underlying `A::Error` and `S::Error` values are preserved
/// as typed error sources instead of being erased.
#[derive(Debug, ThisError, PartialEq, Eq)]
pub enum Error<A, S>
where
A: Aggregate + Debug,
S: EventStore + Debug,
A::Error: StdError + 'static,
S::Error: StdError + 'static,
{
/// Applying an event to the aggregate state failed while folding the
/// event stream; wraps the aggregate's own error.
#[error("failed to rebuild aggregate state: {0}")]
Aggregate(#[source] A::Error),
/// The event store failed while streaming, appending or removing events;
/// wraps the store's own error.
#[error("event store failed: {0}")]
Store(#[source] S::Error),
/// The aggregate root handed to `Repository::add` carried no events
/// pending commit.
#[error("no events to commit")]
NoEvents,
}
/// Result alias used by [`Repository`] methods: a `std::result::Result` whose
/// error is [`Error`] parameterised by the aggregate type `A` and the event
/// store type `S`.
pub type Result<T, A, S> = std::result::Result<T, Error<A, S>>;
/// Loads and persists aggregate roots of type `T` through an event store.
///
/// State is not stored directly: `get` rebuilds it by folding the events
/// streamed from the store, and `add` appends the root's pending events.
pub struct Repository<T, Store>
where
T: Aggregate + 'static,
{
// Aggregate instance cloned into every `AggregateRoot` produced by `get`.
aggregate: T,
// Backing store used to stream, append and remove events.
store: Store,
}
impl<T, Store> Repository<T, Store>
where
    T: Aggregate,
    Store: EventStore<SourceId = AggregateId<T>, Event = T::Event>,
{
    /// Builds a repository from an aggregate instance and the event store
    /// that holds its events.
    ///
    /// The store must be keyed by the aggregate's id type and carry the
    /// aggregate's event type, as required by the bounds on this `impl`.
    #[inline]
    pub fn new(aggregate: T, store: Store) -> Self {
        Self { aggregate, store }
    }
}
impl<T, Store> Repository<T, Store>
where
T: Aggregate + Debug + Clone,
T::Event: Clone,
T::State: Default + Identifiable,
T::Error: StdError + 'static,
AggregateId<T>: Default,
Store: EventStore<SourceId = AggregateId<T>, Event = T::Event> + Debug,
Store::Offset: Default,
Store::Error: StdError + 'static,
{
pub async fn get(&self, id: AggregateId<T>) -> Result<Option<AggregateRoot<T>>, T, Store> {
Ok(self
.store
.stream(id, Store::Offset::default())
.await
.map_err(Error::Store)?
.map_err(Error::Store)
.try_fold(None, |state, event| {
let state = state.unwrap_or_else(T::State::default);
future::ready(T::apply(state, event).map(Some).map_err(Error::Aggregate))
})
.await?
.map(|state| AggregateRoot::new(self.aggregate.clone(), state)))
}
pub async fn add(&mut self, mut root: AggregateRoot<T>) -> Result<AggregateRoot<T>, T, Store> {
if root.to_commit.is_none() {
return Err(Error::NoEvents);
}
self.store
.append(root.id(), root.to_commit.unwrap())
.await
.map_err(Error::Store)?;
root.to_commit = None;
Ok(root)
}
pub async fn remove(&mut self, id: AggregateId<T>) -> Result<(), T, Store> {
self.store.remove(id).await.map_err(Error::Store)
}
}