use crate::consumer::{
AggregateConsumerStore, RootConsumerStore, TransientConsumer, TransientConsumerBuilder,
};
use crate::error::{Error, Result};
use crate::TimesourceEventPayload;
use futures::stream::BoxStream;
use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;
use std::borrow::Cow;
use std::fmt::Debug;
use std::time::Duration;
use timesource_core::event::Persisted;
use uuid::Uuid;
use super::super::store::checkpoint::OffsetCommitter;
use super::super::store::{ConsumerAck, ConsumerStore};
/// Builder for a durable [`Consumer`] backed by a Postgres event store.
///
/// Holds buffering/polling configuration and the connection string; one of
/// the async `*_build` methods on the `impl` performs the actual connection
/// and checkpoint lookup.
pub struct ConsumerBuilder<'a> {
    // Capacity of the buffered event stream (default 100, see `new`).
    event_buffer_capacity: usize,
    // Capacity of the buffered notification stream (default 100, see `new`).
    notification_buffer_capacity: usize,
    // Backup polling interval forwarded to the inner transient consumer
    // (default 60s, see `new`).
    polling_freq: Duration,
    // Postgres connection string used to open the pool at build time.
    dsn: &'a str,
}
impl<'a> ConsumerBuilder<'a> {
    /// Creates a builder for `dsn` with defaults: 100-element event and
    /// notification buffers and a 60-second backup polling frequency.
    pub fn new(dsn: &'a str) -> Self {
        Self {
            event_buffer_capacity: 100,
            notification_buffer_capacity: 100,
            polling_freq: Duration::from_secs(60),
            dsn,
        }
    }

    /// Overrides the backup polling frequency forwarded to the inner
    /// transient consumer.
    pub fn with_backup_polling_frequency(mut self, duration: Duration) -> Self {
        self.polling_freq = duration;
        self
    }

    /// Overrides the event stream buffer capacity.
    pub fn with_event_buffer_capacity(mut self, capacity: usize) -> Self {
        self.event_buffer_capacity = capacity;
        self
    }

    /// Overrides the notification stream buffer capacity.
    pub fn with_notification_buffer_capacity(mut self, capacity: usize) -> Self {
        self.notification_buffer_capacity = capacity;
        self
    }

    /// Builds a named, checkpointing consumer over every root of the given
    /// aggregate type.
    ///
    /// Connects a pool, resolves the aggregate type id, builds the inner
    /// transient consumer, and loads the stored checkpoint offset for
    /// `consumer_name` (root id bound to `NULL`, i.e. type-wide).
    ///
    /// # Errors
    /// Returns an error if the pool cannot connect, the aggregate type name
    /// is unknown, or any query fails.
    pub async fn aggregate_build<Event>(
        &self,
        consumer_name: Cow<'static, str>,
        aggregate_type_name: &'a str,
    ) -> Result<Consumer<AggregateConsumerStore<Event>>>
    where
        Event: TimesourceEventPayload + 'static + Send + Sync + Debug,
    {
        let pool = self.pool().await?;
        // Resolve the type id before building the inner consumer, matching
        // the order used by `aggregate_root_build`.
        let aggregate_type_id = self.aggregate_type_id(aggregate_type_name, &pool).await?;
        let inner = self
            .inner_builder()
            .aggregate_build(aggregate_type_name)
            .await?;
        let offset = sqlx::query_file_scalar!(
            "queries/consumer/checkpoint/offset.sql",
            consumer_name.as_ref(),
            aggregate_type_id,
            None::<Uuid>
        )
        .fetch_one(&pool)
        .await?;
        // NOTE(review): the checkpoint offset is only logged here — it is not
        // handed to the inner consumer. Confirm resumption from the checkpoint
        // happens elsewhere (e.g. via the committer/store).
        debug!(
            consumer_name = consumer_name.as_ref(),
            aggregate_type_id,
            ?offset,
            "Consumer state"
        );
        // `pool` is not used past this point; move it into the committer
        // instead of taking a redundant clone.
        let committer = OffsetCommitter::new(consumer_name.into(), pool);
        Ok(Consumer { committer, inner })
    }

