//! timesource 0.1.3
//!
//! Event sourcing with TimescaleDB.
//! Documentation: see the crate-level docs and README.
use std::borrow::Cow;
use std::fmt::Debug;
use std::marker::PhantomData;

use async_trait::async_trait;
use futures::stream::{self, BoxStream};
use futures::{StreamExt, TryStreamExt};
use sqlx::PgPool;
use timesource_core::event::Persisted;

use crate::error::Error;
use crate::event::EventRow;
use crate::TimesourceEventPayload;

use super::ConsumerStore;
use super::StoreData;

#[derive(Debug)]
pub struct AggregateConsumerStore<Event>
where
    Event: TimesourceEventPayload + Send + Sync + Debug,
{
    aggregate_type_id: i32,
    consumer_name: Option<Cow<'static, str>>,
    pool: PgPool,
    _event: PhantomData<Event>,
}

impl<Event> AggregateConsumerStore<Event>
where
    Event: TimesourceEventPayload + Send + Sync + Debug,
{
    /// Creates a store for `aggregate_type_id` acting as the named consumer.
    ///
    /// Accepts anything convertible into a `Cow<'static, str>` (`&'static str`,
    /// `String`, or a `Cow` itself), so call sites need no manual conversion.
    /// Existing callers passing a `Cow` are unaffected.
    pub fn new(
        aggregate_type_id: i32,
        consumer_name: impl Into<Cow<'static, str>>,
        pool: PgPool,
    ) -> Self {
        Self {
            aggregate_type_id,
            consumer_name: Some(consumer_name.into()),
            pool,
            _event: PhantomData,
        }
    }

    /// Creates a store with no consumer name (anonymous).
    ///
    /// An anonymous store has no recorded offset, so `events_after_offset`
    /// yields an empty stream; the explicit-offset methods still work.
    pub fn new_anonymous(aggregate_type_id: i32, pool: PgPool) -> Self {
        Self {
            aggregate_type_id,
            consumer_name: None,
            pool,
            _event: PhantomData,
        }
    }
}

impl<Event> Clone for AggregateConsumerStore<Event>
where
    Event: TimesourceEventPayload + Send + Sync + Debug,
{
    fn clone(&self) -> Self {
        Self {
            aggregate_type_id: self.aggregate_type_id,
            consumer_name: self.consumer_name.clone(),
            pool: self.pool.clone(),
            _event: PhantomData::<Event>,
        }
    }
}

/// `ConsumerStore` implementation scoped to a single aggregate type.
///
/// Every stream decodes raw `EventRow`s into `Persisted` events; both SQL
/// errors and row-decoding failures surface as `Error` items in the stream.
/// SQL lives in external files checked at compile time by `query_file_as!`.
#[async_trait]
impl<Event> ConsumerStore for AggregateConsumerStore<Event>
where
    Event: TimesourceEventPayload + Send + Sync + 'static + Debug,
{
    type Event = Event;

    /// Streams this aggregate type's events recorded after the offset stored
    /// for `consumer_name` (query:
    /// `queries/consumer/aggregate/events_after_offset.sql`).
    ///
    /// An anonymous store (`consumer_name == None`) has no stored offset to
    /// resume from, so it yields an empty stream.
    #[tracing::instrument]
    fn events_after_offset(&self) -> BoxStream<'_, StoreData<Event>> {
        match &self.consumer_name {
            // No consumer identity -> nothing to resume; deliberately empty.
            None => stream::iter(vec![]).boxed(),
            Some(name) => {
                // Borrow so the bound argument lives as long as `self`
                // (the returned stream borrows `self` anyway).
                let aggregate_type_id = &self.aggregate_type_id;

                sqlx::query_file_as!(
                    EventRow,
                    "queries/consumer/aggregate/events_after_offset.sql",
                    aggregate_type_id,
                    name.as_ref()
                )
                .fetch(&self.pool)
                .map_err(Error::from)
                // Decode each row into a `Persisted` event; decode errors
                // are folded into the same `Error` type as SQL errors.
                .map(move |x| x.and_then(|x| Persisted::try_from(x).map_err(Error::from)))
                .boxed()
            }
        }
    }

    /// Streams this aggregate type's events after the given explicit `offset`
    /// (query: `queries/consumer/aggregate/events_after.sql`).
    ///
    /// Does not consult `consumer_name`, so it also works on anonymous stores.
    #[tracing::instrument]
    fn events_after(&self, offset: u64) -> BoxStream<'_, StoreData<Event>> {
        let aggregate_type_id = &self.aggregate_type_id;

        sqlx::query_file_as!(
            EventRow,
            "queries/consumer/aggregate/events_after.sql",
            aggregate_type_id,
            // NOTE(review): `as i64` wraps for offsets > i64::MAX — presumably
            // unreachable for event offsets, but confirm.
            offset as i64,
        )
        .fetch(&self.pool)
        .map_err(Error::from)
        .map(move |x| x.and_then(|x| Persisted::try_from(x).map_err(Error::from)))
        .boxed()
    }

    /// Streams this aggregate type's events in the offset range
    /// `(later_than, until]` — exclusive lower bound, inclusive upper bound —
    /// (query: `queries/consumer/aggregate/events_range.sql`).
    #[tracing::instrument]
    fn events_range(
        &self,
        later_than: u64, // > (exclusive lower bound)
        until: u64,      // <= (inclusive upper bound)
    ) -> BoxStream<'_, StoreData<Event>> {
        let aggregate_type_id = &self.aggregate_type_id;

        sqlx::query_file_as!(
            EventRow,
            "queries/consumer/aggregate/events_range.sql",
            aggregate_type_id,
            // NOTE(review): same wrapping caveat as `events_after` for both casts.
            later_than as i64,
            until as i64,
        )
        .fetch(&self.pool)
        .map_err(Error::from)
        .map(move |x| x.and_then(|x| Persisted::try_from(x).map_err(Error::from)))
        .boxed()
    }
}