claw-core 0.1.1

Embedded local database engine for ClawDB — an agent-native cognitive database
Documentation
//! Transaction wrapper for claw-core.
//!
//! [`ClawTransaction`] provides a thin ergonomic wrapper around a SQLx
//! [`sqlx::Transaction`], ensuring that claw-core operations participate
//! in ACID transactions with explicit commit/rollback semantics.
//!
//! Cache operations staged during a transaction are accessible via
//! [`ClawTransaction::pending_cache_ops`] so callers can apply them to an
//! in-memory cache after a successful [`ClawTransaction::commit`].

use serde::{Deserialize, Serialize};
use sqlx::{Sqlite, Transaction};
use uuid::Uuid;

use crate::engine::ClawEngine;
use crate::error::{ClawError, ClawResult};
use crate::store::memory::MemoryRecord;

// ── CacheRecord ──────────────────────────────────────────────────────────────

/// A key-value cache record that can be staged for deferred cache operations.
///
/// `CacheRecord` mirrors the fields relevant to the in-memory cache layer:
/// a string `key`, an arbitrary JSON `value`, and the `agent_id` that owns
/// the record.
///
/// # Example
///
/// ```rust
/// use claw_core::CacheRecord;
/// let r = CacheRecord {
///     agent_id: "agent-1".to_string(),
///     key: "my-key".to_string(),
///     value: serde_json::json!({"hello": "world"}),
/// };
/// assert_eq!(r.key, "my-key");
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheRecord {
    /// The agent that owns this record.
    pub agent_id: String,
    /// Logical key within the agent's scratchpad.
    pub key: String,
    /// Serialized JSON value.
    pub value: serde_json::Value,
}

// ── CacheOp ──────────────────────────────────────────────────────────────────

/// A deferred cache mutation that should be applied after a successful commit.
///
/// Variants are staged via [`ClawTransaction::stage`] and retrieved with
/// [`ClawTransaction::pending_cache_ops`].
///
/// # Example
///
/// ```rust
/// use claw_core::{CacheOp, CacheRecord};
/// use uuid::Uuid;
/// let op = CacheOp::Delete(Uuid::new_v4());
/// ```
#[derive(Debug, Clone)]
pub enum CacheOp {
    /// Insert or update the record identified by the given [`Uuid`].
    Insert(Uuid, CacheRecord),
    /// Remove the record identified by the given [`Uuid`] from the cache.
    Delete(Uuid),
    /// Invalidate the entire cache.
    Clear,
}

// ── ClawTransaction ───────────────────────────────────────────────────────────

// ── StagedOp ─────────────────────────────────────────────────────────────────

/// A database write operation staged in [`ClawTransaction`] for deferred
/// two-phase commit.
///
/// Operations are not written to the database until [`ClawTransaction::commit`]
/// is called, ensuring that `memories` and `memories_fts` are always updated
/// together inside a single SQLite transaction.
#[derive(Debug, Clone)]
pub enum StagedOp {
    /// Insert a new [`MemoryRecord`] into `memories`, `memories_fts`, and
    /// `memory_tags`.
    InsertMemory(MemoryRecord),
}

// ── ClawTransaction ───────────────────────────────────────────────────────────

/// A wrapper around a SQLite transaction with deferred cache operations.
///
/// Obtain a [`ClawTransaction`] via [`ClawTransaction::begin`] or
/// [`ClawEngine::transaction`]. Call [`ClawTransaction::commit`] to
/// persist changes or allow the value to be dropped to trigger an implicit
/// rollback.
///
/// Cache mutations that should be applied after a successful commit can be
/// registered with [`ClawTransaction::stage`] and retrieved afterwards with
/// [`ClawTransaction::pending_cache_ops`].
///
/// # Example
///
/// ```rust,no_run
/// # use claw_core::{ClawEngine, MemoryRecord, MemoryType};
/// # async fn example() -> claw_core::ClawResult<()> {
/// # let engine = ClawEngine::open_default().await?;
/// let mut tx = engine.transaction().await?;
/// let r = MemoryRecord::new("hello", MemoryType::Semantic, vec![], None);
/// tx.insert_memory(&r).await?;
/// tx.commit().await?;
/// # Ok(())
/// # }
/// ```
pub struct ClawTransaction<'c> {
    inner: Transaction<'c, Sqlite>,
    /// Pending database write operations applied atomically on commit.
    staged: Vec<StagedOp>,
    cache_ops: Vec<CacheOp>,
}

impl<'c> ClawTransaction<'c> {
    /// Wrap a raw SQLx transaction.
    pub(crate) fn new(inner: Transaction<'c, Sqlite>) -> Self {
        ClawTransaction {
            inner,
            staged: Vec::new(),
            cache_ops: Vec::new(),
        }
    }