    /// Builds a named, checkpointing consumer scoped to a single aggregate
    /// root (`root_id`) of the given aggregate type.
    ///
    /// Same flow as [`Self::aggregate_build`], but the checkpoint lookup and
    /// the inner consumer are bound to `root_id`.
    ///
    /// # Errors
    /// Returns an error if the pool cannot connect, the aggregate type name
    /// is unknown, or any query fails.
    pub async fn aggregate_root_build<Event>(
        &self,
        consumer_name: Cow<'static, str>,
        aggregate_type_name: &'a str,
        root_id: Uuid,
    ) -> Result<Consumer<RootConsumerStore<Event>>>
    where
        Event: TimesourceEventPayload + 'static + Send + Sync + Debug,
    {
        let pool = self.pool().await?;
        let aggregate_type_id = self.aggregate_type_id(aggregate_type_name, &pool).await?;
        let inner = self
            .inner_builder()
            .aggregate_root_build(aggregate_type_name, root_id)
            .await?;
        let offset = sqlx::query_file_scalar!(
            "queries/consumer/checkpoint/offset.sql",
            consumer_name.as_ref(),
            aggregate_type_id,
            root_id
        )
        .fetch_one(&pool)
        .await?;
        // NOTE(review): offset is logged but not used for resumption here —
        // see the matching note in `aggregate_build`.
        debug!(
            consumer_name = consumer_name.as_ref(),
            aggregate_type_id,
            ?offset,
            "Consumer state"
        );
        // `pool` is not used past this point; move it in (no clone needed).
        let committer = OffsetCommitter::new(consumer_name.into(), pool);
        Ok(Consumer { committer, inner })
    }

    /// Opens a fresh connection pool against the configured DSN.
    async fn pool(&self) -> Result<PgPool> {
        Ok(PgPoolOptions::new().connect(self.dsn).await?)
    }

    /// Resolves the numeric id of an aggregate type by name, failing with
    /// `Error::InvalidData` when the name is unknown.
    async fn aggregate_type_id(&self, aggregate_type_name: &str, pool: &PgPool) -> Result<i32> {
        Ok(
            sqlx::query_file_scalar!("queries/aggregate_type/id.sql", aggregate_type_name)
                .fetch_one(pool)
                .await?
                .ok_or_else(|| Error::InvalidData("Unable to get aggregate type id".into()))?,
        )
    }

    /// Creates the inner transient-consumer builder carrying this builder's
    /// buffering and polling configuration.
    fn inner_builder(&self) -> TransientConsumerBuilder {
        TransientConsumerBuilder::new(self.dsn)
            .with_backup_polling_frequency(self.polling_freq)
            .with_event_buffer_capacity(self.event_buffer_capacity)
            .with_notification_buffer_capacity(self.notification_buffer_capacity)
    }
}
/// A durable event consumer: a transient consumer paired with an offset
/// committer so processed positions can be checkpointed via `ack`/`try_ack`.
pub struct Consumer<Store>
where
    Store: ConsumerStore,
{
    // Persists acknowledged offsets to the checkpoint table.
    committer: OffsetCommitter,
    // Underlying store-backed consumer that produces the event stream.
    inner: TransientConsumer<Store>,
}
impl<Store> Consumer<Store>
where
    Store: ConsumerStore,
{
    /// Resumes consumption, delegating to the inner transient consumer and
    /// yielding its stream of persisted events.
    pub async fn resume(
        &self,
    ) -> Result<BoxStream<'_, Result<Persisted<<Store as ConsumerStore>::Event>>>> {
        self.inner.resume().await
    }

    /// Acknowledges `offset` by persisting it through the offset committer.
    ///
    /// # Errors
    /// Propagates any failure from the committer, converted into [`Error`].
    pub async fn ack(&self, offset: u64) -> Result<()> {
        let saved = self.committer.save_offset(offset).await;
        Ok(saved?)
    }

    /// Best-effort variant of [`Self::ack`]; presumably avoids blocking on a
    /// pending commit — confirm against `OffsetCommitter::try_save_offset`.
    ///
    /// # Errors
    /// Propagates any failure from the committer, converted into [`Error`].
    pub async fn try_ack(&self, offset: u64) -> Result<()> {
        let attempted = self.committer.try_save_offset(offset).await;
        Ok(attempted?)
    }
}