1use sqlx::{Acquire, PgPool, Postgres, Transaction};
4
5pub struct DbOp<'c> {
13 tx: Transaction<'c, Postgres>,
14 now: Option<chrono::DateTime<chrono::Utc>>,
15}
16
17impl<'c> DbOp<'c> {
18 fn new(tx: Transaction<'c, Postgres>, time: Option<chrono::DateTime<chrono::Utc>>) -> Self {
19 Self { tx, now: time }
20 }
21
22 pub async fn init(pool: &PgPool) -> Result<DbOp<'static>, sqlx::Error> {
25 let tx = pool.begin().await?;
26
27 #[cfg(feature = "sim-time")]
28 let time = Some(crate::prelude::sim_time::now());
29 #[cfg(not(feature = "sim-time"))]
30 let time = None;
31
32 Ok(DbOp::new(tx, time))
33 }
34
35 pub fn with_time(self, time: chrono::DateTime<chrono::Utc>) -> DbOpWithTime<'c> {
37 DbOpWithTime::new(self.tx, time)
38 }
39
40 pub fn with_system_time(self) -> DbOpWithTime<'c> {
43 let time = if let Some(time) = self.now {
44 time
45 } else {
46 chrono::Utc::now()
47 };
48
49 DbOpWithTime::new(self.tx, time)
50 }
51
52 pub async fn with_db_time(mut self) -> Result<DbOpWithTime<'c>, sqlx::Error> {
58 let time = if let Some(time) = self.now {
59 time
60 } else {
61 let res = sqlx::query!("SELECT NOW()")
62 .fetch_one(&mut *self.tx)
63 .await?;
64 res.now.expect("could not fetch now")
65 };
66
67 Ok(DbOpWithTime::new(self.tx, time))
68 }
69
70 pub fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
72 self.now
73 }
74
75 pub async fn begin(&mut self) -> Result<DbOp<'_>, sqlx::Error> {
77 Ok(DbOp::new(self.tx.begin().await?, self.now))
78 }
79
80 pub async fn commit(self) -> Result<(), sqlx::Error> {
82 self.tx.commit().await?;
83 Ok(())
84 }
85
86 pub fn tx_mut(&mut self) -> &mut Transaction<'c, Postgres> {
88 &mut self.tx
89 }
90}
91
92impl<'o> AtomicOperation for DbOp<'o> {
93 fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
94 self.now()
95 }
96
97 fn as_executor(&mut self) -> &mut sqlx::PgConnection {
98 self.tx.as_executor()
99 }
100}
101
102impl<'c> From<Transaction<'c, Postgres>> for DbOp<'c> {
103 fn from(tx: Transaction<'c, Postgres>) -> Self {
104 Self::new(tx, None)
105 }
106}
107
108impl<'c> From<DbOp<'c>> for Transaction<'c, Postgres> {
109 fn from(op: DbOp<'c>) -> Self {
110 op.tx
111 }
112}
113
114pub struct DbOpWithTime<'c> {
118 tx: Transaction<'c, Postgres>,
119 now: chrono::DateTime<chrono::Utc>,
120}
121
122impl<'c> DbOpWithTime<'c> {
123 fn new(tx: Transaction<'c, Postgres>, time: chrono::DateTime<chrono::Utc>) -> Self {
124 Self { tx, now: time }
125 }
126
127 pub fn now(&self) -> chrono::DateTime<chrono::Utc> {
129 self.now
130 }
131
132 pub async fn begin(&mut self) -> Result<DbOpWithTime<'_>, sqlx::Error> {
134 Ok(DbOpWithTime::new(self.tx.begin().await?, self.now))
135 }
136
137 pub async fn commit(self) -> Result<(), sqlx::Error> {
139 self.tx.commit().await?;
140 Ok(())
141 }
142
143 pub fn tx_mut(&mut self) -> &mut Transaction<'c, Postgres> {
145 &mut self.tx
146 }
147}
148
149impl<'o> AtomicOperation for DbOpWithTime<'o> {
150 fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
151 Some(self.now())
152 }
153
154 fn as_executor(&mut self) -> &mut sqlx::PgConnection {
155 self.tx.as_executor()
156 }
157}
158
159impl<'c> From<DbOpWithTime<'c>> for Transaction<'c, Postgres> {
160 fn from(op: DbOpWithTime<'c>) -> Self {
161 op.tx
162 }
163}
164
165pub trait AtomicOperation: Send {
173 fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
175 None
176 }
177
178 fn as_executor(&mut self) -> &mut sqlx::PgConnection;
195}
196
197impl<'c> AtomicOperation for sqlx::Transaction<'c, Postgres> {
198 fn as_executor(&mut self) -> &mut sqlx::PgConnection {
199 &mut *self
200 }
201}