kiromi-ai-memory 0.2.2

Local-first multi-tenant memory store engine: Markdown/text content on object storage, metadata in SQLite, plugin-shaped embedder/storage/metadata, hybrid text+vector search.
Documentation
// SPDX-License-Identifier: Apache-2.0 OR MIT
//! Plan 18 step 14 — operational APIs for SQLite-backed stores.
//!
//! Surfaces SQLite primitives (`VACUUM`, `incremental_vacuum`, `ANALYZE`,
//! `wal_checkpoint(TRUNCATE)`, `VACUUM INTO`) and audit-log retention through
//! the public [`Memory`] handle. Each call routes through the configured
//! [`crate::metadata::MetadataStore`]; non-SQLite back-ends short-circuit with
//! [`Error::Config`].
//!
//! These APIs are additive — no existing public surface changes.

use std::path::Path;
use std::time::{Duration, Instant};

use serde::{Deserialize, Serialize};
use sqlx::Row;

use crate::error::{Error, Result};
use crate::handle::Memory;

/// Knobs for [`Memory::vacuum`].
///
/// The derived `Default` (`max_pages: None`, `include_fts: false`) requests a
/// full `VACUUM` with no FTS optimisation. Because of `#[serde(default)]`,
/// any field missing from serialized input takes that default value.
#[non_exhaustive]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct VacuumOpts {
    /// When set, run `PRAGMA incremental_vacuum(<max_pages>)` instead of a
    /// full `VACUUM`. Useful for live stores where holding the write lock for
    /// a full rebuild is unacceptable.
    ///
    /// `None` selects the full `VACUUM` statement.
    pub max_pages: Option<u32>,
    /// When `true`, also run `INSERT INTO memory_fts(memory_fts) VALUES('optimize')`
    /// (and the same on `summary_fts` if present) to compact the FTS5 segments.
    /// No-op when the tables don't exist yet.
    pub include_fts: bool,
}

/// Outcome of [`Memory::vacuum`].
///
/// All figures are best-effort: they are derived from `PRAGMA freelist_count`
/// and `PRAGMA page_size` readings rather than from the file size on disk.
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VacuumReport {
    /// Pages reclaimed (best-effort — derived from `freelist_count` before/after).
    /// Never negative: the subtraction saturates at zero.
    pub pages_reclaimed: u64,
    /// Bytes freed (`pages_reclaimed * page_size`, best-effort).
    pub bytes_freed: u64,
    /// Wall-clock duration in milliseconds.
    pub duration_ms: u64,
}

/// Outcome of [`Memory::backup_to`].
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupReport {
    /// Total bytes copied to the destination.
    ///
    /// Read from the destination file's metadata once the copy has finished;
    /// reported as `0` when that metadata lookup fails.
    pub bytes_copied: u64,
    /// Wall-clock duration in milliseconds.
    pub duration_ms: u64,
}

/// Outcome of [`Memory::checkpoint`].
///
/// Both fields come straight from the result row of
/// `PRAGMA wal_checkpoint(TRUNCATE)`; a value that cannot be read or does not
/// fit the field's type is reported as `0`.
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckpointReport {
    /// Number of WAL pages written back to the main DB.
    /// Taken from the pragma's `checkpointed` column.
    pub pages_written: u64,
    /// Number of WAL frames truncated.
    /// Taken from the pragma's `log` column.
    pub frames_truncated: u32,
}

impl Memory {
    fn pool(&self) -> Result<&sqlx::SqlitePool> {
        self.inner.metadata.sqlite_pool().ok_or_else(|| {
            Error::Config("operational APIs require a SQLite metadata back-end".into())
        })
    }

