mempill_postgres/txn.rs
1//! `PostgresTxn` — the concrete transaction handle wrapping a pooled Postgres connection.
2//!
3//! # Design (own the pooled connection, manual BEGIN/COMMIT/ROLLBACK)
4//!
5//! `postgres::Client::transaction()` returns `Transaction<'_>` that borrows `&mut Client`.
6//! This conflicts with `Txn: Send + 'static`. Resolution (identical to `SqliteTxn`): own the
7//! pooled connection outright and issue `BEGIN`/`COMMIT`/`ROLLBACK` via `batch_execute`.
8//!
9//! `PooledConnection<PostgresConnectionManager<NoTls>>` is `Send` (r2d2 guarantees).
10//! `PostgresTxn` is therefore `Send + 'static` without any `unsafe`.
11//!
12//! # Per-agent_id advisory lock
13//!
14//! After `BEGIN`, the first statement is:
15//! ```sql
16//! SELECT pg_advisory_xact_lock(hashtext($1)::bigint)
17//! ```
18//! This serializes same-agent_id writes at the DB level. The lock is transaction-scoped:
19//! auto-released on COMMIT or ROLLBACK (no leak risk on panic).
20//!
21//! # `as_deref_mut` verification
22//!
//! `r2d2::PooledConnection<M>` implements `DerefMut<Target = M::Connection>`, and the connection type of `PostgresConnectionManager` is `postgres::Client`.
24//! `Option<PooledConnection<M>>::as_deref_mut()` yields `Option<&mut postgres::Client>`.
25//! This is used in `client()` and confirmed to compile under r2d2_postgres 0.18.
26
27use r2d2::PooledConnection;
28use r2d2_postgres::PostgresConnectionManager;
29use postgres::NoTls;
30use mempill_core::ports::persistence::Txn;
31use mempill_types::identity::AgentId;
32
33use crate::connection::PostgresStoreError;
34
35/// An open, uncommitted Postgres transaction scoped to one `agent_id`.
36///
37/// Created by `PostgresPersistenceStore::begin_atomic`; consumed by `commit` or `rollback`.
38/// Owns the pooled connection for the duration of the transaction.
39/// The connection returns to the r2d2 pool on `Drop`.
40pub struct PostgresTxn {
41 agent_id: AgentId,
42 /// Pooled connection with an open transaction.
43 /// `Option` so we can move it out on commit/rollback without destructuring.
44 /// Connection returns to pool when `PooledConnection` is dropped.
45 conn: Option<PooledConnection<PostgresConnectionManager<NoTls>>>,
46}
47
48// PooledConnection<PostgresConnectionManager<NoTls>>: Send (r2d2 guarantees).
49// PostgresTxn therefore: Send + 'static. No unsafe needed.
50
51impl PostgresTxn {
52 /// Begin a new transaction. Called exclusively from `PostgresPersistenceStore::begin_atomic`.
53 ///
54 /// Issues `BEGIN` then acquires the per-agent_id advisory lock.
55 pub(crate) fn begin(
56 agent_id: AgentId,
57 mut conn: PooledConnection<PostgresConnectionManager<NoTls>>,
58 ) -> Result<Self, PostgresStoreError> {
59 conn.batch_execute("BEGIN")?;
60 conn.execute(
61 "SELECT pg_advisory_xact_lock(hashtext($1)::bigint)",
62 &[&agent_id.0.as_str()],
63 )?;
64 Ok(Self { agent_id, conn: Some(conn) })
65 }
66
67 /// Borrow the inner `postgres::Client` for SQL execution.
68 ///
69 /// `Option<PooledConnection<M>>::as_deref_mut()` yields `Option<&mut postgres::Client>`
70 /// via `DerefMut` on `PooledConnection` (r2d2_postgres 0.18 implements `DerefMut<Target = postgres::Client>`).
71 /// Confirmed to compile with r2d2_postgres 0.18.
72 pub(crate) fn client(&mut self) -> &mut postgres::Client {
73 self.conn
74 .as_deref_mut()
75 .expect("PostgresTxn: connection consumed — cannot call client() after commit/rollback")
76 }
77
78 /// COMMIT the transaction. The pooled connection returns to the r2d2 pool on drop.
79 pub(crate) fn commit_and_drop(mut self) -> Result<(), PostgresStoreError> {
80 let mut conn = self.conn.take().expect("PostgresTxn: connection consumed");
81 conn.batch_execute("COMMIT")?;
82 // conn drops here → returns to pool via r2d2 PooledConnection Drop impl
83 Ok(())
84 }
85
86 /// ROLLBACK the transaction. The pooled connection returns to the r2d2 pool on drop.
87 pub(crate) fn rollback_and_drop(mut self) -> Result<(), PostgresStoreError> {
88 let mut conn = self.conn.take().expect("PostgresTxn: connection consumed");
89 conn.batch_execute("ROLLBACK")?;
90 // conn drops here → returns to pool
91 Ok(())
92 }
93}
94
95impl Drop for PostgresTxn {
96 /// Best-effort ROLLBACK on panic or drop without explicit commit (append-only invariant).
97 fn drop(&mut self) {
98 if let Some(ref mut conn) = self.conn {
99 // Best-effort; ignore error on drop — the open transaction will be
100 // rolled back by Postgres when the connection is returned to the pool anyway.
101 let _ = conn.batch_execute("ROLLBACK");
102 // conn returned to pool after this block via PooledConnection Drop
103 }
104 }
105}
106
107impl Txn for PostgresTxn {
108 fn agent_id(&self) -> &AgentId {
109 &self.agent_id
110 }
111}