use std::convert::{TryFrom, TryInto};
use std::error::Error as StdError;
use std::fmt::{Debug, Display};
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use futures::future::BoxFuture;
use futures::stream::{StreamExt, TryStreamExt};
use serde::{Deserialize, Serialize};
use tokio_postgres::Client;
use eventually_core::store::{EventStore as EventStoreTrait, Select};
use eventually_core::subscription::{
EventSubscriber as EventSubscriberTrait, Subscription, SubscriptionStream,
};
use crate::store::{Error as EventStoreError, EventStore};
use crate::subscriber::{DeserializeError, EventSubscriber};
use crate::Params;
/// SQL statement invoking the `get_or_create_subscription` database function
/// (defined outside this file).
///
/// Bound parameters, in order: the subscription name and the store's type name.
/// The single row it returns is read for its `last_sequence_number` column.
const GET_OR_CREATE_SUBSCRIPTION: &str = "SELECT * FROM get_or_create_subscription($1, $2)";
/// SQL statement invoking the `checkpoint_subscription` database function
/// (defined outside this file).
///
/// Bound parameters, in order: the subscription name, the store's type name
/// and the version to checkpoint, passed as an `i64`.
const CHECKPOINT_SUBSCRIPTION: &str = "SELECT * FROM checkpoint_subscription($1, $2, $3)";
/// Errors produced by a [`Persistent`] subscription.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// The event store failed, either while opening the one-off stream
/// (`stream_all`) or while yielding one of its items.
#[error("error detected while reading one-off event stream from the store: {0}")]
Store(#[source] EventStoreError),
/// The event subscriber failed, either while opening the subscription
/// (`subscribe_all`) or while yielding one of its items.
#[error("error detected while reading catch-up event stream from the subscription: {0}")]
Subscriber(#[source] DeserializeError),
/// The checkpoint statement could not be executed against the database.
#[error("failed to checkpoint persistent subscription version: {0}")]
Checkpoint(#[source] tokio_postgres::Error),
}
/// Factory for [`Persistent`] subscriptions.
///
/// Holds the database client, event store and event subscriber that are
/// cloned into every subscription returned by `get_or_create`.
pub struct PersistentBuilder<SourceId, Event> {
/// Shared Postgres client used to run the get-or-create query.
client: Arc<Client>,
/// Event store; its `type_name` is sent as the second query parameter.
store: EventStore<SourceId, Event>,
/// Event subscriber handed over to each created subscription.
subscriber: EventSubscriber<SourceId, Event>,
}
impl<SourceId, Event> PersistentBuilder<SourceId, Event>
where
SourceId: Clone,
Event: Clone,
{
pub fn new(
client: Arc<Client>,
store: EventStore<SourceId, Event>,
subscriber: EventSubscriber<SourceId, Event>,
) -> Self {
Self {
client,
store,
subscriber,
}
}
pub async fn get_or_create(
&self,
name: String,
) -> Result<Persistent<SourceId, Event>, tokio_postgres::Error> {
let params: Params = &[&name, &self.store.type_name];
let row = self
.client
.query_one(GET_OR_CREATE_SUBSCRIPTION, params)
.await?;
let last_sequence_number: i64 = row.try_get("last_sequence_number")?;
Ok(Persistent {
name,
last_sequence_number: AtomicI64::from(last_sequence_number),
client: self.client.clone(),
store: self.store.clone(),
subscriber: self.subscriber.clone(),
})
}
}
/// Subscription whose progress is checkpointed in the database.
///
/// Instances are created by `PersistentBuilder::get_or_create`.
pub struct Persistent<SourceId, Event> {
/// Subscription name, sent as the first parameter of the SQL statements.
name: String,
/// Last checkpointed sequence number; loaded when resuming and when
/// filtering events, overwritten after a successful checkpoint.
last_sequence_number: AtomicI64,
/// Shared Postgres client used to execute the checkpoint statement.
client: Arc<Client>,
/// Event store used to read the one-off stream when resuming.
store: EventStore<SourceId, Event>,
/// Event subscriber used to open the subscription stream when resuming.
subscriber: EventSubscriber<SourceId, Event>,
}
impl<SourceId, Event> Subscription for Persistent<SourceId, Event>
where
    SourceId: TryFrom<String> + Display + Eq + Clone + Send + Sync,
    Event: Serialize + Clone + Send + Sync + Debug,
    for<'de> Event: Deserialize<'de>,
    <SourceId as TryFrom<String>>::Error: StdError + Send + Sync + 'static,
{
    type SourceId = SourceId;
    type Event = Event;
    type Error = Error;

    /// Resumes the subscription from the event following the last checkpoint.
    ///
    /// The returned stream is the one-off stream read from the store, starting
    /// at `last_sequence_number + 1`, chained with the stream obtained from the
    /// subscriber. Events whose sequence number is not greater than the current
    /// `last_sequence_number` are dropped from the resulting stream.
    ///
    /// # Errors
    ///
    /// * [`Error::Subscriber`] if the subscription cannot be opened, or for any
    ///   failing item of the subscription stream.
    /// * [`Error::Store`] if the one-off stream cannot be opened, or for any
    ///   failing item of that stream.
    ///
    /// # Panics
    ///
    /// Panics if `last_sequence_number + 1` does not fit into a `u32`.
    fn resume(&self) -> BoxFuture<Result<SubscriptionStream<Self>, Self::Error>> {
        let fut = async move {
            let last_sequence_number = self.last_sequence_number.load(Ordering::Relaxed);

            // A single trailing backslash joins the two lines into one message;
            // an escaped backslash here would leak a literal `\` and a newline
            // into the panic message.
            let checkpoint: u32 = (last_sequence_number + 1).try_into().expect(
                "in case of overflow, it means there is a bug in the optimistic versioning code; \
                 please open an issue with steps to reproduce the bug",
            );

            #[cfg(feature = "with-tracing")]
            tracing::trace!(
                subscription.sequence_number = last_sequence_number,
                subscription.checkpoint = checkpoint,
                subscription.name = %self.name,
                subscription.aggregate_type = %self.store.type_name,
                "Resuming persistent subscription"
            );

            // The subscription is opened before the store is read.
            // NOTE(review): presumably so that events committed in between are
            // not missed, at the cost of duplicates filtered below — confirm.
            let subscription = self
                .subscriber
                .subscribe_all()
                .await
                .map_err(Error::Subscriber)?;

            let one_off_stream = self
                .store
                .stream_all(Select::From(checkpoint))
                .await
                .map_err(Error::Store)?;

            let stream = one_off_stream
                .map_err(Error::Store)
                .chain(subscription.map_err(Error::Subscriber))
                .try_filter_map(move |event| async move {
                    let event_sequence_number = event.sequence_number() as i64;

                    // Re-read on every event, so checkpoints taken while the
                    // stream is being consumed are taken into account.
                    let expected_sequence_number =
                        self.last_sequence_number.load(Ordering::Relaxed);

                    if event_sequence_number <= expected_sequence_number {
                        #[cfg(feature = "with-tracing")]
                        tracing::trace!(
                            event.sequence_number = event_sequence_number,
                            subscription.sequence_number = expected_sequence_number,
                            "Duplicated event detected; skipping"
                        );

                        return Ok(None);
                    }

                    Ok(Some(event))
                })
                .boxed();

            Ok(stream)
        };

        Box::pin(fut)
    }

    /// Persists `version` as the last processed sequence number.
    ///
    /// Executes the checkpoint statement with the subscription name, the
    /// store's type name and `version`; once the statement succeeds, the
    /// in-memory `last_sequence_number` is set to `version`.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Checkpoint`] if the statement fails; the in-memory
    /// sequence number is left untouched in that case.
    fn checkpoint(&self, version: u32) -> BoxFuture<Result<(), Self::Error>> {
        Box::pin(async move {
            // Lossless widening, shared by the query parameter and the atomic store.
            let checkpointed_version = i64::from(version);

            let params: Params = &[&self.name, &self.store.type_name, &checkpointed_version];

            #[cfg(feature = "with-tracing")]
            tracing::trace!(
                checkpoint = version,
                subscription.name = %self.name,
                subscription.aggregate_type = %self.store.type_name,
                "Checkpointing persistent subscription"
            );

            self.client
                .execute(CHECKPOINT_SUBSCRIPTION, params)
                .await
                .map_err(Error::Checkpoint)?;

            self.last_sequence_number
                .store(checkpointed_version, Ordering::Relaxed);

            Ok(())
        })
    }
}