    /// Compact the underlying SQLite database.
    ///
    /// With `opts.max_pages = None` this runs the blocking `VACUUM` statement,
    /// which rebuilds the entire DB and reclaims every freelist page; with
    /// `Some(n)` it runs `PRAGMA incremental_vacuum(n)` which only frees up
    /// to `n` pages and is safe to run on a live store. Non-SQLite back-ends
    /// return [`Error::Config`].
    pub async fn vacuum(&self, opts: VacuumOpts) -> Result<VacuumReport> {
        let pool = self.pool()?;
        let started = Instant::now();

        let page_size: u64 = sqlx::query_scalar::<_, i64>("PRAGMA page_size")
            .fetch_one(pool)
            .await
            .map_err(|e| Error::metadata("ops.vacuum.page_size", e))?
            .max(0) as u64;
        let pages_before: u64 = sqlx::query_scalar::<_, i64>("PRAGMA freelist_count")
            .fetch_one(pool)
            .await
            .map_err(|e| Error::metadata("ops.vacuum.freelist_before", e))?
            .max(0) as u64;

        match opts.max_pages {
            None => {
                sqlx::query("VACUUM")
                    .execute(pool)
                    .await
                    .map_err(|e| Error::metadata("ops.vacuum.full", e))?;
            }
            Some(n) => {
                let stmt = format!("PRAGMA incremental_vacuum({n})");
                sqlx::query(&stmt)
                    .execute(pool)
                    .await
                    .map_err(|e| Error::metadata("ops.vacuum.incremental", e))?;
            }
        }

        if opts.include_fts {
            // Plan 18: `create_indices_if_missing` is called from
            // `Builder::open` on every fresh handle once `embedder_dims`
            // is known, so the FTS5 tables exist for any store that has
            // either an embedder bound or one prior append. The early-life
            // caller-provided + no-append-yet case is the only window where
            // they may not exist; tolerate just that. Every other sqlx
            // error (I/O, corruption) propagates.
            for stmt in [
                "INSERT INTO memory_fts(memory_fts) VALUES('optimize')",
                "INSERT INTO summary_fts(summary_fts) VALUES('optimize')",
            ] {
                if let Err(e) = sqlx::query(stmt).execute(pool).await {
                    let msg = e.to_string();
                    let is_missing_table = match &e {
                        sqlx::Error::Database(db) => db
                            .message()
                            .to_ascii_lowercase()
                            .starts_with("no such table"),
                        _ => false,
                    } || msg.to_ascii_lowercase().contains("no such table");
                    if !is_missing_table {
                        return Err(Error::metadata("ops.vacuum.fts_optimize", e));
                    }
                }
            }
        }

        let pages_after: u64 = sqlx::query_scalar::<_, i64>("PRAGMA freelist_count")
            .fetch_one(pool)
            .await
            .map_err(|e| Error::metadata("ops.vacuum.freelist_after", e))?
            .max(0) as u64;
        let pages_reclaimed = pages_before.saturating_sub(pages_after);

        Ok(VacuumReport {
            pages_reclaimed,
            bytes_freed: pages_reclaimed.saturating_mul(page_size),
            duration_ms: u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX),
        })
    }

    /// Run `ANALYZE` so the SQLite query planner has fresh statistics.
    pub async fn analyze(&self) -> Result<()> {
        let pool = self.pool()?;
        sqlx::query("ANALYZE")
            .execute(pool)
            .await
            .map_err(|e| Error::metadata("ops.analyze", e))?;
        Ok(())
    }

    /// Copy the live database to `path` using `VACUUM INTO`. The destination
    /// MUST NOT exist; SQLite errors otherwise. Holds a read lock on the
    /// source for the copy duration but does not block readers.
    pub async fn backup_to(&self, path: &Path) -> Result<BackupReport> {
        let pool = self.pool()?;
        let started = Instant::now();

        // Quote per SQLite literal rules.
        let path_str = path.to_string_lossy().replace('\'', "''");
        let stmt = format!("VACUUM INTO '{path_str}'");
        sqlx::query(&stmt)
            .execute(pool)
            .await
            .map_err(|e| Error::metadata("ops.backup_to", e))?;

        let bytes_copied = std::fs::metadata(path).map(|m| m.len()).unwrap_or(0);
        Ok(BackupReport {
            bytes_copied,
            duration_ms: u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX),
        })
    }

    /// Flush the write-ahead log into the main database file and shrink the
    /// WAL to zero via `PRAGMA wal_checkpoint(TRUNCATE)`.
    ///
    /// The pragma yields one row of three integers — `busy`, `log`,
    /// `checkpointed`. `checkpointed` is reported as `pages_written` and `log`
    /// as `frames_truncated`; a column that cannot be decoded, or whose value
    /// does not fit the field type, is reported as `0`. The `busy` column is
    /// not inspected.
    ///
    /// NOTE(review): SQLite's checkpoint documentation says a successful
    /// TRUNCATE checkpoint reports both counters as zero, and that `busy = 1`
    /// means the checkpoint was blocked — confirm the report carries what
    /// callers expect.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Config`] for non-SQLite back-ends and a metadata error
    /// tagged `ops.checkpoint` when the pragma fails.
    pub async fn checkpoint(&self) -> Result<CheckpointReport> {
        let row = sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)")
            .fetch_one(self.pool()?)
            .await
            .map_err(|e| Error::metadata("ops.checkpoint", e))?;

        // Result columns: 0 = busy, 1 = log (frames in WAL),
        // 2 = checkpointed (frames written). Unreadable columns count as zero.
        let column = |idx: usize| -> i64 { row.try_get(idx).unwrap_or(0) };
        let wal_frames = column(1);
        let written_frames = column(2);

        Ok(CheckpointReport {
            pages_written: u64::try_from(written_frames).unwrap_or(0),
            frames_truncated: u32::try_from(wal_frames).unwrap_or(0),
        })
    }

    /// Delete audit-log entries older than `retain`. Returns the number of
    /// rows removed.
    pub async fn compact_audit_log(&self, retain: Duration) -> Result<u64> {
        let pool = self.pool()?;
        let now_ms = self.inner.clock.now_ms();
        let cutoff = now_ms.saturating_sub(i64::try_from(retain.as_millis()).unwrap_or(i64::MAX));
        let res = sqlx::query("DELETE FROM audit_log WHERE ts < ?")
            .bind(cutoff)
            .execute(pool)
            .await
            .map_err(|e| Error::metadata("ops.compact_audit_log", e))?;
        Ok(res.rows_affected())
    }
}