evento-store 0.10.2

A collection of libraries and tools that help you build DDD, CQRS, and event sourcing.
Documentation
use std::collections::HashSet;

use async_trait::async_trait;
use evento_query::{Cursor, CursorType, PgQuery, QueryResult};
use serde_json::Value;
use sqlx::{PgPool, Postgres, QueryBuilder};
use uuid::Uuid;

use crate::{
    engine::Engine,
    error::{Result, StoreError},
    store::{Event, Store, WriteEvent},
};

#[derive(Debug, Clone)]
pub struct PgStore {
    pool: PgPool,
    prefix: Option<String>,
}

impl PgStore {
    pub fn create(pool: &PgPool) -> Store {
        Store::new(Self {
            pool: pool.clone(),
            prefix: None,
        })
    }

    pub fn with_prefix(pool: &PgPool, prefix: impl Into<String>) -> Store {
        Store::new(Self {
            pool: pool.clone(),
            prefix: Some(prefix.into()),
        })
    }

    pub fn table(&self, name: impl Into<String>) -> String {
        format!(
            "{}_{}",
            self.prefix.as_ref().unwrap_or(&"ev".to_owned()),
            name.into()
        )
    }

    pub fn table_events(&self) -> String {
        self.table("event")
    }
}

#[async_trait]
impl Engine for PgStore {
    async fn write(
        &self,
        aggregate_id: &'_ str,
        write_events: Vec<WriteEvent>,
        original_version: u16,
    ) -> Result<Vec<Event>> {
        let table_events = self.table_events();
        let mut tx = self.pool.begin().await?;

        let mut version = original_version;
        let mut events = Vec::new();

        for write_events in write_events.chunks(100).collect::<Vec<&[WriteEvent]>>() {
            let mut query_builder: QueryBuilder<Postgres> = QueryBuilder::new(
                    format!("INSERT INTO {table_events} (id, name, aggregate_id, version, data, metadata, created_at) ")
                );

            query_builder.push_values(write_events, |mut b, event| {
                version += 1;

                let event = event.to_event(aggregate_id, version);

                b.push_bind(event.id.to_owned())
                    .push_bind(event.name.to_owned())
                    .push_bind(event.aggregate_id.to_owned())
                    .push_bind(event.version)
                    .push_bind(event.data.clone())
                    .push_bind(event.metadata.clone())
                    .push_bind(event.created_at);

                events.push(event);
            });

            query_builder
                .push("ON CONFLICT (aggregate_id, version) DO NOTHING")
                .build()
                .execute(&mut *tx)
                .await?;
        }

        let next_event_id = sqlx::query_as::<_, Event>(
            format!(
                r#"
                SELECT * FROM {table_events}
                WHERE aggregate_id = $1 AND version = $2
                ORDER BY created_at ASC
                LIMIT 1
                "#
            )
            .as_str(),
        )
        .bind(aggregate_id)
        .bind(i32::from(original_version + 1))
        .fetch_optional(&mut *tx)
        .await?;

        let wrong_version = match (next_event_id, events.first()) {
            (Some(next), Some(current)) => next.id != current.id,
            _ => false,
        };

        if wrong_version {
            tx.rollback().await?;

            return Err(StoreError::UnexpectedOriginalVersion);
        }

        tx.commit().await?;

        Ok(events)
    }

    async fn insert(&self, events: Vec<Event>) -> Result<()> {
        let table_events = self.table_events();

        for events in events.chunks(100).collect::<Vec<&[Event]>>() {
            let mut query_builder: QueryBuilder<Postgres> = QueryBuilder::new(
                    format!("INSERT INTO {table_events} (id, name, aggregate_id, version, data, metadata, created_at) ")
                );

            query_builder.push_values(events, |mut b, event| {
                b.push_bind(event.id.to_owned())
                    .push_bind(event.name.to_owned())
                    .push_bind(event.aggregate_id.to_owned())
                    .push_bind(event.version)
                    .push_bind(event.data.clone())
                    .push_bind(event.metadata.clone())
                    .push_bind(event.created_at);
            });

            query_builder.build().execute(&self.pool).await?;
        }

        Ok(())
    }

