Skip to main content

es_entity/operation/
mod.rs

1//! Handle execution of database operations and transactions.
2
3pub mod hooks;
4mod with_time;
5
6use sqlx::{Acquire, Transaction};
7
8use crate::{clock::ClockHandle, db};
9
10pub use with_time::*;
11
12/// Default return type of the derived EsRepo::begin_op().
13///
14/// Used as a wrapper of a [`sqlx::Transaction`] but can also cache the time at which the
15/// transaction is taking place.
16///
17/// When a manual clock is provided, the transaction will automatically cache that
18/// clock's time, enabling deterministic testing. This cached time will be used in all
19/// time-dependent operations.
20pub struct DbOp<'c> {
21    tx: Transaction<'c, db::Db>,
22    clock: ClockHandle,
23    now: Option<chrono::DateTime<chrono::Utc>>,
24    commit_hooks: Option<hooks::CommitHooks>,
25}
26
27impl<'c> DbOp<'c> {
28    fn new(
29        tx: Transaction<'c, db::Db>,
30        clock: ClockHandle,
31        time: Option<chrono::DateTime<chrono::Utc>>,
32    ) -> Self {
33        Self {
34            tx,
35            clock,
36            now: time,
37            commit_hooks: Some(hooks::CommitHooks::new()),
38        }
39    }
40
41    /// Initializes a transaction using the global clock.
42    ///
43    /// Delegates to [`init_with_clock`](Self::init_with_clock) using the global clock handle.
44    pub async fn init(pool: &db::Pool) -> Result<DbOp<'static>, sqlx::Error> {
45        Self::init_with_clock(pool, crate::clock::Clock::handle()).await
46    }
47
48    /// Initializes a transaction with the specified clock.
49    ///
50    /// If the clock is manual, its current time will be cached in the transaction.
51    pub async fn init_with_clock(
52        pool: &db::Pool,
53        clock: &ClockHandle,
54    ) -> Result<DbOp<'static>, sqlx::Error> {
55        let tx = pool.begin().await?;
56
57        // If a manual clock is provided, cache its time for consistent
58        // timestamps within the transaction.
59        let time = clock.manual_now();
60
61        Ok(DbOp::new(tx, clock.clone(), time))
62    }
63
64    /// Transitions to a [`DbOpWithTime`] with the given time cached.
65    pub fn with_time(self, time: chrono::DateTime<chrono::Utc>) -> DbOpWithTime<'c> {
66        DbOpWithTime::new(self, time)
67    }
68
69    /// Transitions to a [`DbOpWithTime`] using the clock.
70    ///
71    /// Uses cached time if present, otherwise uses the clock's current time.
72    pub fn with_clock_time(self) -> DbOpWithTime<'c> {
73        let time = self.now.unwrap_or_else(|| self.clock.now());
74        DbOpWithTime::new(self, time)
75    }
76
77    /// Transitions to a [`DbOpWithTime`] using the database time.
78    ///
79    /// Priority order:
80    /// 1. Cached time if present
81    /// 2. Manual clock time if the clock is manual
82    /// 3. Database time via `SELECT NOW()`
83    pub async fn with_db_time(mut self) -> Result<DbOpWithTime<'c>, sqlx::Error> {
84        let time = if let Some(time) = self.now {
85            time
86        } else if let Some(manual_time) = self.clock.manual_now() {
87            manual_time
88        } else {
89            db::database_now(&mut *self.tx).await?
90        };
91
92        Ok(DbOpWithTime::new(self, time))
93    }
94
95    /// Returns the optionally cached [`chrono::DateTime`]
96    pub fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
97        self.now
98    }
99
100    /// Begins a nested transaction.
101    pub async fn begin(&mut self) -> Result<DbOp<'_>, sqlx::Error> {
102        Ok(DbOp::new(
103            self.tx.begin().await?,
104            self.clock.clone(),
105            self.now,
106        ))
107    }
108
109    /// Commits the inner transaction.
110    pub async fn commit(mut self) -> Result<(), sqlx::Error> {
111        let commit_hooks = self.commit_hooks.take().expect("no hooks");
112        let post_hooks = commit_hooks.execute_pre(&mut self).await?;
113        self.tx.commit().await?;
114        post_hooks.execute();
115        Ok(())
116    }
117
118    /// Gets a mutable handle to the inner transaction
119    pub fn tx_mut(&mut self) -> &mut Transaction<'c, db::Db> {
120        &mut self.tx
121    }
122}
123
124impl<'o> AtomicOperation for DbOp<'o> {
125    fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
126        self.maybe_now()
127    }
128
129    fn clock(&self) -> &ClockHandle {
130        &self.clock
131    }
132
133    fn as_executor(&mut self) -> &mut db::Connection {
134        self.tx.as_executor()
135    }
136
137    fn add_commit_hook<H: hooks::CommitHook>(&mut self, hook: H) -> Result<(), H> {
138        self.commit_hooks.as_mut().expect("no hooks").add(hook);
139        Ok(())
140    }
141}
142
143/// Equivileant of [`DbOp`] just that the time is guaranteed to be cached.
144///
145/// Used as a wrapper of a [`sqlx::Transaction`] with cached time of the transaction.
146pub struct DbOpWithTime<'c> {
147    inner: DbOp<'c>,
148    now: chrono::DateTime<chrono::Utc>,
149}
150
151impl<'c> DbOpWithTime<'c> {
152    fn new(mut inner: DbOp<'c>, time: chrono::DateTime<chrono::Utc>) -> Self {
153        inner.now = Some(time);
154        Self { inner, now: time }
155    }
156
157    /// The cached [`chrono::DateTime`]
158    pub fn now(&self) -> chrono::DateTime<chrono::Utc> {
159        self.now
160    }
161
162    /// Begins a nested transaction.
163    pub async fn begin(&mut self) -> Result<DbOpWithTime<'_>, sqlx::Error> {
164        Ok(DbOpWithTime::new(self.inner.begin().await?, self.now))
165    }
166
167    /// Commits the inner transaction.
168    pub async fn commit(self) -> Result<(), sqlx::Error> {
169        self.inner.commit().await
170    }
171
172    /// Gets a mutable handle to the inner transaction
173    pub fn tx_mut(&mut self) -> &mut Transaction<'c, db::Db> {
174        self.inner.tx_mut()
175    }
176}
177
178impl<'o> AtomicOperation for DbOpWithTime<'o> {
179    fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
180        Some(self.now())
181    }
182
183    fn clock(&self) -> &ClockHandle {
184        self.inner.clock()
185    }
186
187    fn as_executor(&mut self) -> &mut db::Connection {
188        self.inner.as_executor()
189    }
190
191    fn add_commit_hook<H: hooks::CommitHook>(&mut self, hook: H) -> Result<(), H> {
192        self.inner.add_commit_hook(hook)
193    }
194}
195
196impl<'o> AtomicOperationWithTime for DbOpWithTime<'o> {
197    fn now(&self) -> chrono::DateTime<chrono::Utc> {
198        self.now
199    }
200}
201
202/// Trait to signify we can make multiple consistent database roundtrips.
203///
204/// Its a stand in for [`&mut sqlx::Transaction<'_, DB>`](`sqlx::Transaction`).
205/// The reason for having a trait is to support custom types that wrap the inner
206/// transaction while providing additional functionality.
207///
208/// See [`DbOp`] or [`DbOpWithTime`].
209pub trait AtomicOperation: Send {
210    /// Function for querying when the operation is taking place - if it is cached.
211    fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
212        None
213    }
214
215    /// Returns the clock handle for time operations.
216    ///
217    /// Default implementation returns the global clock handle.
218    fn clock(&self) -> &ClockHandle {
219        crate::clock::Clock::handle()
220    }
221
222    /// Returns the [`sqlx::Executor`] implementation.
223    /// The desired way to represent this would actually be as a GAT:
224    /// ```rust
225    /// trait AtomicOperation {
226    ///     type Executor<'c>: sqlx::PgExecutor<'c>
227    ///         where Self: 'c;
228    ///
229    ///     fn as_executor<'c>(&'c mut self) -> Self::Executor<'c>;
230    /// }
231    /// ```
232    ///
233    /// But GATs don't play well with `async_trait::async_trait` due to lifetime constraints
234    /// so we return the concrete [`&mut db::Connection`](`crate::db::Connection`) instead as a work around.
235    ///
236    /// Since this trait is generally applied to types that wrap a [`sqlx::Transaction`]
237    /// there is no variance in the return type - so its fine.
238    fn as_executor(&mut self) -> &mut sqlx::PgConnection;
239
240    /// Registers a commit hook that will run pre_commit before and post_commit after the transaction commits.
241    /// Returns Ok(()) if the hook was registered, Err(hook) if hooks are not supported.
242    fn add_commit_hook<H: hooks::CommitHook>(&mut self, hook: H) -> Result<(), H> {
243        Err(hook)
244    }
245}
246
247impl<'c> AtomicOperation for sqlx::Transaction<'c, db::Db> {
248    fn as_executor(&mut self) -> &mut db::Connection {
249        &mut *self
250    }
251}