claw_core/transaction.rs
1//! Transaction wrapper for claw-core.
2//!
3//! [`ClawTransaction`] provides a thin ergonomic wrapper around a SQLx
4//! [`sqlx::Transaction`], ensuring that claw-core operations participate
5//! in ACID transactions with explicit commit/rollback semantics.
6//!
7//! Cache operations staged during a transaction are accessible via
8//! [`ClawTransaction::pending_cache_ops`] so callers can apply them to an
9//! in-memory cache after a successful [`ClawTransaction::commit`].
10
11use serde::{Deserialize, Serialize};
12use sqlx::{Sqlite, Transaction};
13use uuid::Uuid;
14
15use crate::engine::ClawEngine;
16use crate::error::{ClawError, ClawResult};
17use crate::store::memory::MemoryRecord;
18
19// ── CacheRecord ──────────────────────────────────────────────────────────────
20
21/// A key-value cache record that can be staged for deferred cache operations.
22///
23/// `CacheRecord` mirrors the fields relevant to the in-memory cache layer:
24/// a string `key`, an arbitrary JSON `value`, and the `agent_id` that owns
25/// the record.
26///
27/// # Example
28///
29/// ```rust
30/// use claw_core::CacheRecord;
31/// let r = CacheRecord {
32/// agent_id: "agent-1".to_string(),
33/// key: "my-key".to_string(),
34/// value: serde_json::json!({"hello": "world"}),
35/// };
36/// assert_eq!(r.key, "my-key");
37/// ```
38#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct CacheRecord {
40 /// The agent that owns this record.
41 pub agent_id: String,
42 /// Logical key within the agent's scratchpad.
43 pub key: String,
44 /// Serialized JSON value.
45 pub value: serde_json::Value,
46}
47
48// ── CacheOp ──────────────────────────────────────────────────────────────────
49
50/// A deferred cache mutation that should be applied after a successful commit.
51///
52/// Variants are staged via [`ClawTransaction::stage`] and retrieved with
53/// [`ClawTransaction::pending_cache_ops`].
54///
55/// # Example
56///
57/// ```rust
58/// use claw_core::{CacheOp, CacheRecord};
59/// use uuid::Uuid;
60/// let op = CacheOp::Delete(Uuid::new_v4());
61/// ```
62#[derive(Debug, Clone)]
63pub enum CacheOp {
64 /// Insert or update the record identified by the given [`Uuid`].
65 Insert(Uuid, CacheRecord),
66 /// Remove the record identified by the given [`Uuid`] from the cache.
67 Delete(Uuid),
68 /// Invalidate the entire cache.
69 Clear,
70}
71
72// ── ClawTransaction ───────────────────────────────────────────────────────────
73
74// ── StagedOp ─────────────────────────────────────────────────────────────────
75
76/// A database write operation staged in [`ClawTransaction`] for deferred
77/// two-phase commit.
78///
79/// Operations are not written to the database until [`ClawTransaction::commit`]
80/// is called, ensuring that `memories` and `memories_fts` are always updated
81/// together inside a single SQLite transaction.
82#[derive(Debug, Clone)]
83pub enum StagedOp {
84 /// Insert a new [`MemoryRecord`] into `memories`, `memories_fts`, and
85 /// `memory_tags`.
86 InsertMemory(MemoryRecord),
87}
88
89// ── ClawTransaction ───────────────────────────────────────────────────────────
90
91/// A wrapper around a SQLite transaction with deferred cache operations.
92///
93/// Obtain a [`ClawTransaction`] via [`ClawTransaction::begin`] or
94/// [`ClawEngine::transaction`]. Call [`ClawTransaction::commit`] to
95/// persist changes or allow the value to be dropped to trigger an implicit
96/// rollback.
97///
98/// Cache mutations that should be applied after a successful commit can be
99/// registered with [`ClawTransaction::stage`] and retrieved afterwards with
100/// [`ClawTransaction::pending_cache_ops`].
101///
102/// # Example
103///
104/// ```rust,no_run
105/// # use claw_core::{ClawEngine, MemoryRecord, MemoryType};
106/// # async fn example() -> claw_core::ClawResult<()> {
107/// # let engine = ClawEngine::open_default().await?;
108/// let mut tx = engine.transaction().await?;
109/// let r = MemoryRecord::new("hello", MemoryType::Semantic, vec![], None);
110/// tx.insert_memory(&r).await?;
111/// tx.commit().await?;
112/// # Ok(())
113/// # }
114/// ```
115pub struct ClawTransaction<'c> {
116 inner: Transaction<'c, Sqlite>,
117 /// Pending database write operations applied atomically on commit.
118 staged: Vec<StagedOp>,
119 cache_ops: Vec<CacheOp>,
120}
121
122impl<'c> ClawTransaction<'c> {
123 /// Wrap a raw SQLx transaction.
124 pub(crate) fn new(inner: Transaction<'c, Sqlite>) -> Self {
125 ClawTransaction {
126 inner,
127 staged: Vec::new(),
128 cache_ops: Vec::new(),
129 }
130 }
131
132 /// Begin a new transaction against the engine's connection pool.
133 ///
134 /// # Errors
135 ///
136 /// Returns [`ClawError::Transaction`] if the underlying pool cannot start
137 /// a new transaction.
138 pub async fn begin(engine: &'c ClawEngine) -> ClawResult<ClawTransaction<'c>> {
139 let tx = engine
140 .pool
141 .begin()
142 .await
143 .map_err(|e| ClawError::Transaction(e.to_string()))?;
144 Ok(ClawTransaction::new(tx))
145 }
146
147 /// Stage a [`CacheOp`] to be applied to an in-memory cache after a
148 /// successful commit.
149 ///
150 /// Staged operations are available via [`ClawTransaction::pending_cache_ops`].
151 pub fn stage(&mut self, op: CacheOp) {
152 self.cache_ops.push(op);
153 }
154
155 /// Return a slice of the cache operations staged so far.
156 pub fn pending_cache_ops(&self) -> &[CacheOp] {
157 &self.cache_ops
158 }
159
160 /// Commit the transaction, making all changes permanent.
161 ///
162 /// All operations staged via [`ClawTransaction::insert_memory`] are
163 /// written to the database — including their FTS5 and tag-index rows —
164 /// inside the open transaction before the final commit, guaranteeing
165 /// atomicity.
166 ///
167 /// # Errors
168 ///
169 /// Returns [`ClawError::Transaction`] if the commit fails.
170 pub async fn commit(mut self) -> ClawResult<()> {
171 // Phase 2: flush all staged operations into the open transaction.
172 for op in std::mem::take(&mut self.staged) {
173 match op {
174 StagedOp::InsertMemory(record) => {
175 let tags =
176 serde_json::to_string(&record.tags).map_err(ClawError::Serialization)?;
177 sqlx::query(
178 "INSERT INTO memories \
179 (id, content, memory_type, tags, ttl_seconds, \
180 created_at, updated_at) \
181 VALUES (?, ?, ?, ?, ?, ?, ?)",
182 )
183 .bind(record.id.to_string())
184 .bind(&record.content)
185 .bind(record.memory_type.as_str())
186 .bind(&tags)
187 .bind(record.ttl_seconds.map(|s| s as i64))
188 .bind(record.created_at.to_rfc3339())
189 .bind(record.updated_at.to_rfc3339())
190 .execute(&mut *self.inner)
191 .await?;
192
193 // FTS5 row inserted atomically in the same transaction.
194 sqlx::query("INSERT INTO memories_fts(id, content) VALUES (?, ?)")
195 .bind(record.id.to_string())
196 .bind(&record.content)
197 .execute(&mut *self.inner)
198 .await?;
199
200 // Normalised tag index rows.
201 for tag in &record.tags {
202 sqlx::query(
203 "INSERT OR IGNORE INTO memory_tags(memory_id, tag) \
204 VALUES (?, ?)",
205 )
206 .bind(record.id.to_string())
207 .bind(tag)
208 .execute(&mut *self.inner)
209 .await?;
210 }
211 }
212 }
213 }
214
215 self.inner
216 .commit()
217 .await
218 .map_err(|e| ClawError::Transaction(e.to_string()))
219 }
220
221 /// Explicitly roll back the transaction, discarding all changes.
222 ///
223 /// All operations in the staging buffer are discarded without any database
224 /// writes. Any writes made directly via [`ClawTransaction::inner_mut`]
225 /// are rolled back by the underlying SQLite transaction.
226 ///
227 /// # Errors
228 ///
229 /// Returns [`ClawError::Transaction`] if the rollback fails.
230 pub async fn rollback(mut self) -> ClawResult<()> {
231 // Clear staged ops — nothing has been written to the DB for these yet.
232 self.staged.clear();
233 self.inner
234 .rollback()
235 .await
236 .map_err(|e| ClawError::Transaction(e.to_string()))
237 }
238
239 /// Return a mutable reference to the inner SQLx transaction for use with
240 /// raw SQLx queries.
241 pub fn inner_mut(&mut self) -> &mut Transaction<'c, Sqlite> {
242 &mut self.inner
243 }
244
245 /// Insert a [`crate::store::memory::MemoryRecord`] within this transaction.
246 ///
247 /// Both the `memories` table and the `memories_fts` FTS5 index are updated
248 /// atomically as part of the same transaction.
249 ///
250 /// # Errors
251 ///
252 /// Returns a [`ClawError`] if the SQL execution fails or serialization fails.
253 ///
254 /// # Example
255 ///
256 /// ```rust,no_run
257 /// # use claw_core::{ClawEngine, MemoryRecord, MemoryType};
258 /// # async fn example() -> claw_core::ClawResult<()> {
259 /// # let engine = ClawEngine::open_default().await?;
260 /// let mut tx = engine.transaction().await?;
261 /// let r = MemoryRecord::new("transactional", MemoryType::Episodic, vec![], None);
262 /// let id = tx.insert_memory(&r).await?;
263 /// tx.commit().await?;
264 /// # Ok(())
265 /// # }
266 /// ```
267 pub async fn insert_memory(&mut self, record: &MemoryRecord) -> ClawResult<Uuid> {
268 // Phase 1: add to the staging buffer — no DB writes yet.
269 self.staged.push(StagedOp::InsertMemory(record.clone()));
270 Ok(record.id)
271 }
272}
273
274impl<'c> std::fmt::Debug for ClawTransaction<'c> {
275 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276 f.debug_struct("ClawTransaction").finish_non_exhaustive()
277 }
278}