es_entity/operation/
mod.rs1pub mod hooks;
4mod with_time;
5
6use sqlx::{Acquire, PgPool, Postgres, Transaction};
7
8pub use with_time::*;
9
10pub struct DbOp<'c> {
18 tx: Transaction<'c, Postgres>,
19 now: Option<chrono::DateTime<chrono::Utc>>,
20 commit_hooks: Option<hooks::CommitHooks>,
21}
22
23impl<'c> DbOp<'c> {
24 fn new(tx: Transaction<'c, Postgres>, time: Option<chrono::DateTime<chrono::Utc>>) -> Self {
25 Self {
26 tx,
27 now: time,
28 commit_hooks: Some(hooks::CommitHooks::new()),
29 }
30 }
31
32 pub async fn init(pool: &PgPool) -> Result<DbOp<'static>, sqlx::Error> {
35 let tx = pool.begin().await?;
36
37 #[cfg(feature = "sim-time")]
38 let time = Some(crate::prelude::sim_time::now());
39 #[cfg(not(feature = "sim-time"))]
40 let time = None;
41
42 Ok(DbOp::new(tx, time))
43 }
44
45 pub fn with_time(self, time: chrono::DateTime<chrono::Utc>) -> DbOpWithTime<'c> {
47 DbOpWithTime::new(self, time)
48 }
49
50 pub fn with_system_time(self) -> DbOpWithTime<'c> {
53 let time = if let Some(time) = self.now {
54 time
55 } else {
56 chrono::Utc::now()
57 };
58
59 DbOpWithTime::new(self, time)
60 }
61
62 pub async fn with_db_time(mut self) -> Result<DbOpWithTime<'c>, sqlx::Error> {
68 let time = if let Some(time) = self.now {
69 time
70 } else {
71 let res = sqlx::query!("SELECT NOW()")
72 .fetch_one(&mut *self.tx)
73 .await?;
74 res.now.expect("could not fetch now")
75 };
76
77 Ok(DbOpWithTime::new(self, time))
78 }
79
80 pub fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
82 self.now
83 }
84
85 pub async fn begin(&mut self) -> Result<DbOp<'_>, sqlx::Error> {
87 Ok(DbOp::new(self.tx.begin().await?, self.now))
88 }
89
90 pub async fn commit(mut self) -> Result<(), sqlx::Error> {
92 let commit_hooks = self.commit_hooks.take().expect("no hooks");
93 let post_hooks = commit_hooks.execute_pre(&mut self).await?;
94 self.tx.commit().await?;
95 post_hooks.execute();
96 Ok(())
97 }
98
99 pub fn tx_mut(&mut self) -> &mut Transaction<'c, Postgres> {
101 &mut self.tx
102 }
103}
104
105impl<'o> AtomicOperation for DbOp<'o> {
106 fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
107 self.maybe_now()
108 }
109
110 fn as_executor(&mut self) -> &mut sqlx::PgConnection {
111 self.tx.as_executor()
112 }
113
114 fn add_commit_hook<H: hooks::CommitHook>(&mut self, hook: H) -> Result<(), H> {
115 self.commit_hooks.as_mut().expect("no hooks").add(hook);
116 Ok(())
117 }
118}
119
120impl<'c> From<Transaction<'c, Postgres>> for DbOp<'c> {
121 fn from(tx: Transaction<'c, Postgres>) -> Self {
122 Self::new(tx, None)
123 }
124}
125
126pub struct DbOpWithTime<'c> {
130 inner: DbOp<'c>,
131 now: chrono::DateTime<chrono::Utc>,
132}
133
134impl<'c> DbOpWithTime<'c> {
135 fn new(mut inner: DbOp<'c>, time: chrono::DateTime<chrono::Utc>) -> Self {
136 inner.now = Some(time);
137 Self { inner, now: time }
138 }
139
140 pub fn now(&self) -> chrono::DateTime<chrono::Utc> {
142 self.now
143 }
144
145 pub async fn begin(&mut self) -> Result<DbOpWithTime<'_>, sqlx::Error> {
147 Ok(DbOpWithTime::new(self.inner.begin().await?, self.now))
148 }
149
150 pub async fn commit(self) -> Result<(), sqlx::Error> {
152 self.inner.commit().await
153 }
154
155 pub fn tx_mut(&mut self) -> &mut Transaction<'c, Postgres> {
157 self.inner.tx_mut()
158 }
159}
160
161impl<'o> AtomicOperation for DbOpWithTime<'o> {
162 fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
163 Some(self.now())
164 }
165
166 fn as_executor(&mut self) -> &mut sqlx::PgConnection {
167 self.inner.as_executor()
168 }
169
170 fn add_commit_hook<H: hooks::CommitHook>(&mut self, hook: H) -> Result<(), H> {
171 self.inner.add_commit_hook(hook)
172 }
173}
174
175impl<'o> AtomicOperationWithTime for DbOpWithTime<'o> {
176 fn now(&self) -> chrono::DateTime<chrono::Utc> {
177 self.now
178 }
179}
180
181pub trait AtomicOperation: Send {
189 fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
191 None
192 }
193
194 fn as_executor(&mut self) -> &mut sqlx::PgConnection;
211
212 fn add_commit_hook<H: hooks::CommitHook>(&mut self, hook: H) -> Result<(), H> {
215 Err(hook)
216 }
217}
218
219impl<'c> AtomicOperation for sqlx::Transaction<'c, Postgres> {
220 fn as_executor(&mut self) -> &mut sqlx::PgConnection {
221 &mut *self
222 }
223}