timesource 0.1.3

Event sourcing with TimescaleDb
Documentation
use super::event::*;
use crate::aggregate::UncommittedEvent;
use crate::error::{Error, Result as TsResult};
use chrono::{DateTime, Utc};
use futures::stream::{BoxStream, StreamExt, TryStreamExt};
use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;
use std::fmt::Debug;
use uuid::Uuid;

/// Ordering requirement a caller passes to [`EventStore::commit`].
///
/// `TimescaleStore::commit` in this file turns each variant into the
/// `(orderly, last_known_time)` pair of parameters bound to
/// `queries/event/append.sql`. How the query enforces them is defined in that
/// SQL file, which is not visible here.
#[derive(Debug)]
pub enum CommitOrder {
    /// Bound as `orderly = false` with no last-known time.
    None,
    /// Bound as `orderly = true` with no last-known time.
    // NOTE(review): presumably "these are the first events of the aggregate" — confirm against append.sql.
    First,
    /// Bound as `orderly = true`, with the wrapped timestamp converted to
    /// nanoseconds as the last-known time.
    // NOTE(review): presumably the time of the last event the caller has seen — confirm against append.sql.
    Following(DateTime<Utc>),
}

/// An Event Store is an append-only, ordered list of
/// [`Event`](super::aggregate::Aggregate::Event)s for a certain "source" --
/// e.g. an [`Aggregate`](super::aggregate::Aggregate).
#[async_trait]
pub trait EventStore {
    /// Payload type of the events kept in this store.
    type Event: Send + Sync + TimesourceEventPayload + std::fmt::Debug;

    /// Appends `events` to the aggregate identified by `aggregate_id`,
    /// subject to the ordering requirement in `order`.
    ///
    /// Returns one `u64` per stored event on success.
    // NOTE(review): in `TimescaleStore` these are the ids returned by the
    // append query; other implementations should confirm what they return.
    async fn commit(
        &self,
        aggregate_id: Uuid,
        order: CommitOrder,
        events: &[UncommittedEvent<Self::Event>],
    ) -> TsResult<Vec<u64>>;

    /// Streams the persisted events of the aggregate identified by
    /// `aggregate_id`. Each item is either a decoded [`Persisted`] event or
    /// the error that occurred while fetching or decoding it.
    async fn aggregate_stream(
        &self,
        aggregate_id: Uuid,
    ) -> BoxStream<'_, TsResult<Persisted<Self::Event>>>;

    /// Removes the aggregate identified by `aggregate_id` from the store.
    async fn remove(&mut self, aggregate_id: Uuid) -> TsResult<()>;
}

/// Builder that connects to Postgres and produces a [`TimescaleStore`].
#[derive(Debug)]
pub struct EventStoreBuilder<'a> {
    /// When `true`, `build` runs the embedded sqlx migrations before use.
    with_migrations: bool,
    /// Connection string handed to `PgPoolOptions::connect`.
    dsn: &'a str,
}

impl<'a> EventStoreBuilder<'a> {
    pub fn new(dsn: &'a str) -> Self {
        Self {
            with_migrations: true,
            dsn,
        }
    }

    pub fn with_migrations(mut self, value: bool) -> Self {
        self.with_migrations = value;
        self
    }

    pub async fn build<Event>(&self, aggregate_type_name: &str) -> TsResult<TimescaleStore<Event>>
    where
        Event: Send + Sync + Debug + TimesourceEventPayload,
    {
        let pool = PgPoolOptions::new()
            .connect(self.dsn)
            .await
            .expect("being able to create Pg pool");

        if self.with_migrations {
            sqlx::migrate!().run(&pool).await?;
        }

        let aggregate_type_id =
            sqlx::query_file_scalar!("queries/aggregate_type/id.sql", aggregate_type_name)
                .fetch_one(&pool)
                .await?
                .ok_or_else(|| Error::InvalidData("Unable to get aggregate type id".into()))?;

        sqlx::query_file!(
            "queries/event/get_all_name_events.sql",
            aggregate_type_id,
            100
        )
        .fetch_all(&pool)
        .await?;

        trace!(
            aggregate_type_name,
            aggregate_type_id,
            "Aggregate root data received"
        );

        let event_meta_id = sqlx::query_file_scalar!(
            "queries/event/upsert_event_meta.sql",
            aggregate_type_id,
            Event::name(),
            Event::version(),
            &Event::encoding() as _
        )
        .fetch_one(&pool)
        .await?;

        Ok(TimescaleStore {
            payload: std::marker::PhantomData,
            pool,
            aggregate_type_id,
            event_meta_id: event_meta_id.unwrap(),
        })
    }
}

