bsql_core/transaction.rs
1//! Database transactions with commit/rollback, drop-guard, and lazy-BEGIN.
2//!
3//! Created via [`Pool::begin()`](crate::pool::Pool::begin). A transaction
4//! holds a single connection from the pool for its entire lifetime. Queries
5//! executed through the `Executor` trait run within the transaction.
6//!
7//! # Lazy BEGIN
8//!
9//! `Pool::begin()` acquires a connection but does NOT send `BEGIN` to
10//! PostgreSQL. The `BEGIN` is sent lazily on the first query inside the
11//! transaction (via `ensure_begun`). This saves one PG round-trip per
12//! transaction, which adds up under high throughput.
13//!
14//! The lazy approach is transparent to callers: PostgreSQL guarantees
15//! read-committed isolation within a transaction regardless of when
16//! `BEGIN` is issued relative to the first statement.
17//!
18//! If a `Transaction` is created and then committed or dropped without
19//! executing any queries, no `BEGIN`/`COMMIT`/`ROLLBACK` is sent at all.
20//! The connection returns to the pool cleanly.
21//!
22//! # Drop behavior
23//!
24//! If a `Transaction` is dropped without calling [`commit()`](Transaction::commit)
25//! or [`rollback()`](Transaction::rollback):
26//!
27//! - **If `BEGIN` was never sent** (no queries executed): the connection is
28//! clean and returns to the pool normally.
29//! - **If `BEGIN` was sent**: the connection is dirty. It is permanently
30//! detached from the pool via `Object::take()` and closed. `Drop` is
31//! synchronous and cannot send an async `ROLLBACK`, so the connection
32//! must be discarded to prevent reuse in an aborted-transaction state.
33//!
34//! Always call `commit()` or `rollback()` explicitly.
35
36use std::fmt;
37use std::sync::atomic::{AtomicBool, Ordering};
38
39use crate::error::{BsqlError, BsqlResult};
40use crate::pool::PoolConnection;
41
42/// A database transaction.
43///
44/// Created by [`Pool::begin()`](crate::pool::Pool::begin). Must be explicitly
45/// committed via [`commit()`](Transaction::commit). If dropped without
46/// `commit()`, the connection is discarded from the pool (unless no queries
47/// were executed, in which case `BEGIN` was never sent and the connection
48/// is clean).
49pub struct Transaction {
50 /// `None` after `commit()` or `rollback()` consumes the connection.
51 /// Since both methods take `self`, user code cannot observe `None` —
52 /// this is only `None` during `Drop` after a successful commit.
53 conn: Option<PoolConnection>,
54 committed: bool,
55 /// Whether `BEGIN` has been sent to PostgreSQL. `AtomicBool` provides
56 /// interior mutability for `&self` methods (the Executor trait takes
57 /// `&self`) while keeping Transaction `Send + Sync`.
58 ///
59 /// Relaxed ordering suffices: a Transaction is never shared between
60 /// tasks (PG connections are not multiplexed), so the only observer
61 /// of this flag is the single task that owns the transaction. The
62 /// atomic is used solely for interior mutability through `&self`.
63 begun: AtomicBool,
64}
65
66impl Transaction {
67 /// Create a new transaction. Called by `Pool::begin()`.
68 ///
69 /// Does NOT send `BEGIN` — that is deferred to the first query
70 /// via [`ensure_begun`](Transaction::ensure_begun).
71 pub(crate) fn new(conn: PoolConnection) -> Self {
72 Self {
73 conn: Some(conn),
74 committed: false,
75 begun: AtomicBool::new(false),
76 }
77 }
78
79 /// Send `BEGIN` to PostgreSQL if not already sent.
80 ///
81 /// Called at the start of every `Executor` method. The first call
82 /// sends `BEGIN`; subsequent calls are a no-op (relaxed atomic load).
83 pub(crate) async fn ensure_begun(&self) -> BsqlResult<()> {
84 if !self.begun.load(Ordering::Relaxed) {
85 self.conn
86 .as_ref()
87 .expect("bsql bug: Transaction used after commit/rollback")
88 .inner
89 .batch_execute("BEGIN")
90 .await
91 .map_err(BsqlError::from)?;
92 self.begun.store(true, Ordering::Relaxed);
93 }
94 Ok(())
95 }
96
97 /// Commit the transaction and return the connection to the pool.
98 ///
99 /// Consumes `self` — the transaction cannot be used after commit.
100 ///
101 /// If no queries were executed (`BEGIN` was never sent), this is a
102 /// no-op: no `COMMIT` is sent and the connection returns cleanly.
103 pub async fn commit(mut self) -> BsqlResult<()> {
104 if !self.begun.load(Ordering::Relaxed) {
105 // BEGIN was never sent — nothing to commit.
106 // Connection is clean; let it return to the pool via Drop.
107 self.committed = true;
108 return Ok(());
109 }
110
111 let conn = self
112 .conn
113 .as_ref()
114 .expect("bsql bug: Transaction::commit called but connection already taken");
115 match conn.inner.batch_execute("COMMIT").await {
116 Ok(()) => {
117 self.committed = true;
118 // conn drops with self, returning to pool (clean after COMMIT)
119 Ok(())
120 }
121 Err(e) => {
122 // COMMIT failed — connection is dirty (aborted transaction).
123 // Detach it from the pool so nobody else gets it.
124 if let Some(conn) = self.conn.take() {
125 let _ = deadpool_postgres::Object::take(conn.inner);
126 }
127 self.committed = true; // suppress Drop warning — we handled it
128 Err(BsqlError::from(e))
129 }
130 }
131 }
132
133 /// Explicitly roll back the transaction and return the connection to the pool.
134 ///
135 /// Consumes `self` — the transaction cannot be used after rollback.
136 ///
137 /// If no queries were executed (`BEGIN` was never sent), this is a
138 /// no-op: no `ROLLBACK` is sent and the connection returns cleanly.
139 pub async fn rollback(mut self) -> BsqlResult<()> {
140 if !self.begun.load(Ordering::Relaxed) {
141 // BEGIN was never sent — nothing to roll back.
142 // Connection is clean; let it return to the pool via Drop.
143 self.committed = true;
144 return Ok(());
145 }
146
147 let conn = self
148 .conn
149 .as_ref()
150 .expect("bsql bug: Transaction::rollback called but connection already taken");
151 match conn.inner.batch_execute("ROLLBACK").await {
152 Ok(()) => {
153 self.committed = true; // suppress Drop warning — rollback is intentional
154 // conn drops with self, returning to pool (clean after ROLLBACK)
155 Ok(())
156 }
157 Err(e) => {
158 // ROLLBACK failed — connection is broken. Detach from pool.
159 if let Some(conn) = self.conn.take() {
160 let _ = deadpool_postgres::Object::take(conn.inner);
161 }
162 self.committed = true;
163 Err(BsqlError::from(e))
164 }
165 }
166 }
167
168 /// Access the inner connection for `Executor` implementation.
169 pub(crate) fn connection(&self) -> &PoolConnection {
170 self.conn
171 .as_ref()
172 .expect("bsql bug: Transaction used after commit/rollback")
173 }
174}
175
176impl fmt::Debug for Transaction {
177 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
178 f.debug_struct("Transaction")
179 .field("active", &self.conn.is_some())
180 .field("committed", &self.committed)
181 .field("begun", &self.begun.load(Ordering::Relaxed))
182 .finish()
183 }
184}
185
186impl Drop for Transaction {
187 fn drop(&mut self) {
188 if !self.committed {
189 if !self.begun.load(Ordering::Relaxed) {
190 // BEGIN was never sent — connection is clean.
191 // Let it return to the pool normally (conn drops with self).
192 return;
193 }
194
195 if let Some(conn) = self.conn.take() {
196 // Connection has an uncommitted transaction. We cannot send
197 // ROLLBACK because Drop is synchronous and ROLLBACK is async.
198 //
199 // Detach the connection from the pool permanently via
200 // Object::take(). This prevents the dirty connection from
201 // being handed to the next caller. RecyclingMethod::Fast
202 // does NOT run a health-check query, so without this the
203 // connection would be reused in an aborted-transaction state.
204 //
205 // The returned ClientWrapper drops here, closing the TCP
206 // connection. The pool slot is freed and a fresh connection
207 // will be created on the next acquire().
208 let _ = deadpool_postgres::Object::take(conn.inner);
209 #[cfg(debug_assertions)]
210 eprintln!(
211 "bsql: transaction dropped without commit() or rollback() \
212 — connection discarded from pool"
213 );
214 }
215 }
216 }
217}