Skip to main content

roboticus_db/
lib.rs

1//! # roboticus-db
2//!
3//! SQLite persistence layer for the Roboticus agent runtime. All state --
4//! sessions, memories, tool calls, policy decisions, cron jobs, embeddings,
5//! skills, and semantic cache -- lives in a single SQLite database with WAL
6//! mode enabled.
7//!
8//! ## Key Types
9//!
10//! - [`Database`] -- Cloneable, thread-safe handle over an `r2d2` pool of SQLite connections
11//!
12//! ## Modules
13//!
14//! - `schema` -- Table definitions, `initialize_db()`, migration runner
15//! - `sessions` -- Session CRUD, message append/list, turn persistence
16//! - `memory` -- 5-tier memory CRUD (working, episodic, semantic, procedural, relationship) + FTS5
17//! - `embeddings` -- BLOB embedding storage / lookup with JSON fallback
18//! - `ann` -- HNSW approximate nearest-neighbor index (instant-distance)
19//! - `hippocampus` -- Long-term memory consolidation and decay
20//! - `learned_skills` -- Learned skill CRUD, reinforcement (success/failure), priority
21//! - `checkpoint` -- Session checkpoint / restore via `context_snapshots` table
22//! - `efficiency` -- Efficiency metrics tracking and queries
23//! - `agents` -- Sub-agent registry and enabled-agent CRUD
24//! - `backend` -- Storage backend abstraction trait
25//! - `cache` -- Semantic cache persistence (loaded on boot, flushed every 5 min)
26//! - `cron` -- Cron job state, lease acquisition, run history
27//! - `skills` -- Skill definition CRUD and trigger lookup
28//! - `tools` -- Tool call records
29//! - `policy` -- Policy decision records
30//! - `metrics` -- Inference cost tracking, proxy snapshots, transactions, turn feedback
31//! - `routing_dataset` -- Historical routing decision + cost outcome JOIN for ML training
32//! - `shadow_routing` -- Counterfactual ML predictions stored alongside production decisions
33//! - `revenue_introspection` -- Unified introspection surface: strategy health, profitability, audit trail
34//! - `traces` -- Pipeline trace persistence (`pipeline_traces` table) + flight-recorder ReAct trace JSON
35
36pub mod abuse;
37pub mod agents;
38pub mod ann;
39pub mod approvals;
40pub mod backend;
41pub mod cache;
42pub mod checkpoint;
43pub mod cron;
44pub mod delegation;
45pub mod delivery_queue;
46pub mod efficiency;
47pub mod embeddings;
48mod ext;
49pub use ext::*;
50pub mod hippocampus;
51pub mod hygiene_log;
52pub mod learned_skills;
53pub mod memory;
54pub mod memory_index;
55pub mod metrics;
56pub mod model_selection;
57pub mod policy;
58pub mod revenue_accounting;
59pub mod revenue_feedback;
60pub mod revenue_introspection;
61pub mod revenue_opportunity_queries;
62pub mod revenue_scoring;
63pub mod revenue_strategy_summary;
64pub mod revenue_swap_tasks;
65pub mod revenue_tax_tasks;
66pub mod routing_dataset;
67pub mod schema;
68pub mod service_revenue;
69pub mod sessions;
70pub mod shadow_routing;
71pub mod skills;
72pub mod task_events;
73pub mod tasks;
74pub mod tool_embeddings;
75pub mod tools;
76pub mod traces;
77
78pub use rusqlite::params_from_iter;
79
80use roboticus_core::Result;
81
/// Maximum number of pooled connections for a file-backed database.
///
/// WAL journaling lets readers proceed side by side, so several connections
/// can serve reads at once; concurrent writers are still serialized by
/// SQLite's own locking rather than by anything in this crate.
const POOL_SIZE: u32 = 8;

