claw-core 0.1.2

Embedded local database engine for ClawDB — an agent-native cognitive database
Documentation
//! Transaction wrapper for claw-core.

use std::sync::Arc;

use chrono::Utc;
use serde::{Deserialize, Serialize};
use sqlx::{Sqlite, Transaction};
use tokio::sync::Mutex;
use uuid::Uuid;

use crate::cache::ClawCache;
use crate::engine::ClawEngine;
use crate::error::{ClawError, ClawResult};
use crate::store::memory::MemoryRecord;

/// A key-value cache record that can be staged for deferred cache operations.
///
/// In the code visible here it is only used as the payload of
/// [`CacheOp::Insert`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheRecord {
    /// Identifier of the agent that owns this record.
    pub agent_id: String,
    /// Logical key within the agent's scratchpad.
    pub key: String,
    /// The stored value, held as a [`serde_json::Value`].
    pub value: serde_json::Value,
}

/// A deferred cache mutation that should be applied after a successful commit.
///
/// Operations are queued on a transaction with [`ClawTransaction::stage`] and
/// can be inspected with [`ClawTransaction::pending_cache_ops`].
#[derive(Debug, Clone)]
pub enum CacheOp {
    /// Insert or update the record identified by the given [`Uuid`].
    Insert(Uuid, CacheRecord),
    /// Remove the record identified by the given [`Uuid`] from the cache.
    Delete(Uuid),
    /// Invalidate the entire cache.
    Clear,
}

/// A database write operation staged in [`ClawTransaction`].
///
/// Staged operations are buffered in memory and only executed, in staging
/// order, when [`ClawTransaction::commit`] runs.
#[derive(Debug, Clone)]
pub enum StagedMemoryOp {
    /// Insert a new [`MemoryRecord`].
    Insert(MemoryRecord),
    /// Update memory content by id.
    ///
    /// At commit time the row's `updated_at` is set to the current UTC time;
    /// commit fails with [`ClawError::NotFound`] if no row has this id.
    Update {
        /// Target memory id.
        id: Uuid,
        /// New content.
        content: String,
    },
    /// Delete memory by id.
    ///
    /// Commit fails with [`ClawError::NotFound`] if no row has this id.
    Delete {
        /// Target memory id.
        id: Uuid,
    },
}

/// A wrapper around a SQLite transaction with staged memory operations.
///
/// Memory writes are buffered in `staged` and only executed against the
/// database when [`ClawTransaction::commit`] is called.
pub struct ClawTransaction<'c> {
    /// The underlying SQLx transaction on which `commit` executes statements.
    inner: Transaction<'c, Sqlite>,
    /// Pending memory operations applied atomically during commit.
    staged: Vec<StagedMemoryOp>,
    /// Cache operations queued through [`ClawTransaction::stage`].
    cache_ops: Vec<CacheOp>,
    /// Shared memory-record cache; `commit` inserts into it once the database
    /// commit has succeeded.
    cache: Arc<Mutex<ClawCache<Uuid, MemoryRecord>>>,
}

impl<'c> ClawTransaction<'c> {
    /// Wrap a raw SQLx transaction.
    pub(crate) fn new(
        inner: Transaction<'c, Sqlite>,
        cache: Arc<Mutex<ClawCache<Uuid, MemoryRecord>>>,
    ) -> Self {
        ClawTransaction {
            inner,
            staged: Vec::new(),
            cache_ops: Vec::new(),
            cache,
        }
    }

