pub mod hooks;
mod with_time;
use sqlx::{Acquire, PgPool, Postgres, Transaction};
use crate::clock::ClockHandle;
pub use with_time::*;
pub struct DbOp<'c> {
tx: Transaction<'c, Postgres>,
clock: ClockHandle,
now: Option<chrono::DateTime<chrono::Utc>>,
commit_hooks: Option<hooks::CommitHooks>,
}
impl<'c> DbOp<'c> {
fn new(
tx: Transaction<'c, Postgres>,
clock: ClockHandle,
time: Option<chrono::DateTime<chrono::Utc>>,
) -> Self {
Self {
tx,
clock,
now: time,
commit_hooks: Some(hooks::CommitHooks::new()),
}
}
pub async fn init(pool: &PgPool) -> Result<DbOp<'static>, sqlx::Error> {
Self::init_with_clock(pool, crate::clock::Clock::handle()).await
}
pub async fn init_with_clock(
pool: &PgPool,
clock: &ClockHandle,
) -> Result<DbOp<'static>, sqlx::Error> {
let tx = pool.begin().await?;
let time = clock.artificial_now();
Ok(DbOp::new(tx, clock.clone(), time))
}
pub fn with_time(self, time: chrono::DateTime<chrono::Utc>) -> DbOpWithTime<'c> {
DbOpWithTime::new(self, time)
}
pub fn with_clock_time(self) -> DbOpWithTime<'c> {
let time = self.now.unwrap_or_else(|| self.clock.now());
DbOpWithTime::new(self, time)
}
pub async fn with_db_time(mut self) -> Result<DbOpWithTime<'c>, sqlx::Error> {
let time = if let Some(time) = self.now {
time
} else if let Some(artificial_time) = self.clock.artificial_now() {
artificial_time
} else {
sqlx::query_scalar::<_, chrono::DateTime<chrono::Utc>>("SELECT NOW()")
.fetch_one(&mut *self.tx)
.await?
};
Ok(DbOpWithTime::new(self, time))
}
pub fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
self.now
}
pub async fn begin(&mut self) -> Result<DbOp<'_>, sqlx::Error> {
Ok(DbOp::new(
self.tx.begin().await?,
self.clock.clone(),
self.now,
))
}
pub async fn commit(mut self) -> Result<(), sqlx::Error> {
let commit_hooks = self.commit_hooks.take().expect("no hooks");
let post_hooks = commit_hooks.execute_pre(&mut self).await?;
self.tx.commit().await?;
post_hooks.execute();
Ok(())
}
pub fn tx_mut(&mut self) -> &mut Transaction<'c, Postgres> {
&mut self.tx
}
}
impl<'o> AtomicOperation for DbOp<'o> {
fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
self.maybe_now()
}
fn clock(&self) -> &ClockHandle {
&self.clock
}
fn as_executor(&mut self) -> &mut sqlx::PgConnection {
self.tx.as_executor()
}
fn add_commit_hook<H: hooks::CommitHook>(&mut self, hook: H) -> Result<(), H> {
self.commit_hooks.as_mut().expect("no hooks").add(hook);
Ok(())
}
}
pub struct DbOpWithTime<'c> {
inner: DbOp<'c>,
now: chrono::DateTime<chrono::Utc>,
}
impl<'c> DbOpWithTime<'c> {
fn new(mut inner: DbOp<'c>, time: chrono::DateTime<chrono::Utc>) -> Self {
inner.now = Some(time);
Self { inner, now: time }
}
pub fn now(&self) -> chrono::DateTime<chrono::Utc> {
self.now
}
pub async fn begin(&mut self) -> Result<DbOpWithTime<'_>, sqlx::Error> {
Ok(DbOpWithTime::new(self.inner.begin().await?, self.now))
}
pub async fn commit(self) -> Result<(), sqlx::Error> {
self.inner.commit().await
}
pub fn tx_mut(&mut self) -> &mut Transaction<'c, Postgres> {
self.inner.tx_mut()
}
}
impl<'o> AtomicOperation for DbOpWithTime<'o> {
fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
Some(self.now())
}
fn clock(&self) -> &ClockHandle {
self.inner.clock()
}
fn as_executor(&mut self) -> &mut sqlx::PgConnection {
self.inner.as_executor()
}
fn add_commit_hook<H: hooks::CommitHook>(&mut self, hook: H) -> Result<(), H> {
self.inner.add_commit_hook(hook)
}
}
impl<'o> AtomicOperationWithTime for DbOpWithTime<'o> {
fn now(&self) -> chrono::DateTime<chrono::Utc> {
self.now
}
}
pub trait AtomicOperation: Send {
fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
None
}
fn clock(&self) -> &ClockHandle {
crate::clock::Clock::handle()
}
fn as_executor(&mut self) -> &mut sqlx::PgConnection;
fn add_commit_hook<H: hooks::CommitHook>(&mut self, hook: H) -> Result<(), H> {
Err(hook)
}
}
impl<'c> AtomicOperation for sqlx::Transaction<'c, Postgres> {
fn as_executor(&mut self) -> &mut sqlx::PgConnection {
&mut *self
}
}