Skip to main content

claw_core/
transaction.rs

1//! Transaction wrapper for claw-core.
2//!
3//! [`ClawTransaction`] provides a thin ergonomic wrapper around a SQLx
4//! [`sqlx::Transaction`], ensuring that claw-core operations participate
5//! in ACID transactions with explicit commit/rollback semantics.
6//!
7//! Cache operations staged during a transaction are accessible via
8//! [`ClawTransaction::pending_cache_ops`] so callers can apply them to an
9//! in-memory cache after a successful [`ClawTransaction::commit`].
10
11use serde::{Deserialize, Serialize};
12use sqlx::{Sqlite, Transaction};
13use uuid::Uuid;
14
15use crate::engine::ClawEngine;
16use crate::error::{ClawError, ClawResult};
17use crate::store::memory::MemoryRecord;
18
19// ── CacheRecord ──────────────────────────────────────────────────────────────
20
21/// A key-value cache record that can be staged for deferred cache operations.
22///
23/// `CacheRecord` mirrors the fields relevant to the in-memory cache layer:
24/// a string `key`, an arbitrary JSON `value`, and the `agent_id` that owns
25/// the record.
26///
27/// # Example
28///
29/// ```rust
30/// use claw_core::CacheRecord;
31/// let r = CacheRecord {
32///     agent_id: "agent-1".to_string(),
33///     key: "my-key".to_string(),
34///     value: serde_json::json!({"hello": "world"}),
35/// };
36/// assert_eq!(r.key, "my-key");
37/// ```
38#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct CacheRecord {
40    /// The agent that owns this record.
41    pub agent_id: String,
42    /// Logical key within the agent's scratchpad.
43    pub key: String,
44    /// Serialized JSON value.
45    pub value: serde_json::Value,
46}
47
48// ── CacheOp ──────────────────────────────────────────────────────────────────
49
50/// A deferred cache mutation that should be applied after a successful commit.
51///
52/// Variants are staged via [`ClawTransaction::stage`] and retrieved with
53/// [`ClawTransaction::pending_cache_ops`].
54///
55/// # Example
56///
57/// ```rust
58/// use claw_core::{CacheOp, CacheRecord};
59/// use uuid::Uuid;
60/// let op = CacheOp::Delete(Uuid::new_v4());
61/// ```
62#[derive(Debug, Clone)]
63pub enum CacheOp {
64    /// Insert or update the record identified by the given [`Uuid`].
65    Insert(Uuid, CacheRecord),
66    /// Remove the record identified by the given [`Uuid`] from the cache.
67    Delete(Uuid),
68    /// Invalidate the entire cache.
69    Clear,
70}
71
72// ── ClawTransaction ───────────────────────────────────────────────────────────
73
74// ── StagedOp ─────────────────────────────────────────────────────────────────
75
76/// A database write operation staged in [`ClawTransaction`] for deferred
77/// two-phase commit.
78///
79/// Operations are not written to the database until [`ClawTransaction::commit`]
80/// is called, ensuring that `memories` and `memories_fts` are always updated
81/// together inside a single SQLite transaction.
82#[derive(Debug, Clone)]
83pub enum StagedOp {
84    /// Insert a new [`MemoryRecord`] into `memories`, `memories_fts`, and
85    /// `memory_tags`.
86    InsertMemory(MemoryRecord),
87}
88
89// ── ClawTransaction ───────────────────────────────────────────────────────────
90
91/// A wrapper around a SQLite transaction with deferred cache operations.
92///
93/// Obtain a [`ClawTransaction`] via [`ClawTransaction::begin`] or
94/// [`ClawEngine::transaction`]. Call [`ClawTransaction::commit`] to
95/// persist changes or allow the value to be dropped to trigger an implicit
96/// rollback.
97///
98/// Cache mutations that should be applied after a successful commit can be
99/// registered with [`ClawTransaction::stage`] and retrieved afterwards with
100/// [`ClawTransaction::pending_cache_ops`].
101///
102/// # Example
103///
104/// ```rust,no_run
105/// # use claw_core::{ClawEngine, MemoryRecord, MemoryType};
106/// # async fn example() -> claw_core::ClawResult<()> {
107/// # let engine = ClawEngine::open_default().await?;
108/// let mut tx = engine.transaction().await?;
109/// let r = MemoryRecord::new("hello", MemoryType::Semantic, vec![], None);
110/// tx.insert_memory(&r).await?;
111/// tx.commit().await?;
112/// # Ok(())
113/// # }
114/// ```
115pub struct ClawTransaction<'c> {
116    inner: Transaction<'c, Sqlite>,
117    /// Pending database write operations applied atomically on commit.
118    staged: Vec<StagedOp>,
119    cache_ops: Vec<CacheOp>,
120}
121
122impl<'c> ClawTransaction<'c> {
123    /// Wrap a raw SQLx transaction.
124    pub(crate) fn new(inner: Transaction<'c, Sqlite>) -> Self {
125        ClawTransaction {
126            inner,
127            staged: Vec::new(),
128            cache_ops: Vec::new(),
129        }
130    }
131
132    /// Begin a new transaction against the engine's connection pool.
133    ///
134    /// # Errors
135    ///
136    /// Returns [`ClawError::Transaction`] if the underlying pool cannot start
137    /// a new transaction.
138    pub async fn begin(engine: &'c ClawEngine) -> ClawResult<ClawTransaction<'c>> {
139        let tx = engine
140            .pool
141            .begin()
142            .await
143            .map_err(|e| ClawError::Transaction(e.to_string()))?;
144        Ok(ClawTransaction::new(tx))
145    }
146
147    /// Stage a [`CacheOp`] to be applied to an in-memory cache after a
148    /// successful commit.
149    ///
150    /// Staged operations are available via [`ClawTransaction::pending_cache_ops`].
151    pub fn stage(&mut self, op: CacheOp) {
152        self.cache_ops.push(op);
153    }
154
155    /// Return a slice of the cache operations staged so far.
156    pub fn pending_cache_ops(&self) -> &[CacheOp] {
157        &self.cache_ops
158    }
159
160    /// Commit the transaction, making all changes permanent.
161    ///
162    /// All operations staged via [`ClawTransaction::insert_memory`] are
163    /// written to the database — including their FTS5 and tag-index rows —
164    /// inside the open transaction before the final commit, guaranteeing
165    /// atomicity.
166    ///
167    /// # Errors
168    ///
169    /// Returns [`ClawError::Transaction`] if the commit fails.
170    pub async fn commit(mut self) -> ClawResult<()> {
171        // Phase 2: flush all staged operations into the open transaction.
172        for op in std::mem::take(&mut self.staged) {
173            match op {
174                StagedOp::InsertMemory(record) => {
175                    let tags =
176                        serde_json::to_string(&record.tags).map_err(ClawError::Serialization)?;
177                    sqlx::query(
178                        "INSERT INTO memories \
179                         (id, content, memory_type, tags, ttl_seconds, \
180                          created_at, updated_at) \
181                         VALUES (?, ?, ?, ?, ?, ?, ?)",
182                    )
183                    .bind(record.id.to_string())
184                    .bind(&record.content)
185                    .bind(record.memory_type.as_str())
186                    .bind(&tags)
187                    .bind(record.ttl_seconds.map(|s| s as i64))
188                    .bind(record.created_at.to_rfc3339())
189                    .bind(record.updated_at.to_rfc3339())
190                    .execute(&mut *self.inner)
191                    .await?;
192
193                    // FTS5 row inserted atomically in the same transaction.
194                    sqlx::query("INSERT INTO memories_fts(id, content) VALUES (?, ?)")
195                        .bind(record.id.to_string())
196                        .bind(&record.content)
197                        .execute(&mut *self.inner)
198                        .await?;
199
200                    // Normalised tag index rows.
201                    for tag in &record.tags {
202                        sqlx::query(
203                            "INSERT OR IGNORE INTO memory_tags(memory_id, tag) \
204                             VALUES (?, ?)",
205                        )
206                        .bind(record.id.to_string())
207                        .bind(tag)
208                        .execute(&mut *self.inner)
209                        .await?;
210                    }
211                }
212            }
213        }
214
215        self.inner
216            .commit()
217            .await
218            .map_err(|e| ClawError::Transaction(e.to_string()))
219    }
220
221    /// Explicitly roll back the transaction, discarding all changes.
222    ///
223    /// All operations in the staging buffer are discarded without any database
224    /// writes.  Any writes made directly via [`ClawTransaction::inner_mut`]
225    /// are rolled back by the underlying SQLite transaction.
226    ///
227    /// # Errors
228    ///
229    /// Returns [`ClawError::Transaction`] if the rollback fails.
230    pub async fn rollback(mut self) -> ClawResult<()> {
231        // Clear staged ops — nothing has been written to the DB for these yet.
232        self.staged.clear();
233        self.inner
234            .rollback()
235            .await
236            .map_err(|e| ClawError::Transaction(e.to_string()))
237    }
238
239    /// Return a mutable reference to the inner SQLx transaction for use with
240    /// raw SQLx queries.
241    pub fn inner_mut(&mut self) -> &mut Transaction<'c, Sqlite> {
242        &mut self.inner
243    }
244
245    /// Insert a [`crate::store::memory::MemoryRecord`] within this transaction.
246    ///
247    /// Both the `memories` table and the `memories_fts` FTS5 index are updated
248    /// atomically as part of the same transaction.
249    ///
250    /// # Errors
251    ///
252    /// Returns a [`ClawError`] if the SQL execution fails or serialization fails.
253    ///
254    /// # Example
255    ///
256    /// ```rust,no_run
257    /// # use claw_core::{ClawEngine, MemoryRecord, MemoryType};
258    /// # async fn example() -> claw_core::ClawResult<()> {
259    /// # let engine = ClawEngine::open_default().await?;
260    /// let mut tx = engine.transaction().await?;
261    /// let r = MemoryRecord::new("transactional", MemoryType::Episodic, vec![], None);
262    /// let id = tx.insert_memory(&r).await?;
263    /// tx.commit().await?;
264    /// # Ok(())
265    /// # }
266    /// ```
267    pub async fn insert_memory(&mut self, record: &MemoryRecord) -> ClawResult<Uuid> {
268        // Phase 1: add to the staging buffer — no DB writes yet.
269        self.staged.push(StagedOp::InsertMemory(record.clone()));
270        Ok(record.id)
271    }
272}
273
274impl<'c> std::fmt::Debug for ClawTransaction<'c> {
275    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276        f.debug_struct("ClawTransaction").finish_non_exhaustive()
277    }
278}