    /// Open a transaction on the engine's connection pool and wrap it.
    ///
    /// The returned transaction shares the engine's cache handle.
    ///
    /// # Errors
    ///
    /// Returns [`ClawError::Transaction`] carrying the pool error's message
    /// if the transaction cannot be started.
    pub async fn begin(engine: &'c ClawEngine) -> ClawResult<ClawTransaction<'c>> {
        match engine.pool.begin().await {
            Ok(raw) => Ok(ClawTransaction::new(raw, Arc::clone(&engine.cache))),
            Err(err) => Err(ClawError::Transaction(err.to_string())),
        }
    }

    /// Queue a [`CacheOp`] on this transaction.
    ///
    /// The operation is appended to the list returned by
    /// [`ClawTransaction::pending_cache_ops`].
    ///
    /// NOTE(review): the `commit` method visible in this file does not read
    /// the queued cache operations — confirm where they are meant to be applied.
    pub fn stage(&mut self, op: CacheOp) {
        let queue = &mut self.cache_ops;
        queue.push(op);
    }

    /// Borrow the cache operations queued so far, in the order they were staged.
    pub fn pending_cache_ops(&self) -> &[CacheOp] {
        self.cache_ops.as_slice()
    }

    /// Queue a copy of `record` for insertion at commit time.
    ///
    /// Nothing is written to the database here; the record is cloned into the
    /// staged-operation list. Returns the record's id.
    pub async fn insert_memory(&mut self, record: &MemoryRecord) -> ClawResult<Uuid> {
        let pending = StagedMemoryOp::Insert(record.clone());
        self.staged.push(pending);
        let id = record.id;
        Ok(id)
    }

    /// Queue a content replacement for the memory identified by `id`.
    ///
    /// The update is only executed when the transaction is committed.
    pub fn update_memory(&mut self, id: Uuid, content: impl Into<String>) {
        let content: String = content.into();
        let pending = StagedMemoryOp::Update { id, content };
        self.staged.push(pending);
    }

    /// Queue removal of the memory row identified by `id`.
    ///
    /// The delete is only executed when the transaction is committed.
    pub fn delete_memory(&mut self, id: Uuid) {
        let pending = StagedMemoryOp::Delete { id };
        self.staged.push(pending);
    }

    /// Commit all staged operations atomically.
    pub async fn commit(mut self) -> ClawResult<()> {
        let mut committed_records = Vec::new();

        for op in std::mem::take(&mut self.staged) {
            match op {
                StagedMemoryOp::Insert(record) => {
                    let tags =
                        serde_json::to_string(&record.tags).map_err(ClawError::Serialization)?;
                    sqlx::query(
                        "INSERT INTO memories \
                         (id, content, memory_type, tags, ttl_seconds, created_at, updated_at) \
                         VALUES (?, ?, ?, ?, ?, ?, ?)",
                    )
                    .bind(record.id.to_string())
                    .bind(&record.content)
                    .bind(record.memory_type.as_str())
                    .bind(&tags)
                    .bind(record.ttl_seconds.map(|s| s as i64))
                    .bind(record.created_at.to_rfc3339())
                    .bind(record.updated_at.to_rfc3339())
                    .execute(&mut *self.inner)
                    .await?;

                    sqlx::query(
                        "INSERT INTO memories_fts(rowid, content) \
                        VALUES (last_insert_rowid(), ?)",
                    )
                    .bind(&record.content)
                    .execute(&mut *self.inner)
                    .await?;

                    for tag in &record.tags {
                        sqlx::query(
                            "INSERT OR REPLACE INTO memory_tags(memory_id, tag) VALUES (?, ?)",
                        )
                        .bind(record.id.to_string())
                        .bind(tag)
                        .execute(&mut *self.inner)
                        .await?;
                    }

                    committed_records.push(record);
                }
                StagedMemoryOp::Update { id, content } => {
                    let updated_at = Utc::now().to_rfc3339();
                    let affected =
                        sqlx::query("UPDATE memories SET content = ?, updated_at = ? WHERE id = ?")
                            .bind(&content)
                            .bind(updated_at)
                            .bind(id.to_string())
                            .execute(&mut *self.inner)
                            .await?
                            .rows_affected();

                    if affected == 0 {
                        return Err(ClawError::NotFound {
                            entity: "MemoryRecord".to_string(),
                            id: id.to_string(),
                        });
                    }

                    sqlx::query(
                        "DELETE FROM memories_fts WHERE rowid = \
                         (SELECT rowid FROM memories WHERE id = ?)",
                    )
                    .bind(id.to_string())
                    .execute(&mut *self.inner)
                    .await?;

                    sqlx::query(
                        "INSERT INTO memories_fts(rowid, content) \
                         VALUES ((SELECT rowid FROM memories WHERE id = ?), ?)",
                    )
                    .bind(id.to_string())
                    .bind(content)
                    .execute(&mut *self.inner)
                    .await?;
                }
                StagedMemoryOp::Delete { id } => {
                    sqlx::query(
                        "DELETE FROM memories_fts WHERE rowid = \
                         (SELECT rowid FROM memories WHERE id = ?)",
                    )
                    .bind(id.to_string())
                    .execute(&mut *self.inner)
                    .await?;

                    let affected = sqlx::query("DELETE FROM memories WHERE id = ?")
                        .bind(id.to_string())
                        .execute(&mut *self.inner)
                        .await?
                        .rows_affected();

                    if affected == 0 {
                        return Err(ClawError::NotFound {
                            entity: "MemoryRecord".to_string(),
                            id: id.to_string(),
                        });
                    }
                }
            }
        }

        self.inner
            .commit()
            .await
            .map_err(|e| ClawError::Transaction(e.to_string()))?;

        if !committed_records.is_empty() {
            let mut cache = self.cache.lock().await;
            for record in committed_records {
                cache.insert(record.id, record);
            }
        }

        Ok(())
    }

    /// Abandon the transaction: discard staged memory operations and roll back
    /// the underlying SQLx transaction.
    ///
    /// # Errors
    ///
    /// Returns [`ClawError::Transaction`] carrying the database error's
    /// message if the rollback fails.
    pub async fn rollback(mut self) -> ClawResult<()> {
        self.staged.clear();
        match self.inner.rollback().await {
            Ok(()) => Ok(()),
            Err(err) => Err(ClawError::Transaction(err.to_string())),
        }
    }

    /// Return a mutable reference to the inner SQLx transaction.
    pub fn inner_mut(&mut self) -> &mut Transaction<'c, Sqlite> {
        &mut self.inner
    }
}

/// Opaque `Debug` output: prints only the type name and elides every field.
impl std::fmt::Debug for ClawTransaction<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("ClawTransaction");
        builder.finish_non_exhaustive()
    }
}