    /// Begin a new transaction against the engine's connection pool.
    ///
    /// # Errors
    ///
    /// Returns [`ClawError::Transaction`] if the underlying pool cannot start
    /// a new transaction.
    pub async fn begin(engine: &'c ClawEngine) -> ClawResult<ClawTransaction<'c>> {
        let tx = engine
            .pool
            .begin()
            .await
            .map_err(|e| ClawError::Transaction(e.to_string()))?;
        Ok(ClawTransaction::new(tx))
    }

    /// Stage a [`CacheOp`] to be applied to an in-memory cache after a
    /// successful commit.
    ///
    /// Staged operations are available via [`ClawTransaction::pending_cache_ops`].
    pub fn stage(&mut self, op: CacheOp) {
        self.cache_ops.push(op);
    }

    /// Return a slice of the cache operations staged so far.
    pub fn pending_cache_ops(&self) -> &[CacheOp] {
        &self.cache_ops
    }

    /// Commit the transaction, making all changes permanent.
    ///
    /// All operations staged via [`ClawTransaction::insert_memory`] are
    /// written to the database — including their FTS5 and tag-index rows —
    /// inside the open transaction before the final commit, guaranteeing
    /// atomicity.
    ///
    /// # Errors
    ///
    /// Returns [`ClawError::Transaction`] if the commit fails.
    pub async fn commit(mut self) -> ClawResult<()> {
        // Phase 2: flush all staged operations into the open transaction.
        for op in std::mem::take(&mut self.staged) {
            match op {
                StagedOp::InsertMemory(record) => {
                    let tags =
                        serde_json::to_string(&record.tags).map_err(ClawError::Serialization)?;
                    sqlx::query(
                        "INSERT INTO memories \
                         (id, content, memory_type, tags, ttl_seconds, \
                          created_at, updated_at) \
                         VALUES (?, ?, ?, ?, ?, ?, ?)",
                    )
                    .bind(record.id.to_string())
                    .bind(&record.content)
                    .bind(record.memory_type.as_str())
                    .bind(&tags)
                    .bind(record.ttl_seconds.map(|s| s as i64))
                    .bind(record.created_at.to_rfc3339())
                    .bind(record.updated_at.to_rfc3339())
                    .execute(&mut *self.inner)
                    .await?;

                    // FTS5 row inserted atomically in the same transaction.
                    sqlx::query("INSERT INTO memories_fts(id, content) VALUES (?, ?)")
                        .bind(record.id.to_string())
                        .bind(&record.content)
                        .execute(&mut *self.inner)
                        .await?;

                    // Normalised tag index rows.
                    for tag in &record.tags {
                        sqlx::query(
                            "INSERT OR IGNORE INTO memory_tags(memory_id, tag) \
                             VALUES (?, ?)",
                        )
                        .bind(record.id.to_string())
                        .bind(tag)
                        .execute(&mut *self.inner)
                        .await?;
                    }
                }
            }
        }

        self.inner
            .commit()
            .await
            .map_err(|e| ClawError::Transaction(e.to_string()))
    }

    /// Explicitly roll back the transaction, discarding all changes.
    ///
    /// All operations in the staging buffer are discarded without any database
    /// writes.  Any writes made directly via [`ClawTransaction::inner_mut`]
    /// are rolled back by the underlying SQLite transaction.
    ///
    /// # Errors
    ///
    /// Returns [`ClawError::Transaction`] if the rollback fails.
    pub async fn rollback(mut self) -> ClawResult<()> {
        // Clear staged ops — nothing has been written to the DB for these yet.
        self.staged.clear();
        self.inner
            .rollback()
            .await
            .map_err(|e| ClawError::Transaction(e.to_string()))
    }

    /// Return a mutable reference to the inner SQLx transaction for use with
    /// raw SQLx queries.
    pub fn inner_mut(&mut self) -> &mut Transaction<'c, Sqlite> {
        &mut self.inner
    }

    /// Insert a [`crate::store::memory::MemoryRecord`] within this transaction.
    ///
    /// Both the `memories` table and the `memories_fts` FTS5 index are updated
    /// atomically as part of the same transaction.
    ///
    /// # Errors
    ///
    /// Returns a [`ClawError`] if the SQL execution fails or serialization fails.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use claw_core::{ClawEngine, MemoryRecord, MemoryType};
    /// # async fn example() -> claw_core::ClawResult<()> {
    /// # let engine = ClawEngine::open_default().await?;
    /// let mut tx = engine.transaction().await?;
    /// let r = MemoryRecord::new("transactional", MemoryType::Episodic, vec![], None);
    /// let id = tx.insert_memory(&r).await?;
    /// tx.commit().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn insert_memory(&mut self, record: &MemoryRecord) -> ClawResult<Uuid> {
        // Phase 1: add to the staging buffer — no DB writes yet.
        self.staged.push(StagedOp::InsertMemory(record.clone()));
        Ok(record.id)
    }
}

impl<'c> std::fmt::Debug for ClawTransaction<'c> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ClawTransaction").finish_non_exhaustive()
    }
}