1use sqlx::{Acquire, PgPool, Postgres, Transaction};
4
5pub struct DbOp<'c> {
13 tx: Transaction<'c, Postgres>,
14 now: Option<chrono::DateTime<chrono::Utc>>,
15}
16
17impl<'c> DbOp<'c> {
18 fn new(tx: Transaction<'c, Postgres>, time: Option<chrono::DateTime<chrono::Utc>>) -> Self {
19 Self { tx, now: time }
20 }
21
22 pub async fn init(pool: &PgPool) -> Result<DbOp<'static>, sqlx::Error> {
25 let tx = pool.begin().await?;
26
27 #[cfg(feature = "sim-time")]
28 let time = Some(crate::prelude::sim_time::now());
29 #[cfg(not(feature = "sim-time"))]
30 let time = None;
31
32 Ok(DbOp::new(tx, time))
33 }
34
35 pub fn with_time(self, time: chrono::DateTime<chrono::Utc>) -> DbOpWithTime<'c> {
37 DbOpWithTime::new(self.tx, time)
38 }
39
40 pub fn with_system_time(self) -> DbOpWithTime<'c> {
43 let time = if let Some(time) = self.now {
44 time
45 } else {
46 chrono::Utc::now()
47 };
48
49 DbOpWithTime::new(self.tx, time)
50 }
51
52 #[allow(unused_mut)]
58 pub async fn with_db_time(mut self) -> Result<DbOpWithTime<'c>, sqlx::Error> {
59 let time = if let Some(time) = self.now {
60 time
61 } else {
62 let res = sqlx::query!("SELECT NOW()")
63 .fetch_one(&mut *self.tx)
64 .await?;
65 res.now.expect("could not fetch now")
66 };
67
68 Ok(DbOpWithTime::new(self.tx, time))
69 }
70
71 pub fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
73 self.now
74 }
75
76 pub async fn begin(&mut self) -> Result<DbOp<'_>, sqlx::Error> {
78 Ok(DbOp::new(self.tx.begin().await?, self.now))
79 }
80
81 pub async fn commit(self) -> Result<(), sqlx::Error> {
83 self.tx.commit().await?;
84 Ok(())
85 }
86
87 pub fn tx_mut(&mut self) -> &mut Transaction<'c, Postgres> {
89 &mut self.tx
90 }
91}
92
93impl<'o> AtomicOperation for DbOp<'o> {
94 fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
95 self.now()
96 }
97
98 fn as_executor(&mut self) -> &mut sqlx::PgConnection {
99 self.tx.as_executor()
100 }
101}
102
103impl<'c> From<Transaction<'c, Postgres>> for DbOp<'c> {
104 fn from(tx: Transaction<'c, Postgres>) -> Self {
105 Self::new(tx, None)
106 }
107}
108
109impl<'c> From<DbOp<'c>> for Transaction<'c, Postgres> {
110 fn from(op: DbOp<'c>) -> Self {
111 op.tx
112 }
113}
114
115pub struct DbOpWithTime<'c> {
119 tx: Transaction<'c, Postgres>,
120 now: chrono::DateTime<chrono::Utc>,
121}
122
123impl<'c> DbOpWithTime<'c> {
124 fn new(tx: Transaction<'c, Postgres>, time: chrono::DateTime<chrono::Utc>) -> Self {
125 Self { tx, now: time }
126 }
127
128 pub fn now(&self) -> chrono::DateTime<chrono::Utc> {
130 self.now
131 }
132
133 pub async fn begin(&mut self) -> Result<DbOpWithTime<'_>, sqlx::Error> {
135 Ok(DbOpWithTime::new(self.tx.begin().await?, self.now))
136 }
137
138 pub async fn commit(self) -> Result<(), sqlx::Error> {
140 self.tx.commit().await?;
141 Ok(())
142 }
143
144 pub fn tx_mut(&mut self) -> &mut Transaction<'c, Postgres> {
146 &mut self.tx
147 }
148}
149
150impl<'o> AtomicOperation for DbOpWithTime<'o> {
151 fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
152 Some(self.now())
153 }
154
155 fn as_executor(&mut self) -> &mut sqlx::PgConnection {
156 self.tx.as_executor()
157 }
158}
159
160impl<'c> From<DbOpWithTime<'c>> for Transaction<'c, Postgres> {
161 fn from(op: DbOpWithTime<'c>) -> Self {
162 op.tx
163 }
164}
165
166pub trait AtomicOperation: Send {
174 fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
176 None
177 }
178
179 fn as_executor(&mut self) -> &mut sqlx::PgConnection;
196}
197
198impl<'c> AtomicOperation for sqlx::PgTransaction<'c> {
199 fn as_executor(&mut self) -> &mut sqlx::PgConnection {
200 &mut *self
201 }
202}