es-entity 0.10.19

Event Sourcing Entity Framework
Documentation
//! Handle execution of database operations and transactions.

pub mod hooks;
mod with_time;

use sqlx::{Acquire, PgPool, Postgres, Transaction};

use crate::clock::ClockHandle;

pub use with_time::*;

/// Default return type of the derived EsRepo::begin_op().
///
/// Used as a wrapper of a [`sqlx::Transaction`] but can also cache the time at which the
/// transaction is taking place.
///
/// When an artificial clock is provided, the transaction will automatically cache that
/// clock's time, enabling deterministic testing. This cached time will be used in all
/// time-dependent operations.
pub struct DbOp<'c> {
    tx: Transaction<'c, Postgres>,
    clock: ClockHandle,
    now: Option<chrono::DateTime<chrono::Utc>>,
    commit_hooks: Option<hooks::CommitHooks>,
}

impl<'c> DbOp<'c> {
    fn new(
        tx: Transaction<'c, Postgres>,
        clock: ClockHandle,
        time: Option<chrono::DateTime<chrono::Utc>>,
    ) -> Self {
        Self {
            tx,
            clock,
            now: time,
            commit_hooks: Some(hooks::CommitHooks::new()),
        }
    }

    /// Initializes a transaction using the global clock.
    ///
    /// Delegates to [`init_with_clock`](Self::init_with_clock) using the global clock handle.
    pub async fn init(pool: &PgPool) -> Result<DbOp<'static>, sqlx::Error> {
        Self::init_with_clock(pool, crate::clock::Clock::handle()).await
    }

    /// Initializes a transaction with the specified clock.
    ///
    /// If the clock is artificial, its current time will be cached in the transaction.
    pub async fn init_with_clock(
        pool: &PgPool,
        clock: &ClockHandle,
    ) -> Result<DbOp<'static>, sqlx::Error> {
        let tx = pool.begin().await?;

        // If an artificial clock is provided (and hasn't transitioned to realtime),
        // cache its time for consistent timestamps within the transaction.
        let time = clock.artificial_now();

        Ok(DbOp::new(tx, clock.clone(), time))
    }

    /// Transitions to a [`DbOpWithTime`] with the given time cached.
    pub fn with_time(self, time: chrono::DateTime<chrono::Utc>) -> DbOpWithTime<'c> {
        DbOpWithTime::new(self, time)
    }

    /// Transitions to a [`DbOpWithTime`] using the clock.
    ///
    /// Uses cached time if present, otherwise uses the clock's current time.
    pub fn with_clock_time(self) -> DbOpWithTime<'c> {
        let time = self.now.unwrap_or_else(|| self.clock.now());
        DbOpWithTime::new(self, time)
    }

    /// Transitions to a [`DbOpWithTime`] using the database time.
    ///
    /// Priority order:
    /// 1. Cached time if present
    /// 2. Artificial clock time if the clock is artificial (and hasn't transitioned)
    /// 3. Database time via `SELECT NOW()`
    pub async fn with_db_time(mut self) -> Result<DbOpWithTime<'c>, sqlx::Error> {
        let time = if let Some(time) = self.now {
            time
        } else if let Some(artificial_time) = self.clock.artificial_now() {
            artificial_time
        } else {
            sqlx::query_scalar::<_, chrono::DateTime<chrono::Utc>>("SELECT NOW()")
                .fetch_one(&mut *self.tx)
                .await?
        };

        Ok(DbOpWithTime::new(self, time))
    }

    /// Returns the optionally cached [`chrono::DateTime`]
    pub fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
        self.now
    }

    /// Begins a nested transaction.
    pub async fn begin(&mut self) -> Result<DbOp<'_>, sqlx::Error> {
        Ok(DbOp::new(
            self.tx.begin().await?,
            self.clock.clone(),
            self.now,
        ))
    }

    /// Commits the inner transaction.
    pub async fn commit(mut self) -> Result<(), sqlx::Error> {
        let commit_hooks = self.commit_hooks.take().expect("no hooks");
        let post_hooks = commit_hooks.execute_pre(&mut self).await?;
        self.tx.commit().await?;
        post_hooks.execute();
        Ok(())
    }

    /// Gets a mutable handle to the inner transaction
    pub fn tx_mut(&mut self) -> &mut Transaction<'c, Postgres> {
        &mut self.tx
    }
}

