es_entity/operation/
mod.rs1pub mod hooks;
4mod with_time;
5
6use sqlx::{Acquire, Transaction};
7
8use crate::{clock::ClockHandle, db};
9
10pub use with_time::*;
11
12pub struct DbOp<'c> {
21 tx: Transaction<'c, db::Db>,
22 clock: ClockHandle,
23 now: Option<chrono::DateTime<chrono::Utc>>,
24 commit_hooks: Option<hooks::CommitHooks>,
25}
26
27impl<'c> DbOp<'c> {
28 fn new(
29 tx: Transaction<'c, db::Db>,
30 clock: ClockHandle,
31 time: Option<chrono::DateTime<chrono::Utc>>,
32 ) -> Self {
33 Self {
34 tx,
35 clock,
36 now: time,
37 commit_hooks: Some(hooks::CommitHooks::new()),
38 }
39 }
40
41 pub async fn init(pool: &db::Pool) -> Result<DbOp<'static>, sqlx::Error> {
45 Self::init_with_clock(pool, crate::clock::Clock::handle()).await
46 }
47
48 pub async fn init_with_clock(
52 pool: &db::Pool,
53 clock: &ClockHandle,
54 ) -> Result<DbOp<'static>, sqlx::Error> {
55 let tx = pool.begin().await?;
56
57 let time = clock.manual_now();
60
61 Ok(DbOp::new(tx, clock.clone(), time))
62 }
63
64 pub fn with_time(self, time: chrono::DateTime<chrono::Utc>) -> DbOpWithTime<'c> {
66 DbOpWithTime::new(self, time)
67 }
68
69 pub fn with_clock_time(self) -> DbOpWithTime<'c> {
73 let time = self.now.unwrap_or_else(|| self.clock.now());
74 DbOpWithTime::new(self, time)
75 }
76
77 pub async fn with_db_time(mut self) -> Result<DbOpWithTime<'c>, sqlx::Error> {
84 let time = if let Some(time) = self.now {
85 time
86 } else if let Some(manual_time) = self.clock.manual_now() {
87 manual_time
88 } else {
89 db::database_now(&mut *self.tx).await?
90 };
91
92 Ok(DbOpWithTime::new(self, time))
93 }
94
95 pub fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
97 self.now
98 }
99
100 pub async fn begin(&mut self) -> Result<DbOp<'_>, sqlx::Error> {
102 Ok(DbOp::new(
103 self.tx.begin().await?,
104 self.clock.clone(),
105 self.now,
106 ))
107 }
108
109 pub async fn commit(mut self) -> Result<(), sqlx::Error> {
111 let commit_hooks = self.commit_hooks.take().expect("no hooks");
112 let post_hooks = commit_hooks.execute_pre(&mut self).await?;
113 self.tx.commit().await?;
114 post_hooks.execute();
115 Ok(())
116 }
117
118 pub fn tx_mut(&mut self) -> &mut Transaction<'c, db::Db> {
120 &mut self.tx
121 }
122}
123
124impl<'o> AtomicOperation for DbOp<'o> {
125 fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
126 self.maybe_now()
127 }
128
129 fn clock(&self) -> &ClockHandle {
130 &self.clock
131 }
132
133 fn as_executor(&mut self) -> &mut db::Connection {
134 self.tx.as_executor()
135 }
136
137 fn add_commit_hook<H: hooks::CommitHook>(&mut self, hook: H) -> Result<(), H> {
138 self.commit_hooks.as_mut().expect("no hooks").add(hook);
139 Ok(())
140 }
141}
142
143pub struct DbOpWithTime<'c> {
147 inner: DbOp<'c>,
148 now: chrono::DateTime<chrono::Utc>,
149}
150
151impl<'c> DbOpWithTime<'c> {
152 fn new(mut inner: DbOp<'c>, time: chrono::DateTime<chrono::Utc>) -> Self {
153 inner.now = Some(time);
154 Self { inner, now: time }
155 }
156
157 pub fn now(&self) -> chrono::DateTime<chrono::Utc> {
159 self.now
160 }
161
162 pub async fn begin(&mut self) -> Result<DbOpWithTime<'_>, sqlx::Error> {
164 Ok(DbOpWithTime::new(self.inner.begin().await?, self.now))
165 }
166
167 pub async fn commit(self) -> Result<(), sqlx::Error> {
169 self.inner.commit().await
170 }
171
172 pub fn tx_mut(&mut self) -> &mut Transaction<'c, db::Db> {
174 self.inner.tx_mut()
175 }
176}
177
178impl<'o> AtomicOperation for DbOpWithTime<'o> {
179 fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
180 Some(self.now())
181 }
182
183 fn clock(&self) -> &ClockHandle {
184 self.inner.clock()
185 }
186
187 fn as_executor(&mut self) -> &mut db::Connection {
188 self.inner.as_executor()
189 }
190
191 fn add_commit_hook<H: hooks::CommitHook>(&mut self, hook: H) -> Result<(), H> {
192 self.inner.add_commit_hook(hook)
193 }
194}
195
196impl<'o> AtomicOperationWithTime for DbOpWithTime<'o> {
197 fn now(&self) -> chrono::DateTime<chrono::Utc> {
198 self.now
199 }
200}
201
202pub trait AtomicOperation: Send {
210 fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
212 None
213 }
214
215 fn clock(&self) -> &ClockHandle {
219 crate::clock::Clock::handle()
220 }
221
222 fn as_executor(&mut self) -> &mut sqlx::PgConnection;
239
240 fn add_commit_hook<H: hooks::CommitHook>(&mut self, hook: H) -> Result<(), H> {
243 Err(hook)
244 }
245}
246
247impl<'c> AtomicOperation for sqlx::Transaction<'c, db::Db> {
248 fn as_executor(&mut self) -> &mut db::Connection {
249 &mut *self
250 }
251}