es_entity/
operation.rs

1//! Handle execution of database operations and transactions.
2
3use sqlx::{Acquire, PgPool, Postgres, Transaction};
4
5pub struct DbOp<'c> {
6    tx: Transaction<'c, Postgres>,
7    now: Option<chrono::DateTime<chrono::Utc>>,
8}
9
10impl<'c> DbOp<'c> {
11    fn new(tx: Transaction<'c, Postgres>, time: Option<chrono::DateTime<chrono::Utc>>) -> Self {
12        Self { tx, now: time }
13    }
14
15    pub async fn init(pool: &PgPool) -> Result<DbOp<'static>, sqlx::Error> {
16        let tx = pool.begin().await?;
17
18        #[cfg(feature = "sim-time")]
19        let time = Some(sim_time::now());
20        #[cfg(not(feature = "sim-time"))]
21        let time = None;
22
23        Ok(DbOp::new(tx, time))
24    }
25
26    pub fn with_time(self, time: chrono::DateTime<chrono::Utc>) -> DbOpWithTime<'c> {
27        DbOpWithTime::new(self.tx, time)
28    }
29
30    pub fn with_system_time(self) -> DbOpWithTime<'c> {
31        #[cfg(feature = "sim-time")]
32        let time = sim_time::now();
33        #[cfg(not(feature = "sim-time"))]
34        let time = chrono::Utc::now();
35
36        DbOpWithTime::new(self.tx, time)
37    }
38
39    #[allow(unused_mut)]
40    pub async fn with_db_time(mut self) -> Result<DbOpWithTime<'c>, sqlx::Error> {
41        #[cfg(feature = "sim-time")]
42        let time = sim_time::now();
43        #[cfg(not(feature = "sim-time"))]
44        let time = {
45            let res = sqlx::query!("SELECT NOW()")
46                .fetch_one(&mut *self.tx)
47                .await?;
48            res.now.expect("could not fetch now")
49        };
50
51        Ok(DbOpWithTime::new(self.tx, time))
52    }
53
54    pub fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
55        self.now
56    }
57
58    pub async fn begin(&mut self) -> Result<DbOp<'_>, sqlx::Error> {
59        Ok(DbOp::new(self.tx.begin().await?, self.now))
60    }
61
62    pub async fn commit(self) -> Result<(), sqlx::Error> {
63        self.tx.commit().await?;
64        Ok(())
65    }
66}
67
68impl<'o> AtomicOperation for DbOp<'o> {
69    fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
70        self.now()
71    }
72
73    fn as_executor(&mut self) -> &mut sqlx::PgConnection {
74        self.tx.as_executor()
75    }
76}
77
78impl<'c> From<Transaction<'c, Postgres>> for DbOp<'c> {
79    fn from(tx: Transaction<'c, Postgres>) -> Self {
80        Self::new(tx, None)
81    }
82}
83
84impl<'c> From<DbOp<'c>> for Transaction<'c, Postgres> {
85    fn from(op: DbOp<'c>) -> Self {
86        op.tx
87    }
88}
89
90pub struct DbOpWithTime<'c> {
91    tx: Transaction<'c, Postgres>,
92    now: chrono::DateTime<chrono::Utc>,
93}
94
95impl<'c> DbOpWithTime<'c> {
96    fn new(tx: Transaction<'c, Postgres>, time: chrono::DateTime<chrono::Utc>) -> Self {
97        Self { tx, now: time }
98    }
99
100    pub fn now(&self) -> chrono::DateTime<chrono::Utc> {
101        self.now
102    }
103
104    pub async fn begin(&mut self) -> Result<DbOpWithTime<'_>, sqlx::Error> {
105        Ok(DbOpWithTime::new(self.tx.begin().await?, self.now))
106    }
107
108    pub async fn commit(self) -> Result<(), sqlx::Error> {
109        self.tx.commit().await?;
110        Ok(())
111    }
112}
113
114impl<'o> AtomicOperation for DbOpWithTime<'o> {
115    fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
116        Some(self.now())
117    }
118
119    fn as_executor(&mut self) -> &mut sqlx::PgConnection {
120        self.tx.as_executor()
121    }
122}
123
124impl<'c> From<DbOpWithTime<'c>> for Transaction<'c, Postgres> {
125    fn from(op: DbOpWithTime<'c>) -> Self {
126        op.tx
127    }
128}
129
130pub trait AtomicOperation: Send {
131    fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
132        None
133    }
134
135    fn as_executor(&mut self) -> &mut sqlx::PgConnection;
136}
137
138impl<'c> AtomicOperation for sqlx::PgTransaction<'c> {
139    fn as_executor(&mut self) -> &mut sqlx::PgConnection {
140        &mut *self
141    }
142}