1use sqlx::{Acquire, PgPool, Postgres, Transaction};
4
5pub struct DbOp<'c> {
6 tx: Transaction<'c, Postgres>,
7 now: Option<chrono::DateTime<chrono::Utc>>,
8}
9
10impl<'c> DbOp<'c> {
11 fn new(tx: Transaction<'c, Postgres>, time: Option<chrono::DateTime<chrono::Utc>>) -> Self {
12 Self { tx, now: time }
13 }
14
15 pub async fn init(pool: &PgPool) -> Result<DbOp<'static>, sqlx::Error> {
16 let tx = pool.begin().await?;
17
18 #[cfg(feature = "sim-time")]
19 let time = Some(sim_time::now());
20 #[cfg(not(feature = "sim-time"))]
21 let time = None;
22
23 Ok(DbOp::new(tx, time))
24 }
25
26 pub fn with_time(self, time: chrono::DateTime<chrono::Utc>) -> DbOpWithTime<'c> {
27 DbOpWithTime::new(self.tx, time)
28 }
29
30 pub fn with_system_time(self) -> DbOpWithTime<'c> {
31 #[cfg(feature = "sim-time")]
32 let time = sim_time::now();
33 #[cfg(not(feature = "sim-time"))]
34 let time = chrono::Utc::now();
35
36 DbOpWithTime::new(self.tx, time)
37 }
38
39 #[allow(unused_mut)]
40 pub async fn with_db_time(mut self) -> Result<DbOpWithTime<'c>, sqlx::Error> {
41 #[cfg(feature = "sim-time")]
42 let time = sim_time::now();
43 #[cfg(not(feature = "sim-time"))]
44 let time = {
45 let res = sqlx::query!("SELECT NOW()")
46 .fetch_one(&mut *self.tx)
47 .await?;
48 res.now.expect("could not fetch now")
49 };
50
51 Ok(DbOpWithTime::new(self.tx, time))
52 }
53
54 pub fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
55 self.now
56 }
57
58 pub async fn begin(&mut self) -> Result<DbOp<'_>, sqlx::Error> {
59 Ok(DbOp::new(self.tx.begin().await?, self.now))
60 }
61
62 pub async fn commit(self) -> Result<(), sqlx::Error> {
63 self.tx.commit().await?;
64 Ok(())
65 }
66}
67
68impl<'o> AtomicOperation for DbOp<'o> {
69 fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
70 self.now()
71 }
72
73 fn as_executor(&mut self) -> &mut sqlx::PgConnection {
74 self.tx.as_executor()
75 }
76}
77
78impl<'c> From<Transaction<'c, Postgres>> for DbOp<'c> {
79 fn from(tx: Transaction<'c, Postgres>) -> Self {
80 Self::new(tx, None)
81 }
82}
83
84impl<'c> From<DbOp<'c>> for Transaction<'c, Postgres> {
85 fn from(op: DbOp<'c>) -> Self {
86 op.tx
87 }
88}
89
90pub struct DbOpWithTime<'c> {
91 tx: Transaction<'c, Postgres>,
92 now: chrono::DateTime<chrono::Utc>,
93}
94
95impl<'c> DbOpWithTime<'c> {
96 fn new(tx: Transaction<'c, Postgres>, time: chrono::DateTime<chrono::Utc>) -> Self {
97 Self { tx, now: time }
98 }
99
100 pub fn now(&self) -> chrono::DateTime<chrono::Utc> {
101 self.now
102 }
103
104 pub async fn begin(&mut self) -> Result<DbOpWithTime<'_>, sqlx::Error> {
105 Ok(DbOpWithTime::new(self.tx.begin().await?, self.now))
106 }
107
108 pub async fn commit(self) -> Result<(), sqlx::Error> {
109 self.tx.commit().await?;
110 Ok(())
111 }
112}
113
114impl<'o> AtomicOperation for DbOpWithTime<'o> {
115 fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
116 Some(self.now())
117 }
118
119 fn as_executor(&mut self) -> &mut sqlx::PgConnection {
120 self.tx.as_executor()
121 }
122}
123
124impl<'c> From<DbOpWithTime<'c>> for Transaction<'c, Postgres> {
125 fn from(op: DbOpWithTime<'c>) -> Self {
126 op.tx
127 }
128}
129
130pub trait AtomicOperation: Send {
131 fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
132 None
133 }
134
135 fn as_executor(&mut self) -> &mut sqlx::PgConnection;
136}
137
138impl<'c> AtomicOperation for sqlx::PgTransaction<'c> {
139 fn as_executor(&mut self) -> &mut sqlx::PgConnection {
140 &mut *self
141 }
142}