/// Seconds a caller will wait for a free pooled connection before the
/// checkout gives up with an error.
const POOL_TIMEOUT_SECS: u64 = 5;
90
91/// A pooled SQLite connection. Returned by `Database::conn()`.
92/// The connection is returned to the pool when this guard is dropped.
93pub type PooledConnection = r2d2::PooledConnection<r2d2_sqlite::SqliteConnectionManager>;
94
95#[derive(Clone)]
96pub struct Database {
97    pool: r2d2::Pool<r2d2_sqlite::SqliteConnectionManager>,
98}
99
100impl Database {
101    /// Opens a new database at the given path (or in-memory if `":memory:"`).
102    ///
103    /// Creates a connection pool with `POOL_SIZE` connections. Each connection
104    /// is initialized with WAL mode, foreign keys, and synchronous=NORMAL.
105    /// SQLite WAL mode allows concurrent readers while serializing writers.
106    ///
107    /// # Examples
108    ///
109    /// ```
110    /// use roboticus_db::Database;
111    ///
112    /// let db = Database::new(":memory:").unwrap();
113    /// // database is now ready for use
114    /// ```
115    pub fn new(path: &str) -> Result<Self> {
116        let is_memory = path == ":memory:";
117        let manager = if is_memory {
118            r2d2_sqlite::SqliteConnectionManager::memory()
119        } else {
120            r2d2_sqlite::SqliteConnectionManager::file(path)
121        };
122
123        // Configure each connection in the pool with WAL mode and pragmas.
124        // WAL is only meaningful for file-based DBs; harmless for in-memory.
125        let manager = manager.with_init(|conn| {
126            conn.execute_batch(
127                "PRAGMA journal_mode=WAL; \
128                 PRAGMA foreign_keys=ON; \
129                 PRAGMA synchronous=NORMAL; \
130                 PRAGMA auto_vacuum=INCREMENTAL;",
131            )?;
132            Ok(())
133        });
134
135        // In-memory databases: pool size 1 because each in-memory connection
136        // is a separate isolated database. For file-based DBs, use the full
137        // pool — SQLite WAL supports concurrent readers.
138        let size = if is_memory { 1 } else { POOL_SIZE };
139
140        let pool = r2d2::Pool::builder()
141            .max_size(size)
142            .connection_timeout(std::time::Duration::from_secs(POOL_TIMEOUT_SECS))
143            .build(manager)
144            .map_err(|e| {
145                roboticus_core::RoboticusError::Database(format!(
146                    "failed to create connection pool: {e}"
147                ))
148            })?;
149
150        // Run one-time setup on a single connection: auto_vacuum upgrade.
151        {
152            let conn = pool.get().map_err(|e| {
153                roboticus_core::RoboticusError::Database(format!("pool get failed: {e}"))
154            })?;
155            let current_auto_vacuum: i64 = conn
156                .query_row("PRAGMA auto_vacuum", [], |row| row.get(0))
157                .unwrap_or(0);
158            if current_auto_vacuum == 0 {
159                let _ = conn.execute_batch("PRAGMA auto_vacuum=INCREMENTAL; VACUUM;");
160            }
161        }
162
163        let db = Self { pool };
164        schema::initialize_db(&db)?;
165        Ok(db)
166    }
167
168    /// Gets a connection from the pool. Never blocks an async worker thread —
169    /// the pool manages its own concurrency. Multiple readers can proceed in
170    /// parallel; SQLite WAL serializes writers internally.
171    ///
172    /// Panics if the pool is exhausted and the timeout expires. In practice
173    /// this should never happen with POOL_SIZE=8 and short-lived connections.
174    pub fn conn(&self) -> PooledConnection {
175        self.pool.get().unwrap_or_else(|e| {
176            // This is a critical error — the pool is exhausted. Log and panic
177            // rather than returning an error, to match the previous Mutex API.
178            panic!("database connection pool exhausted: {e}");
179        })
180    }
181}
182
183impl std::fmt::Debug for Database {
184    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
185        f.debug_struct("Database").finish()
186    }
187}
188
#[cfg(test)]
mod tests {
    use super::Database;

    /// Opens a fresh in-memory database or fails the test.
    fn open_in_memory() -> Database {
        Database::new(":memory:").expect("in-memory db")
    }

    #[test]
    fn database_debug_impl() {
        let rendered = format!("{:?}", open_in_memory());
        assert_eq!(rendered, "Database");
    }

    #[test]
    fn database_new_in_memory() {
        let db = open_in_memory();
        // Checking out a connection must succeed on a freshly opened database.
        let _conn = db.conn();
    }

    #[test]
    fn database_new_invalid_path_returns_error() {
        let outcome = Database::new("/");
        assert!(outcome.is_err(), "opening \"/\" as database should fail");
    }
}
211}