es_entity/
operation.rs

1//! Handle execution of database operations and transactions.
2
3use sqlx::{Acquire, PgPool, Postgres, Transaction};
4
5/// Default return type of the derived EsRepo::begin_op().
6///
7/// Used as a wrapper of a [`sqlx::Transaction`] but can also cache the time at which the
8/// transaction is taking place.
9///
10/// When `--feature sim-time` is active it will hold a time that will substitute `NOW()` in all
11/// write operations.
12pub struct DbOp<'c> {
13    tx: Transaction<'c, Postgres>,
14    now: Option<chrono::DateTime<chrono::Utc>>,
15}
16
17impl<'c> DbOp<'c> {
18    fn new(tx: Transaction<'c, Postgres>, time: Option<chrono::DateTime<chrono::Utc>>) -> Self {
19        Self { tx, now: time }
20    }
21
22    /// Initializes a transaction - defaults `now()` to `None` but will cache `sim_time::now()`
23    /// when `--feature sim-time` is active.
24    pub async fn init(pool: &PgPool) -> Result<DbOp<'static>, sqlx::Error> {
25        let tx = pool.begin().await?;
26
27        #[cfg(feature = "sim-time")]
28        let time = Some(crate::prelude::sim_time::now());
29        #[cfg(not(feature = "sim-time"))]
30        let time = None;
31
32        Ok(DbOp::new(tx, time))
33    }
34
35    /// Transitions to a [`DbOpWithTime`] with the given time cached.
36    pub fn with_time(self, time: chrono::DateTime<chrono::Utc>) -> DbOpWithTime<'c> {
37        DbOpWithTime::new(self.tx, time)
38    }
39
40    /// Transitions to a [`DbOpWithTime`] using [`chrono::Utc::now()`] to populate
41    /// (unless a time was already cached from sim-time).
42    pub fn with_system_time(self) -> DbOpWithTime<'c> {
43        let time = if let Some(time) = self.now {
44            time
45        } else {
46            chrono::Utc::now()
47        };
48
49        DbOpWithTime::new(self.tx, time)
50    }
51
52    /// Transitions to a [`DbOpWithTime`] using
53    /// ```sql
54    /// SELECT NOW()
55    /// ```
56    /// from the database (unless a time was already cached from sim-time).
57    pub async fn with_db_time(mut self) -> Result<DbOpWithTime<'c>, sqlx::Error> {
58        let time = if let Some(time) = self.now {
59            time
60        } else {
61            let res = sqlx::query!("SELECT NOW()")
62                .fetch_one(&mut *self.tx)
63                .await?;
64            res.now.expect("could not fetch now")
65        };
66
67        Ok(DbOpWithTime::new(self.tx, time))
68    }
69
70    /// Returns the optionally cached [`chrono::DateTime`]
71    pub fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
72        self.now
73    }
74
75    /// Begins a nested transaction.
76    pub async fn begin(&mut self) -> Result<DbOp<'_>, sqlx::Error> {
77        Ok(DbOp::new(self.tx.begin().await?, self.now))
78    }
79
80    /// Commits the inner transaction.
81    pub async fn commit(self) -> Result<(), sqlx::Error> {
82        self.tx.commit().await?;
83        Ok(())
84    }
85
86    /// Gets a mutable handle to the inner transaction
87    pub fn tx_mut(&mut self) -> &mut Transaction<'c, Postgres> {
88        &mut self.tx
89    }
90}
91
92impl<'o> AtomicOperation for DbOp<'o> {
93    fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
94        self.now()
95    }
96
97    fn as_executor(&mut self) -> &mut sqlx::PgConnection {
98        self.tx.as_executor()
99    }
100}
101
102impl<'c> From<Transaction<'c, Postgres>> for DbOp<'c> {
103    fn from(tx: Transaction<'c, Postgres>) -> Self {
104        Self::new(tx, None)
105    }
106}
107
108impl<'c> From<DbOp<'c>> for Transaction<'c, Postgres> {
109    fn from(op: DbOp<'c>) -> Self {
110        op.tx
111    }
112}
113
114/// Equivileant of [`DbOp`] just that the time is guaranteed to be cached.
115///
116/// Used as a wrapper of a [`sqlx::Transaction`] with cached time of the transaction.
117pub struct DbOpWithTime<'c> {
118    tx: Transaction<'c, Postgres>,
119    now: chrono::DateTime<chrono::Utc>,
120}
121
122impl<'c> DbOpWithTime<'c> {
123    fn new(tx: Transaction<'c, Postgres>, time: chrono::DateTime<chrono::Utc>) -> Self {
124        Self { tx, now: time }
125    }
126
127    /// The cached [`chrono::DateTime`]
128    pub fn now(&self) -> chrono::DateTime<chrono::Utc> {
129        self.now
130    }
131
132    /// Begins a nested transaction.
133    pub async fn begin(&mut self) -> Result<DbOpWithTime<'_>, sqlx::Error> {
134        Ok(DbOpWithTime::new(self.tx.begin().await?, self.now))
135    }
136
137    /// Commits the inner transaction.
138    pub async fn commit(self) -> Result<(), sqlx::Error> {
139        self.tx.commit().await?;
140        Ok(())
141    }
142
143    /// Gets a mutable handle to the inner transaction
144    pub fn tx_mut(&mut self) -> &mut Transaction<'c, Postgres> {
145        &mut self.tx
146    }
147}
148
149impl<'o> AtomicOperation for DbOpWithTime<'o> {
150    fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
151        Some(self.now())
152    }
153
154    fn as_executor(&mut self) -> &mut sqlx::PgConnection {
155        self.tx.as_executor()
156    }
157}
158
159impl<'c> From<DbOpWithTime<'c>> for Transaction<'c, Postgres> {
160    fn from(op: DbOpWithTime<'c>) -> Self {
161        op.tx
162    }
163}
164
165/// Trait to signify we can make multiple consistent database roundtrips.
166///
167/// Its a stand in for [`&mut sqlx::Transaction<'_, DB>`](`sqlx::Transaction`).
168/// The reason for having a trait is to support custom types that wrap the inner
169/// transaction while providing additional functionality.
170///
171/// See [`DbOp`] or [`DbOpWithTime`].
172pub trait AtomicOperation: Send {
173    /// Function for querying when the operation is taking place - if it is cached.
174    fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
175        None
176    }
177
178    /// Returns the [`sqlx::Executor`] implementation.
179    /// The desired way to represent this would actually be as a GAT:
180    /// ```rust
181    /// trait AtomicOperation {
182    ///     type Executor<'c>: sqlx::PgExecutor<'c>
183    ///         where Self: 'c;
184    ///
185    ///     fn as_executor<'c>(&'c mut self) -> Self::Executor<'c>;
186    /// }
187    /// ```
188    ///
189    /// But GATs don't play well with `async_trait::async_trait` due to lifetime constraints
190    /// so we return the concrete [`&mut sqlx::PgConnection`](`sqlx::PgConnection`) instead as a work around.
191    ///
192    /// Since this trait is generally applied to types that wrap a [`sqlx::Transaction`]
193    /// there is no variance in the return type - so its fine.
194    fn as_executor(&mut self) -> &mut sqlx::PgConnection;
195}
196
197impl<'c> AtomicOperation for sqlx::Transaction<'c, Postgres> {
198    fn as_executor(&mut self) -> &mut sqlx::PgConnection {
199        &mut *self
200    }
201}