use std::fmt;
use std::sync::atomic::{AtomicBool, Ordering};
use crate::error::{BsqlError, BsqlResult};
use crate::pool::PoolConnection;
/// A database transaction bound to a single pooled connection.
///
/// `BEGIN` is issued lazily: constructing the value sends nothing to the
/// server, and the statement only goes out once `ensure_begun` is awaited.
/// The handle is finished by calling `commit` or `rollback`, both of which
/// consume it.
pub struct Transaction {
    /// The pooled connection the transaction runs on. It is `Some` for the
    /// whole active lifetime and is only taken when the connection has to be
    /// removed from the pool (failed `COMMIT`/`ROLLBACK`, or a drop of a
    /// begun-but-unfinished transaction).
    conn: Option<PoolConnection>,
    /// Set once `commit` or `rollback` has run to completion, whatever the
    /// outcome. Despite the name it also covers rollbacks; `Drop` reads it to
    /// decide whether any clean-up is still needed.
    committed: bool,
    /// Whether `BEGIN` has been issued on the connection. Atomic so that it
    /// can be updated through a shared reference.
    begun: AtomicBool,
}
impl Transaction {
    /// Wraps a pooled connection in a transaction handle.
    ///
    /// Nothing is sent to the server here; `BEGIN` is deferred until
    /// `ensure_begun` is awaited.
    pub(crate) fn new(conn: PoolConnection) -> Self {
        Self {
            conn: Some(conn),
            committed: false,
            begun: AtomicBool::new(false),
        }
    }

    /// Sends `BEGIN` the first time it is awaited; once the transaction is
    /// marked as begun, further calls return `Ok(())` without touching the
    /// connection.
    ///
    /// # Errors
    /// Returns the converted driver error if `BEGIN` fails. The transaction is
    /// then still treated as not begun, so a later call sends `BEGIN` again.
    ///
    /// # Panics
    /// Panics if `BEGIN` still has to be sent but the connection has already
    /// been taken (an internal bug).
    ///
    /// # Cancellation
    /// If the returned future is dropped while the `BEGIN` round-trip is in
    /// flight, the statement may already have reached the server. The
    /// transaction is therefore marked as begun on that path, so that dropping
    /// the handle discards the connection instead of returning it to the pool
    /// with a transaction possibly left open.
    pub(crate) async fn ensure_begun(&self) -> BsqlResult<()> {
        /// Sets the `begun` flag when dropped while still armed, i.e. when the
        /// `BEGIN` await below was abandoned before producing a result.
        struct MarkBegunOnCancel<'a> {
            flag: &'a AtomicBool,
            armed: bool,
        }

        impl Drop for MarkBegunOnCancel<'_> {
            fn drop(&mut self) {
                if self.armed {
                    self.flag.store(true, Ordering::Relaxed);
                }
            }
        }

        if self.begun.load(Ordering::Relaxed) {
            return Ok(());
        }

        let conn = self
            .conn
            .as_ref()
            .expect("bsql bug: Transaction used after commit/rollback");

        let mut cancel_guard = MarkBegunOnCancel {
            flag: &self.begun,
            armed: true,
        };
        let outcome = conn.inner.batch_execute("BEGIN").await;
        // The round-trip finished one way or the other: from here on the flag
        // is decided by the result, not by the guard.
        cancel_guard.armed = false;

        outcome.map_err(BsqlError::from)?;
        self.begun.store(true, Ordering::Relaxed);
        Ok(())
    }

    /// Commits the transaction, consuming the handle.
    ///
    /// If `BEGIN` was never sent there is nothing to commit: no statement is
    /// issued and `Ok(())` is returned.
    ///
    /// # Errors
    /// Returns the converted driver error if `COMMIT` fails. In that case the
    /// connection is taken out of the pool rather than returned to it, since
    /// its transaction state is unknown.
    ///
    /// # Panics
    /// Panics if the transaction was begun but the connection has already been
    /// taken (an internal bug).
    pub async fn commit(mut self) -> BsqlResult<()> {
        if !self.begun.load(Ordering::Relaxed) {
            self.committed = true;
            return Ok(());
        }

        let conn = self
            .conn
            .as_ref()
            .expect("bsql bug: Transaction::commit called but connection already taken");

        // `committed` is only set after the await: if this future is dropped
        // mid-flight, `Drop` still sees an unfinished transaction and discards
        // the connection.
        match conn.inner.batch_execute("COMMIT").await {
            Ok(()) => {
                self.committed = true;
                Ok(())
            }
            Err(e) => {
                if let Some(conn) = self.conn.take() {
                    let _ = deadpool_postgres::Object::take(conn.inner);
                }
                self.committed = true;
                Err(BsqlError::from(e))
            }
        }
    }

    /// Rolls the transaction back, consuming the handle.
    ///
    /// If `BEGIN` was never sent there is nothing to undo: no statement is
    /// issued and `Ok(())` is returned.
    ///
    /// # Errors
    /// Returns the converted driver error if `ROLLBACK` fails. In that case
    /// the connection is taken out of the pool rather than returned to it.
    ///
    /// # Panics
    /// Panics if the transaction was begun but the connection has already been
    /// taken (an internal bug).
    pub async fn rollback(mut self) -> BsqlResult<()> {
        if !self.begun.load(Ordering::Relaxed) {
            // The flag doubles as "finished"; it tells `Drop` to do nothing.
            self.committed = true;
            return Ok(());
        }

        let conn = self
            .conn
            .as_ref()
            .expect("bsql bug: Transaction::rollback called but connection already taken");

        // As in `commit`, the flag is set only once the statement has
        // completed, so a cancelled rollback still discards the connection.
        match conn.inner.batch_execute("ROLLBACK").await {
            Ok(()) => {
                self.committed = true;
                Ok(())
            }
            Err(e) => {
                if let Some(conn) = self.conn.take() {
                    let _ = deadpool_postgres::Object::take(conn.inner);
                }
                self.committed = true;
                Err(BsqlError::from(e))
            }
        }
    }

    /// Borrows the underlying pooled connection.
    ///
    /// # Panics
    /// Panics if the connection has already been taken (an internal bug).
    pub(crate) fn connection(&self) -> &PoolConnection {
        self.conn
            .as_ref()
            .expect("bsql bug: Transaction used after commit/rollback")
    }
}
impl fmt::Debug for Transaction {
    /// Formats the transaction's state flags; the connection itself is not
    /// printed, only whether it is still held (`active`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("Transaction");
        repr.field("active", &self.conn.is_some());
        repr.field("committed", &self.committed);
        repr.field("begun", &self.begun.load(Ordering::Relaxed));
        repr.finish()
    }
}
impl Drop for Transaction {
    /// Cleans up a transaction that was dropped without `commit()` or
    /// `rollback()`.
    ///
    /// A finished transaction, or one for which `BEGIN` was never recorded,
    /// needs no work: the connection (if still held) is dropped normally.
    /// Otherwise the connection is taken out of the pool, because no
    /// `ROLLBACK` can be awaited from a synchronous destructor.
    fn drop(&mut self) {
        // Nothing to undo when already finished or never begun.
        if self.committed || !self.begun.load(Ordering::Relaxed) {
            return;
        }

        if let Some(abandoned) = self.conn.take() {
            let _ = deadpool_postgres::Object::take(abandoned.inner);

            // Only debug builds report the misuse.
            #[cfg(debug_assertions)]
            eprintln!(
                "bsql: transaction dropped without commit() or rollback() \
                 — connection discarded from pool"
            );
        }
    }
}