roboticus-db 0.11.4

SQLite persistence layer with 28 tables, FTS5 search, WAL mode, and migration system
Documentation
//! # roboticus-db
//!
//! SQLite persistence layer for the Roboticus agent runtime. All state --
//! sessions, memories, tool calls, policy decisions, cron jobs, embeddings,
//! skills, and semantic cache -- lives in a single SQLite database with WAL
//! mode enabled.
//!
//! ## Key Types
//!
//! - [`Database`] -- Cloneable, thread-safe handle wrapping an `r2d2` pool of SQLite connections
//!
//! ## Modules
//!
//! - `schema` -- Table definitions, `initialize_db()`, migration runner
//! - `sessions` -- Session CRUD, message append/list, turn persistence
//! - `memory` -- 5-tier memory CRUD (working, episodic, semantic, procedural, relationship) + FTS5
//! - `embeddings` -- BLOB embedding storage / lookup with JSON fallback
//! - `ann` -- HNSW approximate nearest-neighbor index (instant-distance)
//! - `hippocampus` -- Long-term memory consolidation and decay
//! - `learned_skills` -- Learned skill CRUD, reinforcement (success/failure), priority
//! - `checkpoint` -- Session checkpoint / restore via `context_snapshots` table
//! - `efficiency` -- Efficiency metrics tracking and queries
//! - `agents` -- Sub-agent registry and enabled-agent CRUD
//! - `backend` -- Storage backend abstraction trait
//! - `cache` -- Semantic cache persistence (loaded on boot, flushed every 5 min)
//! - `cron` -- Cron job state, lease acquisition, run history
//! - `skills` -- Skill definition CRUD and trigger lookup
//! - `tools` -- Tool call records
//! - `policy` -- Policy decision records
//! - `metrics` -- Inference cost tracking, proxy snapshots, transactions, turn feedback
//! - `routing_dataset` -- Historical routing decision + cost outcome JOIN for ML training
//! - `shadow_routing` -- Counterfactual ML predictions stored alongside production decisions
//! - `revenue_introspection` -- Unified introspection surface: strategy health, profitability, audit trail
//! - `traces` -- Pipeline trace persistence (`pipeline_traces` table) + flight-recorder ReAct trace JSON

pub mod abuse;
pub mod agents;
pub mod ann;
pub mod approvals;
pub mod backend;
pub mod cache;
pub mod checkpoint;
pub mod cron;
pub mod delegation;
pub mod delivery_queue;
pub mod efficiency;
pub mod embeddings;
mod ext;
pub use ext::*;
pub mod hippocampus;
pub mod hygiene_log;
pub mod learned_skills;
pub mod memory;
pub mod memory_index;
pub mod metrics;
pub mod model_selection;
pub mod policy;
pub mod revenue_accounting;
pub mod revenue_feedback;
pub mod revenue_introspection;
pub mod revenue_opportunity_queries;
pub mod revenue_scoring;
pub mod revenue_strategy_summary;
pub mod revenue_swap_tasks;
pub mod revenue_tax_tasks;
pub mod routing_dataset;
pub mod schema;
pub mod service_revenue;
pub mod sessions;
pub mod shadow_routing;
pub mod skills;
pub mod task_events;
pub mod tasks;
pub mod tool_embeddings;
pub mod tools;
pub mod traces;
pub mod treasury;

pub use rusqlite::params_from_iter;

use roboticus_core::Result;

/// Maximum number of pooled connections for a file-backed database.
///
/// SQLite WAL mode supports concurrent readers, so multiple connections can
/// read in parallel without contention; writes are serialized by SQLite
/// itself (not by our code). In-memory databases do not use this value:
/// `Database::new` gives them a single connection.
const POOL_SIZE: u32 = 8;

/// How long, in seconds, a pool checkout waits for a free connection.
///
/// If all connections are in use, callers block for up to this duration.
/// When it elapses, `Database::new` reports the failure as an error, whereas
/// `Database::conn` panics.
const POOL_TIMEOUT_SECS: u64 = 5;

/// A pooled SQLite connection, as returned by `Database::conn()`.
///
/// The connection is returned to the pool when this guard is dropped, so hold
/// it only as long as needed.
pub type PooledConnection = r2d2::PooledConnection<r2d2_sqlite::SqliteConnectionManager>;

/// Handle to the Roboticus SQLite database.
///
/// Wraps an `r2d2` pool of SQLite connections that is built and configured
/// by [`Database::new`]; use [`Database::conn`] to check out a connection.
/// Cloning the handle clones the inner `r2d2::Pool`, which is reference
/// counted, so clones share the same connections and database.
#[derive(Clone)]
pub struct Database {
    // Connection pool for this database (sized to a single connection for `:memory:`).
    pool: r2d2::Pool<r2d2_sqlite::SqliteConnectionManager>,
}

