use crate::error::Error;
use crate::event::EventRow;
use crate::TimesourceEventPayload;
use futures::stream::{self, BoxStream};
use futures::{StreamExt, TryStreamExt};
use sqlx::PgPool;
use std::borrow::Cow;
use std::fmt::Debug;
use std::marker::PhantomData;
use timesource_core::event::Persisted;
use super::ConsumerStore;
use super::StoreData;
/// Consumer-side event store scoped to a single aggregate type.
///
/// `Event` is the payload type that fetched rows are converted into; no value
/// of that type is stored in the struct itself.
#[derive(Debug)]
pub struct AggregateConsumerStore<Event>
where
Event: TimesourceEventPayload + Send + Sync + Debug,
{
/// Bound as the first parameter of every query issued by this store
/// (presumably the database id of the aggregate type — confirm against the SQL files).
aggregate_type_id: i32,
/// Consumer name, or `None` for a store built with `new_anonymous`.
/// When `None`, `events_after_offset` yields an empty stream without querying.
consumer_name: Option<Cow<'static, str>>,
/// Connection pool the queries are fetched from.
pool: PgPool,
/// Marker tying the store to its `Event` payload type.
_event: PhantomData<Event>,
}
impl<Event> AggregateConsumerStore<Event>
where
    Event: TimesourceEventPayload + Send + Sync + Debug,
{
    /// Creates a store for the consumer called `consumer_name`.
    ///
    /// `aggregate_type_id` is passed to every query the store runs, and `pool`
    /// is the connection pool those queries are fetched from.
    pub fn new(aggregate_type_id: i32, consumer_name: Cow<'static, str>, pool: PgPool) -> Self {
        let consumer_name = Some(consumer_name);
        Self {
            aggregate_type_id,
            consumer_name,
            pool,
            _event: PhantomData,
        }
    }

    /// Creates a store that has no consumer name.
    ///
    /// Such a store has no name to look an offset up by, so
    /// `events_after_offset` returns an empty stream for it; `events_after`
    /// and `events_range` do not use the name.
    pub fn new_anonymous(aggregate_type_id: i32, pool: PgPool) -> Self {
        let consumer_name = None;
        Self {
            aggregate_type_id,
            consumer_name,
            pool,
            _event: PhantomData,
        }
    }
}
// `Clone` is written by hand because `#[derive(Clone)]` would add an
// `Event: Clone` bound, even though no `Event` value is ever stored.
impl<Event> Clone for AggregateConsumerStore<Event>
where
    Event: TimesourceEventPayload + Send + Sync + Debug,
{
    /// Returns a copy of the store with the same aggregate type id, a cloned
    /// consumer name and a cloned pool.
    fn clone(&self) -> Self {
        // Exhaustive destructuring: adding a field to the struct becomes a
        // compile error here instead of a silently un-cloned field.
        let Self {
            aggregate_type_id,
            consumer_name,
            pool,
            _event: _,
        } = self;

        Self {
            aggregate_type_id: *aggregate_type_id,
            consumer_name: consumer_name.clone(),
            pool: pool.clone(),
            _event: PhantomData,
        }
    }
}
#[async_trait]
impl<Event> ConsumerStore for AggregateConsumerStore<Event>
where
Event: TimesourceEventPayload + Send + Sync + 'static + Debug,
{
type Event = Event;
#[tracing::instrument]
fn events_after_offset(&self) -> BoxStream<'_, StoreData<Event>> {
match &self.consumer_name {
None => stream::iter(vec![]).boxed(),
Some(name) => {
let aggregate_type_id = &self.aggregate_type_id;
sqlx::query_file_as!(
EventRow,
"queries/consumer/aggregate/events_after_offset.sql",
aggregate_type_id,
name.as_ref()
)
.fetch(&self.pool)
.map_err(Error::from)
.map(move |x| x.and_then(|x| Persisted::try_from(x).map_err(Error::from)))
.boxed()
}
}
}
#[tracing::instrument]
fn events_after(&self, offset: u64) -> BoxStream<'_, StoreData<Event>> {
let aggregate_type_id = &self.aggregate_type_id;
sqlx::query_file_as!(
EventRow,
"queries/consumer/aggregate/events_after.sql",
aggregate_type_id,
offset as i64,
)
.fetch(&self.pool)
.map_err(Error::from)
.map(move |x| x.and_then(|x| Persisted::try_from(x).map_err(Error::from)))
.boxed()
}
#[tracing::instrument]
fn events_range(
&self,
later_than: u64, until: u64, ) -> BoxStream<'_, StoreData<Event>> {
let aggregate_type_id = &self.aggregate_type_id;
sqlx::query_file_as!(
EventRow,
"queries/consumer/aggregate/events_range.sql",
aggregate_type_id,
later_than as i64,
until as i64,
)
.fetch(&self.pool)
.map_err(Error::from)
.map(move |x| x.and_then(|x| Persisted::try_from(x).map_err(Error::from)))
.boxed()
}
}