use std::pin::pin;
use futures::{Stream, StreamExt, TryStreamExt};
use uuid::Uuid;
use super::future::IntoSendFuture;
use super::{Event, EventGroup, Sequence};
use crate::aggregate::{Aggregate, Root};
use crate::envelope;
use crate::error::{self, Error};
use crate::project::{Context, Project};
use crate::version::DeserializeVersion;
#[trait_variant::make(Send)]
pub trait Replay {
type Envelope: envelope::Envelope;
async fn replay<G: EventGroup>(
&self,
first_sequence: Sequence,
) -> error::Result<impl Stream<Item = error::Result<Self::Envelope>> + Send>;
}
#[trait_variant::make(Send)]
pub trait ReplayOne {
type Envelope: envelope::Envelope;
async fn replay_one<E: Event>(
&self,
id: Uuid,
first_sequence: Sequence,
) -> error::Result<impl Stream<Item = error::Result<Self::Envelope>> + Send>;
}
#[trait_variant::make(Send)]
pub trait ReplayExt: Replay {
async fn rebuild<P>(&self, projector: P) -> error::Result<()>
where
P: for<'de> Project<'de>;
async fn rebuild_after<P>(&self, projector: P, first_sequence: Sequence) -> error::Result<()>
where
P: for<'de> Project<'de>;
}
#[trait_variant::make(Send)]
pub trait ReplayOneExt: ReplayOne {
async fn read<A>(&self, id: Uuid) -> error::Result<Root<A>>
where
A: Aggregate,
A::Event: for<'de> DeserializeVersion<'de>;
async fn read_after<A>(&self, root: Root<A>) -> error::Result<Root<A>>
where
A: Aggregate,
A::Event: for<'de> DeserializeVersion<'de>;
}
impl<T> ReplayExt for T
where
T: Replay + Sync,
T::Envelope: Sync,
{
async fn rebuild<P>(&self, projector: P) -> error::Result<()>
where
P: for<'de> Project<'de>,
{
self.rebuild_after(projector, Sequence::new()).await
}
async fn rebuild_after<P>(
&self,
mut projector: P,
first_sequence: Sequence,
) -> error::Result<()>
where
P: for<'de> Project<'de>,
{
let mut stream = pin!(self.replay::<P::EventGroup>(first_sequence).await?);
while let Some(envelope) = stream.next().await {
let envelope = envelope?;
let context = Context::try_with_envelope(&envelope)?;
projector
.project(context)
.into_send_future()
.await
.map_err(|e| Error::External(e.into()))?;
}
Ok(())
}
}
/// Blanket implementation of the aggregate-loading helpers for every `Sync`
/// single-id replay source.
impl<T> ReplayOneExt for T
where
    T: ReplayOne + Sync,
{
    /// Builds `Root::new(id)` and brings it up to date via `read_after`.
    async fn read<A>(&self, id: Uuid) -> error::Result<Root<A>>
    where
        A: Aggregate,
        A::Event: for<'de> DeserializeVersion<'de>,
    {
        let fresh: Root<A> = Root::new(id);
        self.read_after(fresh).await
    }

    /// Replays `A::Event` envelopes for the root's id, starting from the root's
    /// last sequence, and folds each one into the root with `Root::try_apply`.
    ///
    /// The fold stops at the first error, whether it comes from the stream or
    /// from applying an envelope.
    async fn read_after<A>(&self, root: Root<A>) -> error::Result<Root<A>>
    where
        A: Aggregate,
        A::Event: for<'de> DeserializeVersion<'de>,
    {
        // Read both values before `root` is moved into the fold below.
        let id = Root::id(&root);
        let resume_from = Root::last_sequence(&root);

        let envelopes = self.replay_one::<A::Event>(id, resume_from).await?;
        envelopes
            .try_fold(root, |current, envelope| async move {
                Root::try_apply(current, envelope)
            })
            .await
    }
}