es_entity/operation/
mod.rs1pub mod hooks;
4mod with_time;
5
6use sqlx::{Acquire, PgPool, Postgres, Transaction};
7
8use crate::clock::ClockHandle;
9
10pub use with_time::*;
11
12pub struct DbOp<'c> {
21 tx: Transaction<'c, Postgres>,
22 clock: ClockHandle,
23 now: Option<chrono::DateTime<chrono::Utc>>,
24 commit_hooks: Option<hooks::CommitHooks>,
25}
26
27impl<'c> DbOp<'c> {
28 fn new(
29 tx: Transaction<'c, Postgres>,
30 clock: ClockHandle,
31 time: Option<chrono::DateTime<chrono::Utc>>,
32 ) -> Self {
33 Self {
34 tx,
35 clock,
36 now: time,
37 commit_hooks: Some(hooks::CommitHooks::new()),
38 }
39 }
40
41 pub async fn init(pool: &PgPool) -> Result<DbOp<'static>, sqlx::Error> {
45 Self::init_with_clock(pool, crate::clock::Clock::handle()).await
46 }
47
48 pub async fn init_with_clock(
52 pool: &PgPool,
53 clock: &ClockHandle,
54 ) -> Result<DbOp<'static>, sqlx::Error> {
55 let tx = pool.begin().await?;
56
57 let time = clock.artificial_now();
60
61 Ok(DbOp::new(tx, clock.clone(), time))
62 }
63
64 pub fn with_time(self, time: chrono::DateTime<chrono::Utc>) -> DbOpWithTime<'c> {
66 DbOpWithTime::new(self, time)
67 }
68
69 pub fn with_clock_time(self) -> DbOpWithTime<'c> {
73 let time = self.now.unwrap_or_else(|| self.clock.now());
74 DbOpWithTime::new(self, time)
75 }
76
77 pub async fn with_db_time(mut self) -> Result<DbOpWithTime<'c>, sqlx::Error> {
84 let time = if let Some(time) = self.now {
85 time
86 } else if let Some(artificial_time) = self.clock.artificial_now() {
87 artificial_time
88 } else {
89 let res = sqlx::query!("SELECT NOW()")
90 .fetch_one(&mut *self.tx)
91 .await?;
92 res.now.expect("could not fetch now")
93 };
94
95 Ok(DbOpWithTime::new(self, time))
96 }
97
98 pub fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
100 self.now
101 }
102
103 pub async fn begin(&mut self) -> Result<DbOp<'_>, sqlx::Error> {
105 Ok(DbOp::new(
106 self.tx.begin().await?,
107 self.clock.clone(),
108 self.now,
109 ))
110 }
111
112 pub async fn commit(mut self) -> Result<(), sqlx::Error> {
114 let commit_hooks = self.commit_hooks.take().expect("no hooks");
115 let post_hooks = commit_hooks.execute_pre(&mut self).await?;
116 self.tx.commit().await?;
117 post_hooks.execute();
118 Ok(())
119 }
120
121 pub fn tx_mut(&mut self) -> &mut Transaction<'c, Postgres> {
123 &mut self.tx
124 }
125}
126
127impl<'o> AtomicOperation for DbOp<'o> {
128 fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
129 self.maybe_now()
130 }
131
132 fn clock(&self) -> &ClockHandle {
133 &self.clock
134 }
135
136 fn as_executor(&mut self) -> &mut sqlx::PgConnection {
137 self.tx.as_executor()
138 }
139
140 fn add_commit_hook<H: hooks::CommitHook>(&mut self, hook: H) -> Result<(), H> {
141 self.commit_hooks.as_mut().expect("no hooks").add(hook);
142 Ok(())
143 }
144}
145
146pub struct DbOpWithTime<'c> {
150 inner: DbOp<'c>,
151 now: chrono::DateTime<chrono::Utc>,
152}
153
154impl<'c> DbOpWithTime<'c> {
155 fn new(mut inner: DbOp<'c>, time: chrono::DateTime<chrono::Utc>) -> Self {
156 inner.now = Some(time);
157 Self { inner, now: time }
158 }
159
160 pub fn now(&self) -> chrono::DateTime<chrono::Utc> {
162 self.now
163 }
164
165 pub async fn begin(&mut self) -> Result<DbOpWithTime<'_>, sqlx::Error> {
167 Ok(DbOpWithTime::new(self.inner.begin().await?, self.now))
168 }
169
170 pub async fn commit(self) -> Result<(), sqlx::Error> {
172 self.inner.commit().await
173 }
174
175 pub fn tx_mut(&mut self) -> &mut Transaction<'c, Postgres> {
177 self.inner.tx_mut()
178 }
179}
180
181impl<'o> AtomicOperation for DbOpWithTime<'o> {
182 fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
183 Some(self.now())
184 }
185
186 fn clock(&self) -> &ClockHandle {
187 self.inner.clock()
188 }
189
190 fn as_executor(&mut self) -> &mut sqlx::PgConnection {
191 self.inner.as_executor()
192 }
193
194 fn add_commit_hook<H: hooks::CommitHook>(&mut self, hook: H) -> Result<(), H> {
195 self.inner.add_commit_hook(hook)
196 }
197}
198
199impl<'o> AtomicOperationWithTime for DbOpWithTime<'o> {
200 fn now(&self) -> chrono::DateTime<chrono::Utc> {
201 self.now
202 }
203}
204
205pub trait AtomicOperation: Send {
213 fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
215 None
216 }
217
218 fn clock(&self) -> &ClockHandle {
222 crate::clock::Clock::handle()
223 }
224
225 fn as_executor(&mut self) -> &mut sqlx::PgConnection;
242
243 fn add_commit_hook<H: hooks::CommitHook>(&mut self, hook: H) -> Result<(), H> {
246 Err(hook)
247 }
248}
249
250impl<'c> AtomicOperation for sqlx::Transaction<'c, Postgres> {
251 fn as_executor(&mut self) -> &mut sqlx::PgConnection {
252 &mut *self
253 }
254}