use std::error::Error as StdError;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use futures::future::{ok, BoxFuture, FutureExt};
use futures::stream::{BoxStream, StreamExt, TryStreamExt};
use crate::store::{EventStore, Persisted, Select};
/// Boxed stream of events produced by an [`EventSubscriber`].
///
/// Every item is a `Result`: `Ok` carries a [`Persisted`] value parameterised
/// by the subscriber's `SourceId` and `Event` associated types, `Err` carries
/// the subscriber's `Error` associated type.
pub type EventStream<'a, S> = BoxStream<
'a,
Result<
Persisted<<S as EventSubscriber>::SourceId, <S as EventSubscriber>::Event>,
<S as EventSubscriber>::Error,
>,
>;
/// Component able to open an [`EventStream`] of persisted events.
pub trait EventSubscriber {
/// First type parameter of the [`Persisted`] items yielded by the stream;
/// must support equality comparison.
type SourceId: Eq;
/// Second type parameter of the [`Persisted`] items yielded by the stream.
type Event;
/// Error returned when opening the stream fails, and also the error type
/// of the individual stream items.
type Error;
/// Returns a boxed future resolving to an [`EventStream`] borrowed from
/// `self`, or to `Self::Error` if the stream could not be opened.
///
/// NOTE(review): the name suggests the stream covers events of every
/// source rather than a single one -- confirm against implementors.
fn subscribe_all(&self) -> BoxFuture<Result<EventStream<Self>, Self::Error>>;
}
/// Boxed stream of events produced by a [`Subscription`].
///
/// Every item is a `Result`: `Ok` carries a [`Persisted`] value parameterised
/// by the subscription's `SourceId` and `Event` associated types, `Err`
/// carries the subscription's `Error` associated type.
pub type SubscriptionStream<'a, S> = BoxStream<
'a,
Result<
Persisted<<S as Subscription>::SourceId, <S as Subscription>::Event>,
<S as Subscription>::Error,
>,
>;
/// Resumable event stream paired with a way to record progress through it.
pub trait Subscription {
/// First type parameter of the [`Persisted`] items yielded by the stream;
/// must support equality comparison.
type SourceId: Eq;
/// Second type parameter of the [`Persisted`] items yielded by the stream.
type Event;
/// Error returned by [`resume`](Subscription::resume) and
/// [`checkpoint`](Subscription::checkpoint), and the error type of the
/// individual stream items.
type Error;
/// Returns a boxed future resolving to a [`SubscriptionStream`] borrowed
/// from `self`, or to `Self::Error` if the stream could not be opened.
fn resume(&self) -> BoxFuture<Result<SubscriptionStream<Self>, Self::Error>>;
/// Records `version` as the subscription's progress marker.
///
/// In the [`Transient`] implementation in this file the stored value is
/// the sequence number the next `resume` starts reading from; other
/// implementors may interpret it differently -- confirm per implementor.
fn checkpoint(&self, version: u32) -> BoxFuture<Result<(), Self::Error>>;
}
/// Error type used by the [`Transient`] subscription.
///
/// Both variants wrap the underlying failure as an [`anyhow::Error`], which is
/// exposed as the error source and also interpolated into the display message.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Failure attributed to the event stream coming from the store.
#[error("error received while listening to the event stream from the store: {0}")]
Store(#[source] anyhow::Error),
/// Failure attributed to the event stream coming from the subscription.
#[error("error received while listening to the event stream subscription: {0}")]
Subscription(#[source] anyhow::Error),
}
/// Subscription built from an event store and an event subscriber.
///
/// The position reached in the stream is tracked by an in-memory atomic
/// counter owned by this value; nothing in this type persists it elsewhere.
pub struct Transient<Store, Subscriber> {
    /// Store handle kept for later use by the subscription.
    store: Store,
    /// Subscriber handle kept for later use by the subscription.
    subscriber: Subscriber,
    /// Sequence number the subscription currently stands at.
    last_sequence_number: Arc<AtomicU32>,
}

impl<Store, Subscriber> Transient<Store, Subscriber> {
    /// Creates a subscription over `store` and `subscriber` whose sequence
    /// number starts at zero.
    pub fn new(store: Store, subscriber: Subscriber) -> Self {
        let last_sequence_number = Arc::new(AtomicU32::new(0));

        Transient {
            store,
            subscriber,
            last_sequence_number,
        }
    }

    /// Consumes the subscription and returns it with its sequence number
    /// overwritten by `sequence_number`.
    pub fn from(self, sequence_number: u32) -> Self {
        let cursor: &AtomicU32 = &self.last_sequence_number;
        cursor.store(sequence_number, Ordering::Relaxed);

        self
    }
}
/// [`Subscription`] implementation that replays events read from the store
/// and then continues with the events delivered by the subscriber.
impl<Store, Subscriber> Subscription for Transient<Store, Subscriber>
where
    Store: EventStore + Send + Sync,
    Subscriber: EventSubscriber<
            SourceId = <Store as EventStore>::SourceId,
            Event = <Store as EventStore>::Event,
        > + Send
        + Sync,
    <Store as EventStore>::SourceId: Send + Sync,
    <Store as EventStore>::Event: Send + Sync,
    <Store as EventStore>::Error: StdError + Send + Sync + 'static,
    <Subscriber as EventSubscriber>::Error: StdError + Send + Sync + 'static,
{
    type SourceId = Store::SourceId;
    type Event = Store::Event;
    type Error = Error;

    /// Opens the event stream, starting at the current sequence number.
    ///
    /// The returned stream is the store's stream (selected from the current
    /// sequence number) chained with the subscriber's stream. Events whose
    /// sequence number is lower than the current one at the time they are
    /// polled are dropped as duplicates.
    ///
    /// # Errors
    ///
    /// * [`Error::Subscription`] if the subscriber fails to open its stream,
    ///   or if that stream later yields an error.
    /// * [`Error::Store`] if the store fails to open its stream, or if that
    ///   stream later yields an error.
    fn resume(&self) -> BoxFuture<Result<SubscriptionStream<Self>, Self::Error>> {
        Box::pin(async move {
            // The subscriber stream is opened before the store stream. The
            // two may therefore overlap, which is why duplicates are
            // filtered out further down.
            let subscription = self
                .subscriber
                .subscribe_all()
                .await
                .map_err(anyhow::Error::from)
                // A failure here originates from the subscriber, so it is
                // reported with the subscription variant.
                .map_err(Error::Subscription)?;

            let one_off_stream = self
                .store
                .stream_all(Select::From(
                    self.last_sequence_number.load(Ordering::Relaxed),
                ))
                .await
                .map_err(anyhow::Error::from)
                // A failure here originates from the store, so it is
                // reported with the store variant.
                .map_err(Error::Store)?;

            let stream = one_off_stream
                .map_err(anyhow::Error::from)
                .map_err(Error::Store)
                .chain(
                    subscription
                        .map_err(anyhow::Error::from)
                        .map_err(Error::Subscription),
                )
                .try_filter_map(move |event| async move {
                    // Re-read on every event so that checkpoints recorded
                    // while the stream is being consumed are honoured.
                    let expected_sequence_number =
                        self.last_sequence_number.load(Ordering::Relaxed);
                    let event_sequence_number = event.sequence_number();

                    if event_sequence_number < expected_sequence_number {
                        #[cfg(feature = "with-tracing")]
                        tracing::trace!(
                            event.sequence_number = event_sequence_number,
                            subscription.sequence_number = expected_sequence_number,
                            "Duplicated event detected; skipping"
                        );

                        return Ok(None);
                    }

                    Ok(Some(event))
                })
                .boxed();

            Ok(stream)
        })
    }

    /// Stores `version` as the current sequence number.
    ///
    /// The value is written before the returned future is polled, and the
    /// future always resolves to `Ok(())`.
    fn checkpoint(&self, version: u32) -> BoxFuture<Result<(), Self::Error>> {
        self.last_sequence_number.store(version, Ordering::Relaxed);
        ok(()).boxed()
    }
}