Skip to main content

bsql_core/
transaction.rs

1//! Database transactions with commit/rollback, drop-guard, and lazy-BEGIN.
2//!
3//! Created via [`Pool::begin()`](crate::pool::Pool::begin). A transaction
4//! holds a single connection from the pool for its entire lifetime. Queries
5//! executed through the `Executor` trait run within the transaction.
6//!
7//! # Lazy BEGIN
8//!
9//! `Pool::begin()` acquires a connection but does NOT send `BEGIN` to
10//! PostgreSQL. The `BEGIN` is sent lazily on the first query inside the
11//! transaction (via `ensure_begun`). This saves one PG round-trip per
12//! transaction, which adds up under high throughput.
13//!
14//! The lazy approach is transparent to callers: PostgreSQL guarantees
15//! read-committed isolation within a transaction regardless of when
16//! `BEGIN` is issued relative to the first statement.
17//!
18//! If a `Transaction` is created and then committed or dropped without
19//! executing any queries, no `BEGIN`/`COMMIT`/`ROLLBACK` is sent at all.
20//! The connection returns to the pool cleanly.
21//!
22//! # Drop behavior
23//!
24//! If a `Transaction` is dropped without calling [`commit()`](Transaction::commit)
25//! or [`rollback()`](Transaction::rollback):
26//!
27//! - **If `BEGIN` was never sent** (no queries executed): the connection is
28//!   clean and returns to the pool normally.
29//! - **If `BEGIN` was sent**: the connection is dirty. It is permanently
30//!   detached from the pool via `Object::take()` and closed. `Drop` is
31//!   synchronous and cannot send an async `ROLLBACK`, so the connection
32//!   must be discarded to prevent reuse in an aborted-transaction state.
33//!
34//! Always call `commit()` or `rollback()` explicitly.
35
36use std::fmt;
37use std::sync::atomic::{AtomicBool, Ordering};
38
39use crate::error::{BsqlError, BsqlResult};
40use crate::pool::PoolConnection;
41
42/// A database transaction.
43///
44/// Created by [`Pool::begin()`](crate::pool::Pool::begin). Must be explicitly
45/// committed via [`commit()`](Transaction::commit). If dropped without
46/// `commit()`, the connection is discarded from the pool (unless no queries
47/// were executed, in which case `BEGIN` was never sent and the connection
48/// is clean).
49pub struct Transaction {
50    /// `None` after `commit()` or `rollback()` consumes the connection.
51    /// Since both methods take `self`, user code cannot observe `None` —
52    /// this is only `None` during `Drop` after a successful commit.
53    conn: Option<PoolConnection>,
54    committed: bool,
55    /// Whether `BEGIN` has been sent to PostgreSQL. `AtomicBool` provides
56    /// interior mutability for `&self` methods (the Executor trait takes
57    /// `&self`) while keeping Transaction `Send + Sync`.
58    ///
59    /// Relaxed ordering suffices: a Transaction is never shared between
60    /// tasks (PG connections are not multiplexed), so the only observer
61    /// of this flag is the single task that owns the transaction. The
62    /// atomic is used solely for interior mutability through `&self`.
63    begun: AtomicBool,
64}
65
66impl Transaction {
67    /// Create a new transaction. Called by `Pool::begin()`.
68    ///
69    /// Does NOT send `BEGIN` — that is deferred to the first query
70    /// via [`ensure_begun`](Transaction::ensure_begun).
71    pub(crate) fn new(conn: PoolConnection) -> Self {
72        Self {
73            conn: Some(conn),
74            committed: false,
75            begun: AtomicBool::new(false),
76        }
77    }
78
79    /// Send `BEGIN` to PostgreSQL if not already sent.
80    ///
81    /// Called at the start of every `Executor` method. The first call
82    /// sends `BEGIN`; subsequent calls are a no-op (relaxed atomic load).
83    pub(crate) async fn ensure_begun(&self) -> BsqlResult<()> {
84        if !self.begun.load(Ordering::Relaxed) {
85            self.conn
86                .as_ref()
87                .expect("bsql bug: Transaction used after commit/rollback")
88                .inner
89                .batch_execute("BEGIN")
90                .await
91                .map_err(BsqlError::from)?;
92            self.begun.store(true, Ordering::Relaxed);
93        }
94        Ok(())
95    }
96
97    /// Commit the transaction and return the connection to the pool.
98    ///
99    /// Consumes `self` — the transaction cannot be used after commit.
100    ///
101    /// If no queries were executed (`BEGIN` was never sent), this is a
102    /// no-op: no `COMMIT` is sent and the connection returns cleanly.
103    pub async fn commit(mut self) -> BsqlResult<()> {
104        if !self.begun.load(Ordering::Relaxed) {
105            // BEGIN was never sent — nothing to commit.
106            // Connection is clean; let it return to the pool via Drop.
107            self.committed = true;
108            return Ok(());
109        }
110
111        let conn = self
112            .conn
113            .as_ref()
114            .expect("bsql bug: Transaction::commit called but connection already taken");
115        match conn.inner.batch_execute("COMMIT").await {
116            Ok(()) => {
117                self.committed = true;
118                // conn drops with self, returning to pool (clean after COMMIT)
119                Ok(())
120            }
121            Err(e) => {
122                // COMMIT failed — connection is dirty (aborted transaction).
123                // Detach it from the pool so nobody else gets it.
124                if let Some(conn) = self.conn.take() {
125                    let _ = deadpool_postgres::Object::take(conn.inner);
126                }
127                self.committed = true; // suppress Drop warning — we handled it
128                Err(BsqlError::from(e))
129            }
130        }
131    }
132
133    /// Explicitly roll back the transaction and return the connection to the pool.
134    ///
135    /// Consumes `self` — the transaction cannot be used after rollback.
136    ///
137    /// If no queries were executed (`BEGIN` was never sent), this is a
138    /// no-op: no `ROLLBACK` is sent and the connection returns cleanly.
139    pub async fn rollback(mut self) -> BsqlResult<()> {
140        if !self.begun.load(Ordering::Relaxed) {
141            // BEGIN was never sent — nothing to roll back.
142            // Connection is clean; let it return to the pool via Drop.
143            self.committed = true;
144            return Ok(());
145        }
146
147        let conn = self
148            .conn
149            .as_ref()
150            .expect("bsql bug: Transaction::rollback called but connection already taken");
151        match conn.inner.batch_execute("ROLLBACK").await {
152            Ok(()) => {
153                self.committed = true; // suppress Drop warning — rollback is intentional
154                // conn drops with self, returning to pool (clean after ROLLBACK)
155                Ok(())
156            }
157            Err(e) => {
158                // ROLLBACK failed — connection is broken. Detach from pool.
159                if let Some(conn) = self.conn.take() {
160                    let _ = deadpool_postgres::Object::take(conn.inner);
161                }
162                self.committed = true;
163                Err(BsqlError::from(e))
164            }
165        }
166    }
167
168    /// Access the inner connection for `Executor` implementation.
169    pub(crate) fn connection(&self) -> &PoolConnection {
170        self.conn
171            .as_ref()
172            .expect("bsql bug: Transaction used after commit/rollback")
173    }
174}
175
176impl fmt::Debug for Transaction {
177    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
178        f.debug_struct("Transaction")
179            .field("active", &self.conn.is_some())
180            .field("committed", &self.committed)
181            .field("begun", &self.begun.load(Ordering::Relaxed))
182            .finish()
183    }
184}
185
186impl Drop for Transaction {
187    fn drop(&mut self) {
188        if !self.committed {
189            if !self.begun.load(Ordering::Relaxed) {
190                // BEGIN was never sent — connection is clean.
191                // Let it return to the pool normally (conn drops with self).
192                return;
193            }
194
195            if let Some(conn) = self.conn.take() {
196                // Connection has an uncommitted transaction. We cannot send
197                // ROLLBACK because Drop is synchronous and ROLLBACK is async.
198                //
199                // Detach the connection from the pool permanently via
200                // Object::take(). This prevents the dirty connection from
201                // being handed to the next caller. RecyclingMethod::Fast
202                // does NOT run a health-check query, so without this the
203                // connection would be reused in an aborted-transaction state.
204                //
205                // The returned ClientWrapper drops here, closing the TCP
206                // connection. The pool slot is freed and a fresh connection
207                // will be created on the next acquire().
208                let _ = deadpool_postgres::Object::take(conn.inner);
209                #[cfg(debug_assertions)]
210                eprintln!(
211                    "bsql: transaction dropped without commit() or rollback() \
212                     — connection discarded from pool"
213                );
214            }
215        }
216    }
217}