/// One event as it is sent to `queries/event/append.sql`, mapped to the
/// Postgres type named `append_event_data`.
// NOTE(review): the field order presumably has to match the attribute order of
// the Postgres composite type — verify against the migration before reordering.
#[derive(sqlx::Type)]
#[sqlx(type_name = "append_event_data")]
struct AppendEventInput {
    /// Payload when the event encodes to `EventPayloadEncoding::Bytes`, otherwise `None`.
    bytes: Option<Vec<u8>>,
    /// Payload when the event encodes to `EventPayloadEncoding::Json`, otherwise `None`.
    json: Option<String>,
    /// The store's `event_meta_id`.
    meta_id: i32,
    /// Event time as produced by `timestamp_nanos()` on the event's UTC timestamp.
    time: i64,
}

/// Newtype around the batch of events bound as a single parameter of
/// `queries/event/append.sql`, mapped to the Postgres type named
/// `_append_event_data`.
// NOTE(review): presumably this is the array type of `append_event_data` — confirm against the schema.
#[derive(sqlx::Type)]
#[sqlx(type_name = "_append_event_data")]
struct VecAppendEventInput(Vec<AppendEventInput>);

/// [`EventStore`] implementation backed by a Postgres connection pool.
///
/// Instances are created through `EventStoreBuilder::build`.
#[derive(Clone, Debug)]
pub struct TimescaleStore<Event> {
    /// Ties the store to its event payload type without storing a value of it.
    payload: std::marker::PhantomData<Event>,
    /// Connection pool used by every query of the store.
    pool: PgPool,
    /// Id returned by `queries/event/upsert_event_meta.sql` for the name,
    /// version and encoding of `Event`.
    pub event_meta_id: i32,
    /// Id returned by `queries/aggregate_type/id.sql` for the aggregate type
    /// name the store was built with.
    pub aggregate_type_id: i32,
}

#[async_trait]
impl<Event> EventStore for TimescaleStore<Event>
where
    Event: Send + Sync + TimesourceEventPayload + Debug,
{
    type Event = Event;

    #[tracing::instrument]
    async fn commit(
        &self,
        aggregate_id: Uuid,
        order: CommitOrder,
        events: &[UncommittedEvent<Self::Event>],
    ) -> TsResult<Vec<u64>> {
        if events.is_empty() {
            return Err(Error::InvalidData(
                "list of events can't be empty".to_string(),
            ));
        }

        let mut event_data: Vec<AppendEventInput> = Vec::with_capacity(events.len());

        for event in events {
            let time = event.utc.timestamp_nanos();
            let payload = &event.data.encode_to_payload()?;

            event_data.push(AppendEventInput {
                bytes: if let EventPayloadEncoding::Bytes(bytes) = payload {
                    Some(bytes.to_owned())
                } else {
                    None
                },
                json: if let EventPayloadEncoding::Json(str) = payload {
                    Some(str.to_owned())
                } else {
                    None
                },
                meta_id: self.event_meta_id,
                time,
            });
        }

        let (orderly, last_known_time) = match order {
            CommitOrder::None => (false, None),
            CommitOrder::First => (true, None),
            CommitOrder::Following(utc) => (true, Some(utc.timestamp_nanos())),
        };

        let inserted_ids = sqlx::query_file_scalar!(
            "queries/event/append.sql",
            self.aggregate_type_id,
            &aggregate_id,
            &VecAppendEventInput(event_data) as _,
            orderly,
            last_known_time
        )
        .fetch_one(&self.pool)
        .await?;

        debug!(?inserted_ids, "Committed events");

        Ok(inserted_ids
            .unwrap_or_default()
            .into_iter()
            .map(|x| x as _)
            .collect::<Vec<_>>())
    }

    #[tracing::instrument]
    async fn aggregate_stream(
        &self,
        aggregate_id: Uuid,
    ) -> BoxStream<'_, TsResult<Persisted<Self::Event>>> {
        sqlx::query_file_as!(
            EventRow,
            "queries/event/get_aggregate_all.sql",
            self.aggregate_type_id,
            aggregate_id,
        )
        .fetch(&self.pool)
        .map_err(Error::from)
        .map(move |x| x.and_then(|x| Persisted::try_from(x).map_err(Error::from)))
        .boxed()
    }

    #[tracing::instrument]
    async fn remove(&mut self, aggregate_id: Uuid) -> TsResult<()> {
        sqlx::query_file!(
            "queries/event/delete_aggregate.sql",
            self.aggregate_type_id,
            &aggregate_id
        )
        .execute(&self.pool)
        .await
        .map_err(Error::from)
        .map(|_| ())
    }
}