Skip to main content

mempill_postgres/
txn.rs

1//! `PostgresTxn` — the concrete transaction handle wrapping a pooled Postgres connection.
2//!
3//! # Design (own the pooled connection, manual BEGIN/COMMIT/ROLLBACK)
4//!
5//! `postgres::Client::transaction()` returns `Transaction<'_>` that borrows `&mut Client`.
6//! This conflicts with `Txn: Send + 'static`. Resolution (identical to `SqliteTxn`): own the
7//! pooled connection outright and issue `BEGIN`/`COMMIT`/`ROLLBACK` via `batch_execute`.
8//!
9//! `PooledConnection<PostgresConnectionManager<NoTls>>` is `Send` (r2d2 guarantees).
10//! `PostgresTxn` is therefore `Send + 'static` without any `unsafe`.
11//!
12//! # Per-agent_id advisory lock
13//!
14//! After `BEGIN`, the first statement is:
15//! ```sql
16//! SELECT pg_advisory_xact_lock(hashtext($1)::bigint)
17//! ```
18//! This serializes same-agent_id writes at the DB level. The lock is transaction-scoped:
19//! auto-released on COMMIT or ROLLBACK (no leak risk on panic).
20//!
21//! # `as_deref_mut` verification
22//!
23//! `r2d2_postgres::PooledConnection<M>` implements `DerefMut<Target = postgres::Client>`.
24//! `Option<PooledConnection<M>>::as_deref_mut()` yields `Option<&mut postgres::Client>`.
25//! This is used in `client()` and confirmed to compile under r2d2_postgres 0.18.
26
27use r2d2::PooledConnection;
28use r2d2_postgres::PostgresConnectionManager;
29use postgres::NoTls;
30use mempill_core::ports::persistence::Txn;
31use mempill_types::identity::AgentId;
32
33use crate::connection::PostgresStoreError;
34
35/// An open, uncommitted Postgres transaction scoped to one `agent_id`.
36///
37/// Created by `PostgresPersistenceStore::begin_atomic`; consumed by `commit` or `rollback`.
38/// Owns the pooled connection for the duration of the transaction.
39/// The connection returns to the r2d2 pool on `Drop`.
40pub struct PostgresTxn {
41    agent_id: AgentId,
42    /// Pooled connection with an open transaction.
43    /// `Option` so we can move it out on commit/rollback without destructuring.
44    /// Connection returns to pool when `PooledConnection` is dropped.
45    conn: Option<PooledConnection<PostgresConnectionManager<NoTls>>>,
46}
47
48// PooledConnection<PostgresConnectionManager<NoTls>>: Send (r2d2 guarantees).
49// PostgresTxn therefore: Send + 'static. No unsafe needed.
50
51impl PostgresTxn {
52    /// Begin a new transaction. Called exclusively from `PostgresPersistenceStore::begin_atomic`.
53    ///
54    /// Issues `BEGIN` then acquires the per-agent_id advisory lock.
55    pub(crate) fn begin(
56        agent_id: AgentId,
57        mut conn: PooledConnection<PostgresConnectionManager<NoTls>>,
58    ) -> Result<Self, PostgresStoreError> {
59        conn.batch_execute("BEGIN")?;
60        conn.execute(
61            "SELECT pg_advisory_xact_lock(hashtext($1)::bigint)",
62            &[&agent_id.0.as_str()],
63        )?;
64        Ok(Self { agent_id, conn: Some(conn) })
65    }
66
67    /// Borrow the inner `postgres::Client` for SQL execution.
68    ///
69    /// `Option<PooledConnection<M>>::as_deref_mut()` yields `Option<&mut postgres::Client>`
70    /// via `DerefMut` on `PooledConnection` (r2d2_postgres 0.18 implements `DerefMut<Target = postgres::Client>`).
71    /// Confirmed to compile with r2d2_postgres 0.18.
72    pub(crate) fn client(&mut self) -> &mut postgres::Client {
73        self.conn
74            .as_deref_mut()
75            .expect("PostgresTxn: connection consumed — cannot call client() after commit/rollback")
76    }
77
78    /// COMMIT the transaction. The pooled connection returns to the r2d2 pool on drop.
79    pub(crate) fn commit_and_drop(mut self) -> Result<(), PostgresStoreError> {
80        let mut conn = self.conn.take().expect("PostgresTxn: connection consumed");
81        conn.batch_execute("COMMIT")?;
82        // conn drops here → returns to pool via r2d2 PooledConnection Drop impl
83        Ok(())
84    }
85
86    /// ROLLBACK the transaction. The pooled connection returns to the r2d2 pool on drop.
87    pub(crate) fn rollback_and_drop(mut self) -> Result<(), PostgresStoreError> {
88        let mut conn = self.conn.take().expect("PostgresTxn: connection consumed");
89        conn.batch_execute("ROLLBACK")?;
90        // conn drops here → returns to pool
91        Ok(())
92    }
93}
94
95impl Drop for PostgresTxn {
96    /// Best-effort ROLLBACK on panic or drop without explicit commit (append-only invariant).
97    fn drop(&mut self) {
98        if let Some(ref mut conn) = self.conn {
99            // Best-effort; ignore error on drop — the open transaction will be
100            // rolled back by Postgres when the connection is returned to the pool anyway.
101            let _ = conn.batch_execute("ROLLBACK");
102            // conn returned to pool after this block via PooledConnection Drop
103        }
104    }
105}
106
107impl Txn for PostgresTxn {
108    fn agent_id(&self) -> &AgentId {
109        &self.agent_id
110    }
111}