es_entity/operation/
mod.rs1pub mod hooks;
4mod with_time;
5
6use sqlx::{Acquire, PgPool, Postgres, Transaction};
7
8use crate::clock::ClockHandle;
9
10pub use with_time::*;
11
12pub struct DbOp<'c> {
21 tx: Transaction<'c, Postgres>,
22 clock: ClockHandle,
23 now: Option<chrono::DateTime<chrono::Utc>>,
24 commit_hooks: Option<hooks::CommitHooks>,
25}
26
27impl<'c> DbOp<'c> {
28 fn new(
29 tx: Transaction<'c, Postgres>,
30 clock: ClockHandle,
31 time: Option<chrono::DateTime<chrono::Utc>>,
32 ) -> Self {
33 Self {
34 tx,
35 clock,
36 now: time,
37 commit_hooks: Some(hooks::CommitHooks::new()),
38 }
39 }
40
41 pub async fn init(pool: &PgPool) -> Result<DbOp<'static>, sqlx::Error> {
45 Self::init_with_clock(pool, crate::clock::Clock::handle()).await
46 }
47
48 pub async fn init_with_clock(
52 pool: &PgPool,
53 clock: &ClockHandle,
54 ) -> Result<DbOp<'static>, sqlx::Error> {
55 let tx = pool.begin().await?;
56
57 let time = clock.is_artificial().then(|| clock.now());
59
60 Ok(DbOp::new(tx, clock.clone(), time))
61 }
62
63 pub fn with_time(self, time: chrono::DateTime<chrono::Utc>) -> DbOpWithTime<'c> {
65 DbOpWithTime::new(self, time)
66 }
67
68 pub fn with_clock_time(self) -> DbOpWithTime<'c> {
72 let time = self.now.unwrap_or_else(|| self.clock.now());
73 DbOpWithTime::new(self, time)
74 }
75
76 pub async fn with_db_time(mut self) -> Result<DbOpWithTime<'c>, sqlx::Error> {
83 let time = if let Some(time) = self.now {
84 time
85 } else if self.clock.is_artificial() {
86 self.clock.now()
87 } else {
88 let res = sqlx::query!("SELECT NOW()")
89 .fetch_one(&mut *self.tx)
90 .await?;
91 res.now.expect("could not fetch now")
92 };
93
94 Ok(DbOpWithTime::new(self, time))
95 }
96
97 pub fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
99 self.now
100 }
101
102 pub async fn begin(&mut self) -> Result<DbOp<'_>, sqlx::Error> {
104 Ok(DbOp::new(
105 self.tx.begin().await?,
106 self.clock.clone(),
107 self.now,
108 ))
109 }
110
111 pub async fn commit(mut self) -> Result<(), sqlx::Error> {
113 let commit_hooks = self.commit_hooks.take().expect("no hooks");
114 let post_hooks = commit_hooks.execute_pre(&mut self).await?;
115 self.tx.commit().await?;
116 post_hooks.execute();
117 Ok(())
118 }
119
120 pub fn tx_mut(&mut self) -> &mut Transaction<'c, Postgres> {
122 &mut self.tx
123 }
124}
125
126impl<'o> AtomicOperation for DbOp<'o> {
127 fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
128 self.maybe_now()
129 }
130
131 fn clock(&self) -> &ClockHandle {
132 &self.clock
133 }
134
135 fn as_executor(&mut self) -> &mut sqlx::PgConnection {
136 self.tx.as_executor()
137 }
138
139 fn add_commit_hook<H: hooks::CommitHook>(&mut self, hook: H) -> Result<(), H> {
140 self.commit_hooks.as_mut().expect("no hooks").add(hook);
141 Ok(())
142 }
143}
144
145pub struct DbOpWithTime<'c> {
149 inner: DbOp<'c>,
150 now: chrono::DateTime<chrono::Utc>,
151}
152
153impl<'c> DbOpWithTime<'c> {
154 fn new(mut inner: DbOp<'c>, time: chrono::DateTime<chrono::Utc>) -> Self {
155 inner.now = Some(time);
156 Self { inner, now: time }
157 }
158
159 pub fn now(&self) -> chrono::DateTime<chrono::Utc> {
161 self.now
162 }
163
164 pub async fn begin(&mut self) -> Result<DbOpWithTime<'_>, sqlx::Error> {
166 Ok(DbOpWithTime::new(self.inner.begin().await?, self.now))
167 }
168
169 pub async fn commit(self) -> Result<(), sqlx::Error> {
171 self.inner.commit().await
172 }
173
174 pub fn tx_mut(&mut self) -> &mut Transaction<'c, Postgres> {
176 self.inner.tx_mut()
177 }
178}
179
180impl<'o> AtomicOperation for DbOpWithTime<'o> {
181 fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
182 Some(self.now())
183 }
184
185 fn clock(&self) -> &ClockHandle {
186 self.inner.clock()
187 }
188
189 fn as_executor(&mut self) -> &mut sqlx::PgConnection {
190 self.inner.as_executor()
191 }
192
193 fn add_commit_hook<H: hooks::CommitHook>(&mut self, hook: H) -> Result<(), H> {
194 self.inner.add_commit_hook(hook)
195 }
196}
197
198impl<'o> AtomicOperationWithTime for DbOpWithTime<'o> {
199 fn now(&self) -> chrono::DateTime<chrono::Utc> {
200 self.now
201 }
202}
203
204pub trait AtomicOperation: Send {
212 fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
214 None
215 }
216
217 fn clock(&self) -> &ClockHandle {
221 crate::clock::Clock::handle()
222 }
223
224 fn as_executor(&mut self) -> &mut sqlx::PgConnection;
241
242 fn add_commit_hook<H: hooks::CommitHook>(&mut self, hook: H) -> Result<(), H> {
245 Err(hook)
246 }
247}
248
249impl<'c> AtomicOperation for sqlx::Transaction<'c, Postgres> {
250 fn as_executor(&mut self) -> &mut sqlx::PgConnection {
251 &mut *self
252 }
253}