//! timesource 0.1.3
//!
//! Event sourcing with TimescaleDB.
//!
//! Documentation
use crate::consumer::{
    AggregateConsumerStore, RootConsumerStore, TransientConsumer, TransientConsumerBuilder,
};
use crate::error::{Error, Result};
use crate::TimesourceEventPayload;
use futures::stream::BoxStream;
use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;
use std::borrow::Cow;
use std::fmt::Debug;
use std::time::Duration;
use timesource_core::event::Persisted;
use uuid::Uuid;

use super::super::store::checkpoint::OffsetCommitter;
use super::super::store::{ConsumerAck, ConsumerStore};

/// Builder for a persistent Consumer
/// Builder for a persistent Consumer
pub struct ConsumerBuilder<'a> {
    // Max events buffered ahead of the subscriber (default 100, see `new`).
    event_buffer_capacity: usize,
    // Buffer capacity forwarded to the inner `TransientConsumerBuilder` (default 100).
    notification_buffer_capacity: usize,
    // Interval of the backup long-polling loop (default 60s, see `new`).
    polling_freq: Duration,
    // Postgres connection string; used to open the pool in `pool()`.
    dsn: &'a str,
}

impl<'a> ConsumerBuilder<'a> {
    pub fn new(dsn: &'a str) -> Self {
        Self {
            event_buffer_capacity: 100,
            notification_buffer_capacity: 100,
            polling_freq: Duration::from_secs(60),
            dsn,
        }
    }

    /// Sets the frequency of long polling for new events.
    ///
    /// When new events are being inserted in the store at a rate lower than this value,
    /// polling will never occur
    ///
    /// Long polling is only a backup mechanism when postgres LISTEN notifications are lost
    /// (e.g. temporary network failure). Therefore, it shouldn't be set too low, or else
    /// polling will consume resources from the database server unnecessarily
    ///
    /// Default value is 1 minute
    ///
    /// [`ConsumerBuilder`]: struct.ConsumerBuilder.html
    pub fn with_backup_polling_frequency(mut self, duration: Duration) -> Self {
        self.polling_freq = duration;
        self
    }

    /// Sets the capacity of the event buffer.
    ///
    /// Consumer will store a buffer of events for better performance.
    /// After bootstrap and when the buffer reaches its capacity, the Consumer will pause
    /// until consumer has caught up.
    ///
    /// Defaults to 100.
    ///
    /// [`ConsumerBuilder`]: struct.ConsumerBuilder.html
    pub fn with_event_buffer_capacity(mut self, capacity: usize) -> Self {
        self.event_buffer_capacity = capacity;
        self
    }

    pub fn with_notification_buffer_capacity(mut self, capacity: usize) -> Self {
        self.notification_buffer_capacity = capacity;
        self
    }

    /// Creates a new [`Consumer`] with the specified name for the `aggregate_type_name`
    /// if it doesn't exists already.
    ///
    pub async fn aggregate_build<Event>(
        &self,
        consumer_name: Cow<'static, str>,
        aggregate_type_name: &'a str,
    ) -> Result<Consumer<AggregateConsumerStore<Event>>>
    where
        Event: TimesourceEventPayload + 'static + Send + Sync + Debug,
    {
        let pool = self.pool().await?;
        let inner_builder = self.inner_builder();
        let inner = inner_builder.aggregate_build(aggregate_type_name).await?;

        let aggregate_type_id = self.aggregate_type_id(aggregate_type_name, &pool).await?;

        //create consumer if it doesn't exist
        let offset = sqlx::query_file_scalar!(
            "queries/consumer/checkpoint/offset.sql",
            consumer_name.as_ref(),
            aggregate_type_id,
            None::<Uuid>
        )
        .fetch_one(&pool)
        .await?;

        debug!(
            consumer_name = consumer_name.as_ref(),
            aggregate_type_id,
            ?offset,
            "Consumer state"
        );

        let committer = OffsetCommitter::new(consumer_name.into(), pool.clone());

        Ok(Consumer { committer, inner })
    }

    pub async fn aggregate_root_build<Event>(
        &self,
        consumer_name: Cow<'static, str>,
        aggregate_type_name: &'a str,
        root_id: Uuid,
    ) -> Result<Consumer<RootConsumerStore<Event>>>
    where
        Event: TimesourceEventPayload + 'static + Send + Sync + Debug,
    {
        let pool = self.pool().await?;
        let aggregate_type_id = self.aggregate_type_id(aggregate_type_name, &pool).await?;
        let inner_builder = self.inner_builder();
        let inner = inner_builder
            .aggregate_root_build(aggregate_type_name, root_id)
            .await?;

        //create consumer if it doesn't exist
        let offset = sqlx::query_file_scalar!(
            "queries/consumer/checkpoint/offset.sql",
            consumer_name.as_ref(),
            aggregate_type_id,
            root_id
        )
        .fetch_one(&pool)
        .await?;

        debug!(
            consumer_name = consumer_name.as_ref(),
            aggregate_type_id,
            ?offset,
            "Consumer state"
        );

        let committer = OffsetCommitter::new(consumer_name.into(), pool.clone());

        Ok(Consumer { committer, inner })
    }

    async fn pool(&self) -> Result<PgPool> {
        Ok(PgPoolOptions::new().connect(self.dsn).await?)
    }

    async fn aggregate_type_id(&self, aggregate_type_name: &str, pool: &PgPool) -> Result<i32> {
        Ok(
            sqlx::query_file_scalar!("queries/aggregate_type/id.sql", aggregate_type_name)
                .fetch_one(pool)
                .await?
                .ok_or_else(|| Error::InvalidData("Unable to get aggregate type id".into()))?,
        )
    }

    fn inner_builder(&self) -> TransientConsumerBuilder {
        TransientConsumerBuilder::new(self.dsn)
            .with_backup_polling_frequency(self.polling_freq)
            .with_event_buffer_capacity(self.event_buffer_capacity)
            .with_notification_buffer_capacity(self.notification_buffer_capacity)
    }
}

/// A consumer will start listening for events since last offset if known.
/// That is, when a brand new consumer is created, it will consume all events produced
/// for a given aggregate or aggregate root.
///
/// For each event, the subscriber should commit the event by using ack methods.
///
/// Then, when consumer is started again it will resume from last committed offset.
/// A consumer will start listening for events since last offset if known.
/// That is, when a brand new consumer is created, it will consume all events produced
/// for a given aggregate or aggregate root.
///
/// For each event, the subscriber should commit the event by using ack methods.
///
/// Then, when consumer is started again it will resume from last committed offset.
pub struct Consumer<Store>
where
    Store: ConsumerStore,
{
    // Persists acknowledged offsets (see `ack`/`try_ack`).
    committer: OffsetCommitter,
    // Underlying transient consumer that produces the event stream (see `resume`).
    inner: TransientConsumer<Store>,
}

impl<Store> Consumer<Store>
where
    Store: ConsumerStore,
{
    pub async fn resume(
        &self,
    ) -> Result<BoxStream<'_, Result<Persisted<<Store as ConsumerStore>::Event>>>> {
        self.inner.resume().await
    }

    pub async fn ack(&self, offset: u64) -> Result<()> {
        self.committer
            .save_offset(offset)
            .await
            .map_err(Error::from)
    }

    pub async fn try_ack(&self, offset: u64) -> Result<()> {
        self.committer
            .try_save_offset(offset)
            .await
            .map_err(Error::from)
    }
}