use std::fmt::Debug;
use futures::stream::TryStreamExt;
use crate::aggregate::{Aggregate, AggregateRoot, AggregateRootBuilder};
use crate::store::{EventStore, Expected, Select};
use crate::versioning::Versioned;
/// Error produced by [`Repository`] operations.
///
/// `A` is the error type of the aggregate and `S` the error type of the
/// underlying event store; each variant wraps one of them and exposes it
/// as the error source.
#[derive(Debug, thiserror::Error)]
pub enum Error<A: std::error::Error + 'static, S: std::error::Error + 'static> {
    /// Applying an event to the aggregate state failed.
    #[error("failed to rebuild aggregate state: {0}")]
    Aggregate(#[source] A),

    /// An operation on the event store failed.
    #[error("event store failed: {0}")]
    Store(#[source] S),
}
pub type Result<T, A, S> =
std::result::Result<T, Error<<A as Aggregate>::Error, <S as EventStore>::Error>>;
/// Pairs an [`AggregateRootBuilder`] with an event store whose source id and
/// event types match those of the aggregate `T`.
#[derive(Clone)]
pub struct Repository<
    T: Aggregate + Clone + 'static,
    Store: EventStore<SourceId = T::Id, Event = T::Event>,
> {
    /// Builder used to create `AggregateRoot` instances from rebuilt state.
    builder: AggregateRootBuilder<T>,
    /// Event store the aggregate events are read from and appended to.
    store: Store,
}
impl<T, Store> Repository<T, Store>
where
    T: Aggregate + Clone,
    Store: EventStore<SourceId = T::Id, Event = T::Event>,
{
    /// Creates a repository from the given aggregate root builder and
    /// event store.
    #[inline]
    pub fn new(builder: AggregateRootBuilder<T>, store: Store) -> Self {
        Self { builder, store }
    }
}
impl<T, Store> Repository<T, Store>
where
T: Aggregate + Clone,
T::Id: Debug + Clone,
T::Event: Clone,
T::Error: std::error::Error + 'static,
Store: EventStore<SourceId = T::Id, Event = T::Event>,
Store::Error: std::error::Error + 'static,
{
#[cfg_attr(
feature = "with-tracing",
tracing::instrument(level = "info", name = "Repository::get", skip(self))
)]
pub async fn get(&self, id: T::Id) -> Result<AggregateRoot<T>, T, Store> {
self.store
.stream(id.clone(), Select::All)
.await
.map_err(Error::Store)?
.map_err(Error::Store)
.try_fold(
(0, T::State::default()),
|(version, state), event| async move {
let new_version = std::cmp::max(event.version(), version);
let state = T::apply(state, event.take()).map_err(Error::Aggregate)?;
Ok((new_version, state))
},
)
.await
.map(|(version, state)| self.builder.build_with_state(id, version, state))
}
#[cfg_attr(
feature = "with-tracing",
tracing::instrument(level = "info", name = "Repository::add", skip(self, root))
)]
pub async fn add(&mut self, mut root: AggregateRoot<T>) -> Result<AggregateRoot<T>, T, Store> {
let mut version = root.version();
let events_to_commit = root.take_events_to_commit();
if let Some(events) = events_to_commit {
if !events.is_empty() {
version = self
.store
.append(root.id().clone(), Expected::Exact(version), events)
.await
.map_err(Error::Store)?;
}
}
Ok(root.with_version(version))
}
#[cfg_attr(
feature = "with-tracing",
tracing::instrument(level = "info", name = "Repository::remove", skip(self))
)]
pub async fn remove(&mut self, id: T::Id) -> Result<(), T, Store> {
self.store.remove(id).await.map_err(Error::Store)
}
}