impl Database {
    /// Opens the database at `path` (or a private in-memory database when
    /// `path` is `":memory:"`) and runs schema initialization on it.
    ///
    /// Every pooled connection is initialized with `journal_mode=WAL`,
    /// `foreign_keys=ON`, `synchronous=NORMAL` and `auto_vacuum=INCREMENTAL`.
    /// File-backed databases get a pool of up to `POOL_SIZE` connections so
    /// readers can proceed in parallel under WAL. In-memory databases get
    /// exactly one connection, which is never recycled, because each SQLite
    /// in-memory connection is its own isolated database.
    ///
    /// If the database still has `auto_vacuum` disabled, a one-time,
    /// best-effort `VACUUM` converts it to incremental auto-vacuum before
    /// `schema::initialize_db` runs.
    ///
    /// # Errors
    ///
    /// Returns `RoboticusError::Database` if the pool cannot be built (for
    /// example because `path` cannot be opened as a SQLite database) or if a
    /// connection cannot be checked out. Any error from `schema::initialize_db`
    /// is propagated.
    ///
    /// # Examples
    ///
    /// ```
    /// use roboticus_db::Database;
    ///
    /// let db = Database::new(":memory:").unwrap();
    /// // database is now ready for use
    /// ```
    pub fn new(path: &str) -> Result<Self> {
        let is_memory = path == ":memory:";
        let manager = if is_memory {
            r2d2_sqlite::SqliteConnectionManager::memory()
        } else {
            r2d2_sqlite::SqliteConnectionManager::file(path)
        };

        // Configure each connection in the pool with WAL mode and pragmas.
        // WAL is only meaningful for file-based DBs; harmless for in-memory.
        // Setting `auto_vacuum` here only takes effect on a database that has
        // no tables yet; existing databases are upgraded by the VACUUM below.
        let manager = manager.with_init(|conn| {
            conn.execute_batch(
                "PRAGMA journal_mode=WAL; \
                 PRAGMA foreign_keys=ON; \
                 PRAGMA synchronous=NORMAL; \
                 PRAGMA auto_vacuum=INCREMENTAL;",
            )?;
            Ok(())
        });

        // In-memory databases: pool size 1 because each in-memory connection
        // is a separate isolated database. For file-based DBs, use the full
        // pool — SQLite WAL supports concurrent readers.
        let size = if is_memory { 1 } else { POOL_SIZE };

        let builder = r2d2::Pool::<r2d2_sqlite::SqliteConnectionManager>::builder()
            .max_size(size)
            .connection_timeout(std::time::Duration::from_secs(POOL_TIMEOUT_SECS));

        // By default r2d2 closes connections older than `max_lifetime` (30 min)
        // or idle longer than `idle_timeout` (10 min), and opens fresh ones to
        // replace them. For an in-memory database the connection *is* the
        // database. Recycling it would silently swap in an empty database that
        // lacks the schema created by `initialize_db`. Keep that single
        // connection alive for the whole life of the pool.
        let builder = if is_memory {
            builder.max_lifetime(None).idle_timeout(None)
        } else {
            builder
        };

        let pool = builder.build(manager).map_err(|e| {
            roboticus_core::RoboticusError::Database(format!(
                "failed to create connection pool: {e}"
            ))
        })?;

        // One-time setup on a single connection: upgrade databases whose
        // `auto_vacuum` mode is still NONE (0) to incremental auto-vacuum.
        // This is best effort. If the query or the VACUUM fails, the database
        // stays fully usable, so errors are deliberately ignored.
        {
            let conn = pool.get().map_err(|e| {
                roboticus_core::RoboticusError::Database(format!("pool get failed: {e}"))
            })?;
            let current_auto_vacuum: i64 = conn
                .query_row("PRAGMA auto_vacuum", [], |row| row.get(0))
                .unwrap_or(0);
            if current_auto_vacuum == 0 {
                let _ = conn.execute_batch("PRAGMA auto_vacuum=INCREMENTAL; VACUUM;");
            }
        }

        let db = Self { pool };
        schema::initialize_db(&db)?;
        Ok(db)
    }

    /// Checks a connection out of the pool.
    ///
    /// The connection goes back to the pool when the returned guard is
    /// dropped, so keep the guard short-lived. This is a synchronous call: if
    /// every connection is already checked out, it blocks the calling thread
    /// for up to `POOL_TIMEOUT_SECS` seconds while waiting for one to be
    /// returned. Avoid holding a guard across `.await` points.
    ///
    /// In-memory databases have a single pooled connection. Calling `conn()`
    /// while another guard from the same pool is still alive therefore waits
    /// out the full timeout and then panics.
    ///
    /// # Panics
    ///
    /// Panics if the pool cannot provide a connection within
    /// `POOL_TIMEOUT_SECS` seconds.
    pub fn conn(&self) -> PooledConnection {
        self.pool.get().unwrap_or_else(|e| {
            // Panic instead of returning an error. This keeps the infallible
            // signature of the previous Mutex-based API, which callers rely on.
            panic!("database connection pool exhausted: {e}");
        })
    }
}

impl std::fmt::Debug for Database {
    // Formats as the bare type name `Database` without exposing the pool.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("Database")
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a fresh in-memory database, failing the test if that is impossible.
    fn memory_db() -> Database {
        Database::new(":memory:").expect("in-memory db")
    }

    #[test]
    fn database_debug_impl() {
        assert_eq!(format!("{:?}", memory_db()), "Database");
    }

    #[test]
    fn database_new_in_memory() {
        let db = memory_db();
        // Checking out a connection from a freshly opened database must succeed.
        let _guard = db.conn();
    }

    #[test]
    fn database_new_invalid_path_returns_error() {
        // "/" names a directory (on Unix-like systems), so SQLite cannot open it.
        let outcome = Database::new("/");
        assert!(
            outcome.is_err(),
            "opening \"/\" as database should fail"
        );
    }
}