use std::pin::pin;
use futures::{Stream, StreamExt};
use super::future::IntoSendFuture;
use super::EventGroup;
use crate::envelope;
use crate::error::{self, Error};
use crate::project::{Context, Project};
#[trait_variant::make(Send)]
pub trait Subscribe {
type Envelope: envelope::Envelope;
async fn subscribe<G: EventGroup>(
&self,
) -> error::Result<impl Stream<Item = error::Result<Self::Envelope>> + Send>;
}
#[trait_variant::make(Send)]
pub trait SubscribeExt: Subscribe {
async fn observe<P>(&self, projector: P) -> error::Result<()>
where
P: for<'de> Project<'de>;
}
impl<T> SubscribeExt for T
where
T: Subscribe + Sync,
T::Envelope: Sync,
{
async fn observe<P>(&self, mut projector: P) -> error::Result<()>
where
P: for<'de> Project<'de>,
{
let mut stream = pin!(self.subscribe::<P::EventGroup>().await?);
while let Some(envelope) = stream.next().await {
let envelope = envelope?;
let context = Context::try_with_envelope(&envelope)?;
projector
.project(context)
.into_send_future()
.await
.map_err(|e| Error::External(e.into()))?;
}
Ok(())
}
}