eventastic_postgres 0.5.0

A PostgreSQL event store for eventastic
Documentation
use crate::{
    EventSourcingDbError, PostgresTransaction, SideEffectStorage, encryption::EncryptionProvider,
    pickle::Pickle, reader_impl, table_config::TableConfig,
};
use async_trait::async_trait;
use eventastic::{
    aggregate::{Aggregate, Context, SideEffect},
    event::{DomainEvent, EventStoreEvent},
    repository::{Repository, RepositoryError, RepositoryReader, Snapshot},
};
use sqlx::{
    Pool, Postgres,
    postgres::{PgConnectOptions, PgPoolOptions},
    types::Uuid,
};
use std::marker::PhantomData;

/// PostgreSQL-based repository implementation for event sourcing.
///
/// This repository provides persistent storage for aggregates, events, and snapshots
/// using PostgreSQL as the backing store. It integrates with a configurable side effect
/// storage mechanism for handling the outbox pattern.
#[derive(Clone)]
pub struct PostgresRepository<T, O, E> {
    inner: Pool<Postgres>,
    outbox: O,
    table_config: TableConfig,
    encryption_provider: E,
    phantom_aggregate: std::marker::PhantomData<T>,
}

impl<T, O, E> PostgresRepository<T, O, E> {
    /// Creates a new PostgreSQL repository with the specified connection and pool options.
    ///
    /// # Parameters
    ///
    /// - `connect_options` - PostgreSQL connection configuration
    /// - `pool_options` - Connection pool configuration  
    /// - `outbox` - Side effect storage implementation for the outbox pattern
    pub async fn new(
        connect_options: PgConnectOptions,
        pool_options: PgPoolOptions,
        table_config: TableConfig,
        outbox: O,
        encryption_provider: E,
    ) -> Result<Self, sqlx::Error> {
        let pool = pool_options.connect_with(connect_options).await?;

        Ok(Self {
            inner: pool,
            outbox,
            table_config,
            encryption_provider,
            phantom_aggregate: PhantomData,
        })
    }

    /// Start a new database transaction using the default isolation level.
    ///
    /// The returned transaction can be used to perform multiple operations
    /// atomically and provides access to the repository methods.
    pub async fn begin_transaction(&self) -> Result<PostgresTransaction<'_, T, O, E>, sqlx::Error> {
        Ok(PostgresTransaction {
            inner: self.inner.begin().await?,
            outbox: &self.outbox,
            table_config: &self.table_config,
            encryption_provider: &self.encryption_provider,
            phantom_aggregate: PhantomData,
        })
    }

    /// Create a transaction from an existing raw sqlx transaction.
    ///
    /// This is useful for multi-aggregate scenarios where you want to use
    /// the same database transaction across multiple repository types.
    pub fn transaction_from<'a>(
        &'a self,
        transaction: sqlx::Transaction<'a, Postgres>,
    ) -> PostgresTransaction<'a, T, O, E> {
        PostgresTransaction {
            inner: transaction,
            outbox: &self.outbox,
            table_config: &self.table_config,
            encryption_provider: &self.encryption_provider,
            phantom_aggregate: PhantomData,
        }
    }

    /// Run database migrations to set up the required tables and schema.
    ///
    /// This method should be called once during application startup to ensure
    /// the database schema is up to date with the required tables for events,
    /// snapshots, and outbox storage.
    pub async fn run_migrations(&self) -> Result<(), sqlx::Error> {
        sqlx::migrate!("./migrations").run(&self.inner).await?;

        Ok(())
    }
}

#[async_trait]
impl<T, O, E> RepositoryReader<T> for PostgresRepository<T, O, E>
where
    T: Aggregate<AggregateId = Uuid> + Pickle + Send + Sync + 'static,
    T::DomainEvent: DomainEvent<EventId = Uuid> + Pickle + Send + Sync,
    T::SideEffect: SideEffect<SideEffectId = Uuid> + Pickle + Send + Sync,
    T::ApplyError: Send + Sync,
    O: SideEffectStorage<E::Error, T::SideEffect> + Clone + Send + Sync,
    E: EncryptionProvider + Clone + Send + Sync,
{
    type DbError = EventSourcingDbError<E, T>;

    /// Returns a stream of the domain events stored for aggregate `id`.
    ///
    /// `version` is forwarded unchanged to the stream helper together with the
    /// configured stream query; the events are read directly from the pool.
    fn stream_from(
        &mut self,
        id: &T::AggregateId,
        version: u64,
    ) -> impl futures::Stream<Item = Result<EventStoreEvent<T::DomainEvent>, Self::DbError>> {
        // The helper receives the query by value, so it gets its own copy.
        let sql = &self.table_config.stream_events_query;
        let events = reader_impl::stream_from::<_, T, E>(
            &self.inner,
            id,
            version,
            sql.clone(),
            &self.encryption_provider,
        );
        Box::pin(events)
    }

    /// Looks up a single domain event of an aggregate by its event id.
    ///
    /// Yields `Ok(None)` when the helper finds no matching event.
    async fn get_event(
        &mut self,
        aggregate_id: &T::AggregateId,
        event_id: &<T::DomainEvent as DomainEvent>::EventId,
    ) -> Result<Option<EventStoreEvent<T::DomainEvent>>, Self::DbError> {
        let sql = &self.table_config.get_event_query;
        let lookup = reader_impl::get_event::<_, T, E>(
            &self.inner,
            aggregate_id,
            event_id,
            sql,
            &self.encryption_provider,
        );
        lookup.await
    }

    /// Fetches the stored snapshot of aggregate `id`, if there is one.
    async fn get_snapshot(
        &mut self,
        id: &T::AggregateId,
    ) -> Result<Option<Snapshot<T>>, Self::DbError> {
        let sql = &self.table_config.get_snapshot_query;
        let lookup =
            reader_impl::get_snapshot::<_, T, E>(&self.inner, id, sql, &self.encryption_provider);
        lookup.await
    }
}

#[async_trait]
impl<T, O, E> Repository<T> for PostgresRepository<T, O, E>
where
    T: Aggregate<AggregateId = Uuid> + Pickle + Send + Sync + 'static,
    T::DomainEvent: DomainEvent<EventId = Uuid> + Pickle + Send + Sync,
    T::SideEffect: eventastic::aggregate::SideEffect<SideEffectId = Uuid> + Pickle + Send + Sync,
    T::ApplyError: Send + Sync,
    O: SideEffectStorage<E::Error, T::SideEffect> + Clone + Send + Sync,
    E: EncryptionProvider + Clone + Send + Sync,
{
    type Error = RepositoryError<
        T::ApplyError,
        <<T as Aggregate>::DomainEvent as DomainEvent>::EventId,
        EventSourcingDbError<E, T>,
    >;

    /// Loads an aggregate from the repository by its ID.
    ///
    /// This method performs a non-transactional read directly from the pool,
    /// avoiding the overhead of starting a transaction. It will load the
    /// latest state of the aggregate by replaying its event stream.
    /// If a snapshot is available, it will be used to optimize the loading process.
    async fn load(&self, aggregate_id: &T::AggregateId) -> Result<Context<T>, Self::Error> {
        // Create a mutable reference to self to satisfy the RepositoryReader trait
        let mut repo_ref = self.clone();
        Context::load(&mut repo_ref, aggregate_id).await
    }
}