es_entity/operation/
mod.rs

1//! Handle execution of database operations and transactions.
2
3pub mod hooks;
4mod with_time;
5
6use sqlx::{Acquire, PgPool, Postgres, Transaction};
7
8use crate::clock::ClockHandle;
9
10pub use with_time::*;
11
12/// Default return type of the derived EsRepo::begin_op().
13///
14/// Used as a wrapper of a [`sqlx::Transaction`] but can also cache the time at which the
15/// transaction is taking place.
16///
17/// When an artificial clock is provided, the transaction will automatically cache that
18/// clock's time, enabling deterministic testing. This cached time will be used in all
19/// time-dependent operations.
20pub struct DbOp<'c> {
21    tx: Transaction<'c, Postgres>,
22    clock: ClockHandle,
23    now: Option<chrono::DateTime<chrono::Utc>>,
24    commit_hooks: Option<hooks::CommitHooks>,
25}
26
27impl<'c> DbOp<'c> {
28    fn new(
29        tx: Transaction<'c, Postgres>,
30        clock: ClockHandle,
31        time: Option<chrono::DateTime<chrono::Utc>>,
32    ) -> Self {
33        Self {
34            tx,
35            clock,
36            now: time,
37            commit_hooks: Some(hooks::CommitHooks::new()),
38        }
39    }
40
41    /// Initializes a transaction using the global clock.
42    ///
43    /// Delegates to [`init_with_clock`](Self::init_with_clock) using the global clock handle.
44    pub async fn init(pool: &PgPool) -> Result<DbOp<'static>, sqlx::Error> {
45        Self::init_with_clock(pool, crate::clock::Clock::handle()).await
46    }
47
48    /// Initializes a transaction with the specified clock.
49    ///
50    /// If the clock is artificial, its current time will be cached in the transaction.
51    pub async fn init_with_clock(
52        pool: &PgPool,
53        clock: &ClockHandle,
54    ) -> Result<DbOp<'static>, sqlx::Error> {
55        let tx = pool.begin().await?;
56
57        // If an artificial clock is provided, cache its time
58        let time = clock.is_artificial().then(|| clock.now());
59
60        Ok(DbOp::new(tx, clock.clone(), time))
61    }
62
63    /// Transitions to a [`DbOpWithTime`] with the given time cached.
64    pub fn with_time(self, time: chrono::DateTime<chrono::Utc>) -> DbOpWithTime<'c> {
65        DbOpWithTime::new(self, time)
66    }
67
68    /// Transitions to a [`DbOpWithTime`] using the clock.
69    ///
70    /// Uses cached time if present, otherwise uses the clock's current time.
71    pub fn with_clock_time(self) -> DbOpWithTime<'c> {
72        let time = self.now.unwrap_or_else(|| self.clock.now());
73        DbOpWithTime::new(self, time)
74    }
75
76    /// Transitions to a [`DbOpWithTime`] using the database time.
77    ///
78    /// Priority order:
79    /// 1. Cached time if present
80    /// 2. Artificial clock time if the clock is artificial
81    /// 3. Database time via `SELECT NOW()`
82    pub async fn with_db_time(mut self) -> Result<DbOpWithTime<'c>, sqlx::Error> {
83        let time = if let Some(time) = self.now {
84            time
85        } else if self.clock.is_artificial() {
86            self.clock.now()
87        } else {
88            let res = sqlx::query!("SELECT NOW()")
89                .fetch_one(&mut *self.tx)
90                .await?;
91            res.now.expect("could not fetch now")
92        };
93
94        Ok(DbOpWithTime::new(self, time))
95    }
96
97    /// Returns the optionally cached [`chrono::DateTime`]
98    pub fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
99        self.now
100    }
101
102    /// Begins a nested transaction.
103    pub async fn begin(&mut self) -> Result<DbOp<'_>, sqlx::Error> {
104        Ok(DbOp::new(
105            self.tx.begin().await?,
106            self.clock.clone(),
107            self.now,
108        ))
109    }
110
111    /// Commits the inner transaction.
112    pub async fn commit(mut self) -> Result<(), sqlx::Error> {
113        let commit_hooks = self.commit_hooks.take().expect("no hooks");
114        let post_hooks = commit_hooks.execute_pre(&mut self).await?;
115        self.tx.commit().await?;
116        post_hooks.execute();
117        Ok(())
118    }
119
120    /// Gets a mutable handle to the inner transaction
121    pub fn tx_mut(&mut self) -> &mut Transaction<'c, Postgres> {
122        &mut self.tx
123    }
124}
125
126impl<'o> AtomicOperation for DbOp<'o> {
127    fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
128        self.maybe_now()
129    }
130
131    fn clock(&self) -> &ClockHandle {
132        &self.clock
133    }
134
135    fn as_executor(&mut self) -> &mut sqlx::PgConnection {
136        self.tx.as_executor()
137    }
138
139    fn add_commit_hook<H: hooks::CommitHook>(&mut self, hook: H) -> Result<(), H> {
140        self.commit_hooks.as_mut().expect("no hooks").add(hook);
141        Ok(())
142    }
143}
144
145/// Equivileant of [`DbOp`] just that the time is guaranteed to be cached.
146///
147/// Used as a wrapper of a [`sqlx::Transaction`] with cached time of the transaction.
148pub struct DbOpWithTime<'c> {
149    inner: DbOp<'c>,
150    now: chrono::DateTime<chrono::Utc>,
151}
152
153impl<'c> DbOpWithTime<'c> {
154    fn new(mut inner: DbOp<'c>, time: chrono::DateTime<chrono::Utc>) -> Self {
155        inner.now = Some(time);
156        Self { inner, now: time }
157    }
158
159    /// The cached [`chrono::DateTime`]
160    pub fn now(&self) -> chrono::DateTime<chrono::Utc> {
161        self.now
162    }
163
164    /// Begins a nested transaction.
165    pub async fn begin(&mut self) -> Result<DbOpWithTime<'_>, sqlx::Error> {
166        Ok(DbOpWithTime::new(self.inner.begin().await?, self.now))
167    }
168
169    /// Commits the inner transaction.
170    pub async fn commit(self) -> Result<(), sqlx::Error> {
171        self.inner.commit().await
172    }
173
174    /// Gets a mutable handle to the inner transaction
175    pub fn tx_mut(&mut self) -> &mut Transaction<'c, Postgres> {
176        self.inner.tx_mut()
177    }
178}
179
180impl<'o> AtomicOperation for DbOpWithTime<'o> {
181    fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
182        Some(self.now())
183    }
184
185    fn clock(&self) -> &ClockHandle {
186        self.inner.clock()
187    }
188
189    fn as_executor(&mut self) -> &mut sqlx::PgConnection {
190        self.inner.as_executor()
191    }
192
193    fn add_commit_hook<H: hooks::CommitHook>(&mut self, hook: H) -> Result<(), H> {
194        self.inner.add_commit_hook(hook)
195    }
196}
197
198impl<'o> AtomicOperationWithTime for DbOpWithTime<'o> {
199    fn now(&self) -> chrono::DateTime<chrono::Utc> {
200        self.now
201    }
202}
203
204/// Trait to signify we can make multiple consistent database roundtrips.
205///
206/// Its a stand in for [`&mut sqlx::Transaction<'_, DB>`](`sqlx::Transaction`).
207/// The reason for having a trait is to support custom types that wrap the inner
208/// transaction while providing additional functionality.
209///
210/// See [`DbOp`] or [`DbOpWithTime`].
211pub trait AtomicOperation: Send {
212    /// Function for querying when the operation is taking place - if it is cached.
213    fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
214        None
215    }
216
217    /// Returns the clock handle for time operations.
218    ///
219    /// Default implementation returns the global clock handle.
220    fn clock(&self) -> &ClockHandle {
221        crate::clock::Clock::handle()
222    }
223
224    /// Returns the [`sqlx::Executor`] implementation.
225    /// The desired way to represent this would actually be as a GAT:
226    /// ```rust
227    /// trait AtomicOperation {
228    ///     type Executor<'c>: sqlx::PgExecutor<'c>
229    ///         where Self: 'c;
230    ///
231    ///     fn as_executor<'c>(&'c mut self) -> Self::Executor<'c>;
232    /// }
233    /// ```
234    ///
235    /// But GATs don't play well with `async_trait::async_trait` due to lifetime constraints
236    /// so we return the concrete [`&mut sqlx::PgConnection`](`sqlx::PgConnection`) instead as a work around.
237    ///
238    /// Since this trait is generally applied to types that wrap a [`sqlx::Transaction`]
239    /// there is no variance in the return type - so its fine.
240    fn as_executor(&mut self) -> &mut sqlx::PgConnection;
241
242    /// Registers a commit hook that will run pre_commit before and post_commit after the transaction commits.
243    /// Returns Ok(()) if the hook was registered, Err(hook) if hooks are not supported.
244    fn add_commit_hook<H: hooks::CommitHook>(&mut self, hook: H) -> Result<(), H> {
245        Err(hook)
246    }
247}
248
249impl<'c> AtomicOperation for sqlx::Transaction<'c, Postgres> {
250    fn as_executor(&mut self) -> &mut sqlx::PgConnection {
251        &mut *self
252    }
253}