    async fn upsert(&self, event: Event) -> Result<()> {
        let table_events = self.table_events();

        sqlx::query_as::<_, (Uuid,)>(
            format!(
                r#"
            INSERT INTO {table_events} (id, name, aggregate_id, version, data, metadata, created_at)
            VALUES ($1, $2, $3, $4, $5, $6, $7)
            ON CONFLICT (aggregate_id, version)
            DO
                UPDATE SET data = $5, metadata = $6
            RETURNING id
            "#
            )
            .as_str(),
        )
        .bind(event.id)
        .bind(event.name)
        .bind(&event.aggregate_id)
        .bind(event.version)
        .bind(event.data)
        .bind(event.metadata)
        .bind(event.created_at)
        .fetch_one(&self.pool)
        .await?;

        Ok(())
    }

    async fn read(
        &self,
        first: u16,
        after: Option<CursorType>,
        filters: Option<Vec<Value>>,
        aggregate_id: Option<&'_ str>,
    ) -> Result<QueryResult<Event>> {
        let mut json_filters = HashSet::new();
        let table_events = self.table_events();

        if let Some(filters) = filters {
            for filter in filters {
                let json_filter = serde_json::to_string(&filter)?;
                json_filters.insert(format!("metadata @> '{json_filter}'::jsonb"));
            }
        }

        let filters = if json_filters.is_empty() {
            None
        } else {
            Some(
                json_filters
                    .into_iter()
                    .collect::<Vec<String>>()
                    .join(" OR "),
            )
        };

        let query = match (aggregate_id, filters) {
            (Some(aggregate_id), Some(filters)) => PgQuery::<Event>::new(format!(
                "SELECT * FROM {table_events} WHERE aggregate_id = $1 AND ({filters})"
            ))
            .bind(aggregate_id),
            (None, Some(filters)) => {
                PgQuery::<Event>::new(format!("SELECT * FROM {table_events} WHERE ({filters})"))
            }
            (Some(aggregate_id), None) => PgQuery::<Event>::new(format!(
                "SELECT * FROM {table_events} WHERE aggregate_id = $1"
            ))
            .bind(aggregate_id),
            (None, None) => PgQuery::<Event>::new(format!("SELECT * FROM {table_events}")),
        };

        let events = query.forward(first, after).fetch_all(&self.pool).await?;

        Ok(events)
    }

    async fn last(&self) -> Result<Option<Event>> {
        let table_events = self.table_events();
        let event = sqlx::query_as::<_, Event>(
            format!(
                r#"
                SELECT * from {table_events}
                ORDER BY created_at DESC
                LIMIT 1
            "#
            )
            .as_str(),
        )
        .fetch_optional(&self.pool)
        .await?;
        Ok(event)
    }
}

impl Cursor for Event {
    fn keys() -> Vec<&'static str> {
        vec!["created_at", "version", "id"]
    }

    fn bind<'q, O>(
        self,
        query: sqlx::query::QueryAs<Postgres, O, sqlx::postgres::PgArguments>,
    ) -> sqlx::query::QueryAs<Postgres, O, sqlx::postgres::PgArguments>
    where
        O: for<'r> sqlx::FromRow<'r, <sqlx::Postgres as sqlx::Database>::Row>,
        O: 'q + std::marker::Send,
        O: 'q + Unpin,
        O: 'q + Cursor,
    {
        query.bind(self.created_at).bind(self.version).bind(self.id)
    }

    fn serialize(&self) -> Vec<String> {
        vec![
            Self::serialize_utc(self.created_at),
            self.version.to_string(),
            self.id.to_string(),
        ]
    }

    fn deserialize(values: Vec<&str>) -> std::result::Result<Self, evento_query::QueryError> {
        let mut values = values.iter();
        let created_at = Self::deserialize_as_utc("created_at", values.next())?;
        let version = Self::deserialize_as("version", values.next())?;
        let id = Self::deserialize_as("id", values.next())?;

        Ok(Event {
            id,
            version,
            created_at,
            ..Default::default()
        })
    }
}