use r2d2::PooledConnection;
use r2d2_postgres::PostgresConnectionManager;
use postgres::NoTls;
use mempill_core::ports::persistence::Txn;
use mempill_types::identity::AgentId;
use crate::connection::PostgresStoreError;
pub struct PostgresTxn {
agent_id: AgentId,
conn: Option<PooledConnection<PostgresConnectionManager<NoTls>>>,
}
impl PostgresTxn {
pub(crate) fn begin(
agent_id: AgentId,
mut conn: PooledConnection<PostgresConnectionManager<NoTls>>,
) -> Result<Self, PostgresStoreError> {
conn.batch_execute("BEGIN")?;
conn.execute(
"SELECT pg_advisory_xact_lock(hashtext($1)::bigint)",
&[&agent_id.0.as_str()],
)?;
Ok(Self { agent_id, conn: Some(conn) })
}
pub(crate) fn client(&mut self) -> &mut postgres::Client {
self.conn
.as_deref_mut()
.expect("PostgresTxn: connection consumed — cannot call client() after commit/rollback")
}
pub(crate) fn commit_and_drop(mut self) -> Result<(), PostgresStoreError> {
let mut conn = self.conn.take().expect("PostgresTxn: connection consumed");
conn.batch_execute("COMMIT")?;
Ok(())
}
pub(crate) fn rollback_and_drop(mut self) -> Result<(), PostgresStoreError> {
let mut conn = self.conn.take().expect("PostgresTxn: connection consumed");
conn.batch_execute("ROLLBACK")?;
Ok(())
}
}
impl Drop for PostgresTxn {
fn drop(&mut self) {
if let Some(ref mut conn) = self.conn {
let _ = conn.batch_execute("ROLLBACK");
}
}
}
impl Txn for PostgresTxn {
fn agent_id(&self) -> &AgentId {
&self.agent_id
}
}