impl<'o> AtomicOperation for DbOp<'o> {
    fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
        self.maybe_now()
    }

    fn clock(&self) -> &ClockHandle {
        &self.clock
    }

    fn as_executor(&mut self) -> &mut sqlx::PgConnection {
        self.tx.as_executor()
    }

    fn add_commit_hook<H: hooks::CommitHook>(&mut self, hook: H) -> Result<(), H> {
        self.commit_hooks.as_mut().expect("no hooks").add(hook);
        Ok(())
    }
}

/// Equivileant of [`DbOp`] just that the time is guaranteed to be cached.
///
/// Used as a wrapper of a [`sqlx::Transaction`] with cached time of the transaction.
pub struct DbOpWithTime<'c> {
    inner: DbOp<'c>,
    now: chrono::DateTime<chrono::Utc>,
}

impl<'c> DbOpWithTime<'c> {
    fn new(mut inner: DbOp<'c>, time: chrono::DateTime<chrono::Utc>) -> Self {
        inner.now = Some(time);
        Self { inner, now: time }
    }

    /// The cached [`chrono::DateTime`]
    pub fn now(&self) -> chrono::DateTime<chrono::Utc> {
        self.now
    }

    /// Begins a nested transaction.
    pub async fn begin(&mut self) -> Result<DbOpWithTime<'_>, sqlx::Error> {
        Ok(DbOpWithTime::new(self.inner.begin().await?, self.now))
    }

    /// Commits the inner transaction.
    pub async fn commit(self) -> Result<(), sqlx::Error> {
        self.inner.commit().await
    }

    /// Gets a mutable handle to the inner transaction
    pub fn tx_mut(&mut self) -> &mut Transaction<'c, Postgres> {
        self.inner.tx_mut()
    }
}

impl<'o> AtomicOperation for DbOpWithTime<'o> {
    fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
        Some(self.now())
    }

    fn clock(&self) -> &ClockHandle {
        self.inner.clock()
    }

    fn as_executor(&mut self) -> &mut sqlx::PgConnection {
        self.inner.as_executor()
    }

    fn add_commit_hook<H: hooks::CommitHook>(&mut self, hook: H) -> Result<(), H> {
        self.inner.add_commit_hook(hook)
    }
}

impl<'o> AtomicOperationWithTime for DbOpWithTime<'o> {
    fn now(&self) -> chrono::DateTime<chrono::Utc> {
        self.now
    }
}

/// Trait to signify we can make multiple consistent database roundtrips.
///
/// Its a stand in for [`&mut sqlx::Transaction<'_, DB>`](`sqlx::Transaction`).
/// The reason for having a trait is to support custom types that wrap the inner
/// transaction while providing additional functionality.
///
/// See [`DbOp`] or [`DbOpWithTime`].
pub trait AtomicOperation: Send {
    /// Function for querying when the operation is taking place - if it is cached.
    fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
        None
    }

    /// Returns the clock handle for time operations.
    ///
    /// Default implementation returns the global clock handle.
    fn clock(&self) -> &ClockHandle {
        crate::clock::Clock::handle()
    }

    /// Returns the [`sqlx::Executor`] implementation.
    /// The desired way to represent this would actually be as a GAT:
    /// ```rust
    /// trait AtomicOperation {
    ///     type Executor<'c>: sqlx::PgExecutor<'c>
    ///         where Self: 'c;
    ///
    ///     fn as_executor<'c>(&'c mut self) -> Self::Executor<'c>;
    /// }
    /// ```
    ///
    /// But GATs don't play well with `async_trait::async_trait` due to lifetime constraints
    /// so we return the concrete [`&mut sqlx::PgConnection`](`sqlx::PgConnection`) instead as a work around.
    ///
    /// Since this trait is generally applied to types that wrap a [`sqlx::Transaction`]
    /// there is no variance in the return type - so its fine.
    fn as_executor(&mut self) -> &mut sqlx::PgConnection;

    /// Registers a commit hook that will run pre_commit before and post_commit after the transaction commits.
    /// Returns Ok(()) if the hook was registered, Err(hook) if hooks are not supported.
    fn add_commit_hook<H: hooks::CommitHook>(&mut self, hook: H) -> Result<(), H> {
        Err(hook)
    }
}

impl<'c> AtomicOperation for sqlx::Transaction<'c, Postgres> {
    fn as_executor(&mut self) -> &mut sqlx::PgConnection {
        &mut *self
    }
}