es_entity/
operation.rs

1//! Handle execution of database operations and transactions.
2
3use sqlx::{Acquire, PgPool, Postgres, Transaction};
4
5/// Default return type of the derived EsRepo::begin_op().
6///
7/// Used as a wrapper of a [`sqlx::Transaction`] but can also cache the time at which the
8/// transaction is taking place.
9///
10/// When `--feature sim-time` is active it will hold a time that will substitute `NOW()` in all
11/// write operations.
12pub struct DbOp<'c> {
13    tx: Transaction<'c, Postgres>,
14    now: Option<chrono::DateTime<chrono::Utc>>,
15}
16
17impl<'c> DbOp<'c> {
18    fn new(tx: Transaction<'c, Postgres>, time: Option<chrono::DateTime<chrono::Utc>>) -> Self {
19        Self { tx, now: time }
20    }
21
22    /// Initializes a transaction - defaults `now()` to `None` but will cache `sim_time::now()`
23    /// when `--feature sim-time` is active.
24    pub async fn init(pool: &PgPool) -> Result<DbOp<'static>, sqlx::Error> {
25        let tx = pool.begin().await?;
26
27        #[cfg(feature = "sim-time")]
28        let time = Some(crate::prelude::sim_time::now());
29        #[cfg(not(feature = "sim-time"))]
30        let time = None;
31
32        Ok(DbOp::new(tx, time))
33    }
34
35    /// Transitions to a [`DbOpWithTime`] with the given time cached.
36    pub fn with_time(self, time: chrono::DateTime<chrono::Utc>) -> DbOpWithTime<'c> {
37        DbOpWithTime::new(self.tx, time)
38    }
39
40    /// Transitions to a [`DbOpWithTime`] using [`chrono::Utc::now()`] to populate
41    /// (unless a time was already cached from sim-time).
42    pub fn with_system_time(self) -> DbOpWithTime<'c> {
43        let time = if let Some(time) = self.now {
44            time
45        } else {
46            chrono::Utc::now()
47        };
48
49        DbOpWithTime::new(self.tx, time)
50    }
51
52    /// Transitions to a [`DbOpWithTime`] using
53    /// ```sql
54    /// SELECT NOW()
55    /// ```
56    /// from the database (unless a time was already cached from sim-time).
57    #[allow(unused_mut)]
58    pub async fn with_db_time(mut self) -> Result<DbOpWithTime<'c>, sqlx::Error> {
59        let time = if let Some(time) = self.now {
60            time
61        } else {
62            let res = sqlx::query!("SELECT NOW()")
63                .fetch_one(&mut *self.tx)
64                .await?;
65            res.now.expect("could not fetch now")
66        };
67
68        Ok(DbOpWithTime::new(self.tx, time))
69    }
70
71    /// Returns the optionally cached [`chrono::DateTime`]
72    pub fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
73        self.now
74    }
75
76    /// Begins a nested transaction.
77    pub async fn begin(&mut self) -> Result<DbOp<'_>, sqlx::Error> {
78        Ok(DbOp::new(self.tx.begin().await?, self.now))
79    }
80
81    /// Commits the inner transaction.
82    pub async fn commit(self) -> Result<(), sqlx::Error> {
83        self.tx.commit().await?;
84        Ok(())
85    }
86
87    /// Gets a mutable handle to the inner transaction
88    pub fn tx_mut(&mut self) -> &mut Transaction<'c, Postgres> {
89        &mut self.tx
90    }
91}
92
93impl<'o> AtomicOperation for DbOp<'o> {
94    fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
95        self.now()
96    }
97
98    fn as_executor(&mut self) -> &mut sqlx::PgConnection {
99        self.tx.as_executor()
100    }
101}
102
103impl<'c> From<Transaction<'c, Postgres>> for DbOp<'c> {
104    fn from(tx: Transaction<'c, Postgres>) -> Self {
105        Self::new(tx, None)
106    }
107}
108
109impl<'c> From<DbOp<'c>> for Transaction<'c, Postgres> {
110    fn from(op: DbOp<'c>) -> Self {
111        op.tx
112    }
113}
114
115/// Equivileant of [`DbOp`] just that the time is guaranteed to be cached.
116///
117/// Used as a wrapper of a [`sqlx::Transaction`] with cached time of the transaction.
118pub struct DbOpWithTime<'c> {
119    tx: Transaction<'c, Postgres>,
120    now: chrono::DateTime<chrono::Utc>,
121}
122
123impl<'c> DbOpWithTime<'c> {
124    fn new(tx: Transaction<'c, Postgres>, time: chrono::DateTime<chrono::Utc>) -> Self {
125        Self { tx, now: time }
126    }
127
128    /// The cached [`chrono::DateTime`]
129    pub fn now(&self) -> chrono::DateTime<chrono::Utc> {
130        self.now
131    }
132
133    /// Begins a nested transaction.
134    pub async fn begin(&mut self) -> Result<DbOpWithTime<'_>, sqlx::Error> {
135        Ok(DbOpWithTime::new(self.tx.begin().await?, self.now))
136    }
137
138    /// Commits the inner transaction.
139    pub async fn commit(self) -> Result<(), sqlx::Error> {
140        self.tx.commit().await?;
141        Ok(())
142    }
143
144    /// Gets a mutable handle to the inner transaction
145    pub fn tx_mut(&mut self) -> &mut Transaction<'c, Postgres> {
146        &mut self.tx
147    }
148}
149
150impl<'o> AtomicOperation for DbOpWithTime<'o> {
151    fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
152        Some(self.now())
153    }
154
155    fn as_executor(&mut self) -> &mut sqlx::PgConnection {
156        self.tx.as_executor()
157    }
158}
159
160impl<'c> From<DbOpWithTime<'c>> for Transaction<'c, Postgres> {
161    fn from(op: DbOpWithTime<'c>) -> Self {
162        op.tx
163    }
164}
165
166/// Trait to signify we can make multiple consistent database roundtrips.
167///
168/// Its a stand in for [`&mut sqlx::Transaction<'_, DB>`](`sqlx::Transaction`).
169/// The reason for having a trait is to support custom types that wrap the inner
170/// transaction while providing additional functionality.
171///
172/// See [`DbOp`] or [`DbOpWithTime`].
173pub trait AtomicOperation: Send {
174    /// Function for querying when the operation is taking place - if it is cached.
175    fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
176        None
177    }
178
179    /// Returns the [`sqlx::Executor`] implementation.
180    /// The desired way to represent this would actually be as a GAT:
181    /// ```rust
182    /// trait AtomicOperation {
183    ///     type Executor<'c>: sqlx::PgExecutor<'c>
184    ///         where Self: 'c;
185    ///
186    ///     fn as_executor<'c>(&'c mut self) -> Self::Executor<'c>;
187    /// }
188    /// ```
189    ///
190    /// But GATs don't play well with `async_trait::async_trait` due to lifetime constraints
191    /// so we return the concrete [`&mut sqlx::PgConnection`](`sqlx::PgConnection`) instead as a work around.
192    ///
193    /// Since this trait is generally applied to types that wrap a [`sqlx::Transaction`]
194    /// there is no variance in the return type - so its fine.
195    fn as_executor(&mut self) -> &mut sqlx::PgConnection;
196}
197
198impl<'c> AtomicOperation for sqlx::PgTransaction<'c> {
199    fn as_executor(&mut self) -> &mut sqlx::PgConnection {
200        &mut *self
201    }
202}