1use sqlx::{Acquire, PgPool, Postgres, Transaction};
4
5pub struct DbOp<'c> {
6 tx: Transaction<'c, Postgres>,
7 now: Option<chrono::DateTime<chrono::Utc>>,
8}
9
10impl<'c> DbOp<'c> {
11 fn new(tx: Transaction<'c, Postgres>, time: Option<chrono::DateTime<chrono::Utc>>) -> Self {
12 Self { tx, now: time }
13 }
14
15 pub async fn init(pool: &PgPool) -> Result<DbOp<'static>, sqlx::Error> {
16 let tx = pool.begin().await?;
17
18 #[cfg(feature = "sim-time")]
19 let time = Some(sim_time::now());
20 #[cfg(not(feature = "sim-time"))]
21 let time = None;
22
23 Ok(DbOp::new(tx, time))
24 }
25
26 pub fn with_time(self, time: chrono::DateTime<chrono::Utc>) -> DbOpWithTime<'c> {
27 DbOpWithTime::new(self.tx, time)
28 }
29
30 pub fn with_system_time(self) -> DbOpWithTime<'c> {
31 #[cfg(feature = "sim-time")]
32 let time = sim_time::now();
33 #[cfg(not(feature = "sim-time"))]
34 let time = chrono::Utc::now();
35
36 DbOpWithTime::new(self.tx, time)
37 }
38
39 #[allow(unused_mut)]
40 pub async fn with_db_time(mut self) -> Result<DbOpWithTime<'c>, sqlx::Error> {
41 #[cfg(feature = "sim-time")]
42 let time = sim_time::now();
43 #[cfg(not(feature = "sim-time"))]
44 let time = {
45 let res = sqlx::query!("SELECT NOW()")
46 .fetch_one(&mut *self.tx)
47 .await?;
48 res.now.expect("could not fetch now")
49 };
50
51 Ok(DbOpWithTime::new(self.tx, time))
52 }
53
54 pub fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
55 self.now
56 }
57
58 pub async fn begin(&mut self) -> Result<DbOp<'_>, sqlx::Error> {
59 Ok(DbOp::new(self.tx.begin().await?, self.now))
60 }
61
62 pub async fn commit(self) -> Result<(), sqlx::Error> {
63 self.tx.commit().await?;
64 Ok(())
65 }
66
67 pub fn tx_mut(&mut self) -> &mut Transaction<'c, Postgres> {
68 &mut self.tx
69 }
70}
71
72impl<'o> AtomicOperation for DbOp<'o> {
73 fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
74 self.now()
75 }
76
77 fn as_executor(&mut self) -> &mut sqlx::PgConnection {
78 self.tx.as_executor()
79 }
80}
81
82impl<'c> From<Transaction<'c, Postgres>> for DbOp<'c> {
83 fn from(tx: Transaction<'c, Postgres>) -> Self {
84 Self::new(tx, None)
85 }
86}
87
88impl<'c> From<DbOp<'c>> for Transaction<'c, Postgres> {
89 fn from(op: DbOp<'c>) -> Self {
90 op.tx
91 }
92}
93
94pub struct DbOpWithTime<'c> {
95 tx: Transaction<'c, Postgres>,
96 now: chrono::DateTime<chrono::Utc>,
97}
98
99impl<'c> DbOpWithTime<'c> {
100 fn new(tx: Transaction<'c, Postgres>, time: chrono::DateTime<chrono::Utc>) -> Self {
101 Self { tx, now: time }
102 }
103
104 pub fn now(&self) -> chrono::DateTime<chrono::Utc> {
105 self.now
106 }
107
108 pub async fn begin(&mut self) -> Result<DbOpWithTime<'_>, sqlx::Error> {
109 Ok(DbOpWithTime::new(self.tx.begin().await?, self.now))
110 }
111
112 pub async fn commit(self) -> Result<(), sqlx::Error> {
113 self.tx.commit().await?;
114 Ok(())
115 }
116}
117
118impl<'o> AtomicOperation for DbOpWithTime<'o> {
119 fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
120 Some(self.now())
121 }
122
123 fn as_executor(&mut self) -> &mut sqlx::PgConnection {
124 self.tx.as_executor()
125 }
126}
127
128impl<'c> From<DbOpWithTime<'c>> for Transaction<'c, Postgres> {
129 fn from(op: DbOpWithTime<'c>) -> Self {
130 op.tx
131 }
132}
133
134pub trait AtomicOperation: Send {
135 fn now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
136 None
137 }
138
139 fn as_executor(&mut self) -> &mut sqlx::PgConnection;
140}
141
142impl<'c> AtomicOperation for sqlx::PgTransaction<'c> {
143 fn as_executor(&mut self) -> &mut sqlx::PgConnection {
144 &mut *self
145 }
146}