mempill_sqlite/txn.rs
1//! `SqliteTxn` — the concrete transaction handle wrapping a rusqlite connection.
2//!
3//! # Design (explicit transaction control)
4//!
5//! rusqlite's `Transaction<'conn>` is lifetime-bound to `&mut Connection`, which conflicts
6//! with the `Txn: Send + 'static` bound required by the port trait (§4) and `spawn_blocking`.
7//!
8//! Resolution: `SqliteTxn` owns the `Connection` outright (moved out of the `Arc<Mutex<…>>`
9//! in `begin_atomic`). The `SqlitePersistenceStore` uses an `Option<Arc<Mutex<Connection>>>`
10//! internally; `begin_atomic` takes the connection out for the duration of the txn and returns
11//! it on `commit`/`rollback`. Because `Connection: Send` (rusqlite guarantees this), the
12//! owned `SqliteTxn` is `Send + 'static`.
13//!
//! The approach taken here avoids unsafe code: `SqliteTxn` holds a boxed `Connection`
//! and drives the transaction with explicit `BEGIN`/`COMMIT`/`ROLLBACK` statements
//! instead of borrowing through rusqlite's lifetime-bound `Transaction` type.
17//!
18//! # Single-writer per agent_id
19//!
20//! v0.1 is single-process embedded. The `AgentWriteLockMap` in mempill-core coordinates
21//! writes per agent_id at the async boundary. The store itself assumes a single writer per
22//! connection file; no additional locking is needed inside `SqliteTxn`.
23
24use mempill_core::ports::persistence::Txn;
25use mempill_types::identity::AgentId;
26use rusqlite::Connection;
27
28use crate::SqliteStoreError;
29
30/// An open, uncommitted SQLite transaction scoped to one `agent_id`.
31///
32/// Created by `SqlitePersistenceStore::begin_atomic`; consumed by `commit` or `rollback`.
33/// Owns the `Connection` for the lifetime of the transaction — the store re-acquires it
34/// after `commit` or `rollback` completes.
35pub struct SqliteTxn {
36 agent_id: AgentId,
37 /// The connection with an open `BEGIN DEFERRED` transaction.
38 /// `Option` so we can move it out on commit/rollback without destructuring.
39 conn: Option<Box<Connection>>,
40}
41
42// rusqlite::Connection is Send; SqliteTxn owns it exclusively.
43// SAFETY guaranteed by the type system: Box<Connection>: Send.
44unsafe impl Send for SqliteTxn {}
45
46impl SqliteTxn {
47 /// Begin a new transaction. Called exclusively from `SqlitePersistenceStore::begin_atomic`.
48 pub(crate) fn begin(
49 agent_id: AgentId,
50 conn: Box<Connection>,
51 ) -> Result<Self, SqliteStoreError> {
52 conn.execute_batch("BEGIN DEFERRED")?;
53 Ok(Self { agent_id, conn: Some(conn) })
54 }
55
56 /// Borrow the inner connection to execute SQL (INSERT, etc.).
57 pub(crate) fn conn(&self) -> &Connection {
58 self.conn.as_ref().expect("SqliteTxn: connection must be present (not yet consumed)")
59 }
60
61 /// COMMIT the transaction and return the owned connection to the caller.
62 pub(crate) fn commit_and_return(mut self) -> Result<Box<Connection>, SqliteStoreError> {
63 let conn = self.conn.take().expect("SqliteTxn: connection must be present");
64 conn.execute_batch("COMMIT")?;
65 Ok(conn)
66 }
67
68 /// ROLLBACK the transaction and return the owned connection to the caller.
69 pub(crate) fn rollback_and_return(mut self) -> Result<Box<Connection>, SqliteStoreError> {
70 let conn = self.conn.take().expect("SqliteTxn: connection must be present");
71 conn.execute_batch("ROLLBACK")?;
72 Ok(conn)
73 }
74}
75
76impl Drop for SqliteTxn {
77 /// If `SqliteTxn` is dropped without an explicit commit or rollback (e.g. on panic),
78 /// the `Box<Connection>` is dropped here. SQLite automatically rolls back any open
79 /// transaction when the connection is closed — the append-only invariant is preserved.
80 fn drop(&mut self) {
81 if let Some(ref conn) = self.conn {
82 // Best-effort ROLLBACK; ignore error on drop.
83 let _ = conn.execute_batch("ROLLBACK");
84 }
85 }
86}
87
88impl Txn for SqliteTxn {
89 fn agent_id(&self) -> &AgentId {
90 &self.agent_id
91 }
92}