bsql_core/transaction.rs
1//! Database transactions with commit/rollback and drop-guard semantics.
2//!
3//! Created via [`Pool::begin()`](crate::pool::Pool::begin). A transaction
4//! holds a single connection from the pool for its entire lifetime. Queries
5//! executed through the `Executor` trait run within the transaction.
6//!
7//! # Drop behavior
8//!
9//! If a `Transaction` is dropped without calling [`commit()`](Transaction::commit)
10//! or [`rollback()`](Transaction::rollback), the connection is permanently detached
11//! from the pool via `Object::take()` and closed. A warning is logged to stderr.
12//! `Drop` is synchronous and cannot send an async `ROLLBACK`, so the connection
13//! must be discarded to prevent reuse in a dirty state.
14//!
15//! Always call `commit()` or `rollback()` explicitly.
16
17use std::fmt;
18
19use crate::error::{BsqlError, BsqlResult};
20use crate::pool::PoolConnection;
21
22/// A database transaction.
23///
24/// Created by [`Pool::begin()`](crate::pool::Pool::begin). Must be explicitly
25/// committed via [`commit()`](Transaction::commit). If dropped without
26/// `commit()`, the connection is discarded from the pool.
27pub struct Transaction {
28 /// `None` after `commit()` or `rollback()` consumes the connection.
29 /// Since both methods take `self`, user code cannot observe `None` —
30 /// this is only `None` during `Drop` after a successful commit.
31 conn: Option<PoolConnection>,
32 committed: bool,
33}
34
35impl Transaction {
36 /// Create a new transaction. Called by `Pool::begin()`.
37 pub(crate) fn new(conn: PoolConnection) -> Self {
38 Self {
39 conn: Some(conn),
40 committed: false,
41 }
42 }
43
44 /// Commit the transaction and return the connection to the pool.
45 ///
46 /// Consumes `self` — the transaction cannot be used after commit.
47 pub async fn commit(mut self) -> BsqlResult<()> {
48 let conn = self
49 .conn
50 .as_ref()
51 .expect("bsql bug: Transaction::commit called but connection already taken");
52 match conn.inner.batch_execute("COMMIT").await {
53 Ok(()) => {
54 self.committed = true;
55 // conn drops with self, returning to pool (clean after COMMIT)
56 Ok(())
57 }
58 Err(e) => {
59 // COMMIT failed — connection is dirty (aborted transaction).
60 // Detach it from the pool so nobody else gets it.
61 if let Some(conn) = self.conn.take() {
62 let _ = deadpool_postgres::Object::take(conn.inner);
63 }
64 self.committed = true; // suppress Drop warning — we handled it
65 Err(BsqlError::from(e))
66 }
67 }
68 }
69
70 /// Explicitly roll back the transaction and return the connection to the pool.
71 ///
72 /// Consumes `self` — the transaction cannot be used after rollback.
73 pub async fn rollback(mut self) -> BsqlResult<()> {
74 let conn = self
75 .conn
76 .as_ref()
77 .expect("bsql bug: Transaction::rollback called but connection already taken");
78 match conn.inner.batch_execute("ROLLBACK").await {
79 Ok(()) => {
80 self.committed = true; // suppress Drop warning — rollback is intentional
81 // conn drops with self, returning to pool (clean after ROLLBACK)
82 Ok(())
83 }
84 Err(e) => {
85 // ROLLBACK failed — connection is broken. Detach from pool.
86 if let Some(conn) = self.conn.take() {
87 let _ = deadpool_postgres::Object::take(conn.inner);
88 }
89 self.committed = true;
90 Err(BsqlError::from(e))
91 }
92 }
93 }
94
95 /// Access the inner connection for `Executor` implementation.
96 pub(crate) fn connection(&self) -> &PoolConnection {
97 self.conn
98 .as_ref()
99 .expect("bsql bug: Transaction used after commit/rollback")
100 }
101}
102
103impl fmt::Debug for Transaction {
104 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105 f.debug_struct("Transaction")
106 .field("active", &self.conn.is_some())
107 .field("committed", &self.committed)
108 .finish()
109 }
110}
111
112impl Drop for Transaction {
113 fn drop(&mut self) {
114 if !self.committed {
115 if let Some(conn) = self.conn.take() {
116 // Connection has an uncommitted transaction. We cannot send
117 // ROLLBACK because Drop is synchronous and ROLLBACK is async.
118 //
119 // Detach the connection from the pool permanently via
120 // Object::take(). This prevents the dirty connection from
121 // being handed to the next caller. RecyclingMethod::Fast
122 // does NOT run a health-check query, so without this the
123 // connection would be reused in an aborted-transaction state.
124 //
125 // The returned ClientWrapper drops here, closing the TCP
126 // connection. The pool slot is freed and a fresh connection
127 // will be created on the next acquire().
128 let _ = deadpool_postgres::Object::take(conn.inner);
129 eprintln!(
130 "bsql: transaction dropped without commit() or rollback() \
131 — connection discarded from pool"
132 );
133 }
134 }
135 }
136}