es_entity/operation/
mod.rs

1//! Handle execution of database operations and transactions.
2
3pub mod hooks;
4mod with_time;
5
6use sqlx::{Acquire, PgPool, Postgres, Transaction};
7
8pub use with_time::*;
9
10/// Default return type of the derived EsRepo::begin_op().
11///
12/// Used as a wrapper of a [`sqlx::Transaction`] but can also cache the time at which the
13/// transaction is taking place.
14///
15/// When `--feature sim-time` is active it will hold a time that will substitute `NOW()` in all
16/// write operations.
17pub struct DbOp<'c> {
18    tx: Transaction<'c, Postgres>,
19    now: Option<chrono::DateTime<chrono::Utc>>,
20    commit_hooks: Option<hooks::CommitHooks>,
21}
22
23impl<'c> DbOp<'c> {
24    fn new(tx: Transaction<'c, Postgres>, time: Option<chrono::DateTime<chrono::Utc>>) -> Self {
25        Self {
26            tx,
27            now: time,
28            commit_hooks: Some(hooks::CommitHooks::new()),
29        }
30    }
31
32    /// Initializes a transaction - defaults `now()` to `None` but will cache `sim_time::now()`
33    /// when `--feature sim-time` is active.
34    pub async fn init(pool: &PgPool) -> Result<DbOp<'static>, sqlx::Error> {
35        let tx = pool.begin().await?;
36
37        #[cfg(feature = "sim-time")]
38        let time = Some(crate::prelude::sim_time::now());
39        #[cfg(not(feature = "sim-time"))]
40        let time = None;
41
42        Ok(DbOp::new(tx, time))
43    }
44
45    /// Transitions to a [`DbOpWithTime`] with the given time cached.
46    pub fn with_time(self, time: chrono::DateTime<chrono::Utc>) -> DbOpWithTime<'c> {
47        DbOpWithTime::new(self, time)
48    }
49
50    /// Transitions to a [`DbOpWithTime`] using [`chrono::Utc::now()`] to populate
51    /// (unless a time was already cached from sim-time).
52    pub fn with_system_time(self) -> DbOpWithTime<'c> {
53        let time = if let Some(time) = self.now {
54            time
55        } else {
56            chrono::Utc::now()
57        };
58
59        DbOpWithTime::new(self, time)
60    }
61
62    /// Transitions to a [`DbOpWithTime`] using
63    /// ```sql
64    /// SELECT NOW()
65    /// ```
66    /// from the database (unless a time was already cached from sim-time).
67    pub async fn with_db_time(mut self) -> Result<DbOpWithTime<'c>, sqlx::Error> {
68        let time = if let Some(time) = self.now {
69            time
70        } else {
71            let res = sqlx::query!("SELECT NOW()")
72                .fetch_one(&mut *self.tx)
73                .await?;
74            res.now.expect("could not fetch now")
75        };
76
77        Ok(DbOpWithTime::new(self, time))
78    }
79
80    /// Returns the optionally cached [`chrono::DateTime`]
81    pub fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
82        self.now
83    }
84
85    /// Begins a nested transaction.
86    pub async fn begin(&mut self) -> Result<DbOp<'_>, sqlx::Error> {
87        Ok(DbOp::new(self.tx.begin().await?, self.now))
88    }
89
90    /// Commits the inner transaction.
91    pub async fn commit(mut self) -> Result<(), sqlx::Error> {
92        let commit_hooks = self.commit_hooks.take().expect("no hooks");
93        let post_hooks = commit_hooks.execute_pre(&mut self).await?;
94        self.tx.commit().await?;
95        post_hooks.execute();
96        Ok(())
97    }
98
99    /// Gets a mutable handle to the inner transaction
100    pub fn tx_mut(&mut self) -> &mut Transaction<'c, Postgres> {
101        &mut self.tx
102    }
103}
104
105impl<'o> AtomicOperation for DbOp<'o> {
106    fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
107        self.maybe_now()
108    }
109
110    fn as_executor(&mut self) -> &mut sqlx::PgConnection {
111        self.tx.as_executor()
112    }
113
114    fn add_commit_hook<H: hooks::CommitHook>(&mut self, hook: H) -> Result<(), H> {
115        self.commit_hooks.as_mut().expect("no hooks").add(hook);
116        Ok(())
117    }
118}
119
120impl<'c> From<Transaction<'c, Postgres>> for DbOp<'c> {
121    fn from(tx: Transaction<'c, Postgres>) -> Self {
122        Self::new(tx, None)
123    }
124}
125
126impl<'c> From<DbOp<'c>> for Transaction<'c, Postgres> {
127    fn from(op: DbOp<'c>) -> Self {
128        op.tx
129    }
130}
131
132/// Equivileant of [`DbOp`] just that the time is guaranteed to be cached.
133///
134/// Used as a wrapper of a [`sqlx::Transaction`] with cached time of the transaction.
135pub struct DbOpWithTime<'c> {
136    inner: DbOp<'c>,
137    now: chrono::DateTime<chrono::Utc>,
138}
139
140impl<'c> DbOpWithTime<'c> {
141    fn new(mut inner: DbOp<'c>, time: chrono::DateTime<chrono::Utc>) -> Self {
142        inner.now = Some(time);
143        Self { inner, now: time }
144    }
145
146    /// The cached [`chrono::DateTime`]
147    pub fn now(&self) -> chrono::DateTime<chrono::Utc> {
148        self.now
149    }
150
151    /// Begins a nested transaction.
152    pub async fn begin(&mut self) -> Result<DbOpWithTime<'_>, sqlx::Error> {
153        Ok(DbOpWithTime::new(self.inner.begin().await?, self.now))
154    }
155
156    /// Commits the inner transaction.
157    pub async fn commit(self) -> Result<(), sqlx::Error> {
158        self.inner.commit().await
159    }
160
161    /// Gets a mutable handle to the inner transaction
162    pub fn tx_mut(&mut self) -> &mut Transaction<'c, Postgres> {
163        self.inner.tx_mut()
164    }
165}
166
167impl<'o> AtomicOperation for DbOpWithTime<'o> {
168    fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
169        Some(self.now())
170    }
171
172    fn as_executor(&mut self) -> &mut sqlx::PgConnection {
173        self.inner.as_executor()
174    }
175
176    fn add_commit_hook<H: hooks::CommitHook>(&mut self, hook: H) -> Result<(), H> {
177        self.inner.add_commit_hook(hook)
178    }
179}
180
181impl<'o> AtomicOperationWithTime for DbOpWithTime<'o> {
182    fn now(&self) -> chrono::DateTime<chrono::Utc> {
183        self.now
184    }
185}
186
187impl<'c> From<DbOpWithTime<'c>> for Transaction<'c, Postgres> {
188    fn from(op: DbOpWithTime<'c>) -> Self {
189        op.inner.into()
190    }
191}
192
193/// Trait to signify we can make multiple consistent database roundtrips.
194///
195/// Its a stand in for [`&mut sqlx::Transaction<'_, DB>`](`sqlx::Transaction`).
196/// The reason for having a trait is to support custom types that wrap the inner
197/// transaction while providing additional functionality.
198///
199/// See [`DbOp`] or [`DbOpWithTime`].
200pub trait AtomicOperation: Send {
201    /// Function for querying when the operation is taking place - if it is cached.
202    fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
203        None
204    }
205
206    /// Returns the [`sqlx::Executor`] implementation.
207    /// The desired way to represent this would actually be as a GAT:
208    /// ```rust
209    /// trait AtomicOperation {
210    ///     type Executor<'c>: sqlx::PgExecutor<'c>
211    ///         where Self: 'c;
212    ///
213    ///     fn as_executor<'c>(&'c mut self) -> Self::Executor<'c>;
214    /// }
215    /// ```
216    ///
217    /// But GATs don't play well with `async_trait::async_trait` due to lifetime constraints
218    /// so we return the concrete [`&mut sqlx::PgConnection`](`sqlx::PgConnection`) instead as a work around.
219    ///
220    /// Since this trait is generally applied to types that wrap a [`sqlx::Transaction`]
221    /// there is no variance in the return type - so its fine.
222    fn as_executor(&mut self) -> &mut sqlx::PgConnection;
223
224    /// Registers a commit hook that will run pre_commit before and post_commit after the transaction commits.
225    /// Returns Ok(()) if the hook was registered, Err(hook) if hooks are not supported.
226    fn add_commit_hook<H: hooks::CommitHook>(&mut self, hook: H) -> Result<(), H> {
227        Err(hook)
228    }
229}
230
231impl<'c> AtomicOperation for sqlx::Transaction<'c, Postgres> {
232    fn as_executor(&mut self) -> &mut sqlx::PgConnection {
233        &mut *self
234    }
235}