es_entity/
operation.rs

1//! Handle execution of database operations and transactions.
2
3use sqlx::{Acquire, PgPool, Postgres, Transaction};
4
5pub struct DbOp<'c> {
6    tx: Transaction<'c, Postgres>,
7    now: Option<chrono::DateTime<chrono::Utc>>,
8}
9
10impl<'c> DbOp<'c> {
11    fn new(tx: Transaction<'c, Postgres>, time: Option<chrono::DateTime<chrono::Utc>>) -> Self {
12        Self { tx, now: time }
13    }
14
15    pub async fn init(pool: &PgPool) -> Result<DbOp<'static>, sqlx::Error> {
16        let tx = pool.begin().await?;
17
18        #[cfg(feature = "sim-time")]
19        let time = Some(sim_time::now());
20        #[cfg(not(feature = "sim-time"))]
21        let time = None;
22
23        Ok(DbOp::new(tx, time))
24    }
25
26    pub fn with_time(self, time: chrono::DateTime<chrono::Utc>) -> DbOpWithTime<'c> {
27        DbOpWithTime::new(self.tx, time)
28    }
29
30    pub fn with_system_time(self) -> DbOpWithTime<'c> {
31        #[cfg(feature = "sim-time")]
32        let time = sim_time::now();
33        #[cfg(not(feature = "sim-time"))]
34        let time = chrono::Utc::now();
35
36        DbOpWithTime::new(self.tx, time)
37    }
38
39    #[allow(unused_mut)]
40    pub async fn with_db_time(mut self) -> Result<DbOpWithTime<'c>, sqlx::Error> {
41        #[cfg(feature = "sim-time")]
42        let time = sim_time::now();
43        #[cfg(not(feature = "sim-time"))]
44        let time = {
45            let res = sqlx::query!("SELECT NOW()")
46                .fetch_one(&mut *self.tx)
47                .await?;
48            res.now.expect("could not fetch now")
49        };
50
51        Ok(DbOpWithTime::new(self.tx, time))
52    }
53
54    pub fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
55        self.now
56    }
57
58    pub async fn begin(&mut self) -> Result<DbOp<'_>, sqlx::Error> {
59        Ok(DbOp::new(self.tx.begin().await?, self.now))
60    }
61
62    pub async fn commit(self) -> Result<(), sqlx::Error> {
63        self.tx.commit().await?;
64        Ok(())
65    }
66
67    pub fn tx_mut(&mut self) -> &mut Transaction<'c, Postgres> {
68        &mut self.tx
69    }
70}
71
72impl<'o> AtomicOperation for DbOp<'o> {
73    fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
74        self.now()
75    }
76
77    fn as_executor(&mut self) -> &mut sqlx::PgConnection {
78        self.tx.as_executor()
79    }
80}
81
82impl<'c> From<Transaction<'c, Postgres>> for DbOp<'c> {
83    fn from(tx: Transaction<'c, Postgres>) -> Self {
84        Self::new(tx, None)
85    }
86}
87
88impl<'c> From<DbOp<'c>> for Transaction<'c, Postgres> {
89    fn from(op: DbOp<'c>) -> Self {
90        op.tx
91    }
92}
93
94pub struct DbOpWithTime<'c> {
95    tx: Transaction<'c, Postgres>,
96    now: chrono::DateTime<chrono::Utc>,
97}
98
99impl<'c> DbOpWithTime<'c> {
100    fn new(tx: Transaction<'c, Postgres>, time: chrono::DateTime<chrono::Utc>) -> Self {
101        Self { tx, now: time }
102    }
103
104    pub fn now(&self) -> chrono::DateTime<chrono::Utc> {
105        self.now
106    }
107
108    pub async fn begin(&mut self) -> Result<DbOpWithTime<'_>, sqlx::Error> {
109        Ok(DbOpWithTime::new(self.tx.begin().await?, self.now))
110    }
111
112    pub async fn commit(self) -> Result<(), sqlx::Error> {
113        self.tx.commit().await?;
114        Ok(())
115    }
116}
117
118impl<'o> AtomicOperation for DbOpWithTime<'o> {
119    fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
120        Some(self.now())
121    }
122
123    fn as_executor(&mut self) -> &mut sqlx::PgConnection {
124        self.tx.as_executor()
125    }
126}
127
128impl<'c> From<DbOpWithTime<'c>> for Transaction<'c, Postgres> {
129    fn from(op: DbOpWithTime<'c>) -> Self {
130        op.tx
131    }
132}
133
134pub trait AtomicOperation: Send {
135    fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
136        None
137    }
138
139    fn as_executor(&mut self) -> &mut sqlx::PgConnection;
140}
141
142impl<'c> AtomicOperation for sqlx::PgTransaction<'c> {
143    fn as_executor(&mut self) -> &mut sqlx::PgConnection {
144        &mut *self
145    }
146}