Skip to main content

innate_core/storage/
mod.rs

1//! SQLite storage layer.
2//!
3//! Replaces sqlite-vec virtual tables with ordinary BLOB columns + pure-Rust
4//! cosine similarity, keeping the schema otherwise aligned with v4.5.x.
5
6use std::cell::{Cell, RefCell};
7use std::collections::HashMap;
8use std::path::{Path, PathBuf};
9
10use rusqlite::{params, Connection, Row};
11use serde_json::Value;
12
13use crate::errors::{InnateError, Result};
14use crate::utils::{dot_product, l2_normalize, unpack_embedding};
15
16mod chunks;
17mod evolution;
18mod meta;
19mod raw;
20mod traces;
21
/// Schema version this build targets. `init_schema` compares the database's
/// `meta.schema_version` against it: older triggers migration, newer only warns.
const EXPECTED_SCHEMA_VERSION: &str = "4.17";
23
24// Embedded SQL schema — no external files needed.
25const SCHEMA_SQL: &str = include_str!("../schema.sql");
26
/// Parsed vectors kept in memory for search, one `(String, Vec<f32>)` pair per
/// row (presumably `(id, vector)` — confirm against the loaders).
type VectorEntries = Vec<(String, Vec<f32>)>;

/// Interior-mutable slot for [`VectorEntries`]; `None` means cold
/// (never loaded, or invalidated).
type VectorCache = RefCell<Option<VectorEntries>>;

/// A single dependency edge: `(dst, kind, dst_lib)`.
pub type DepEdge = (String, String, Option<String>);
32
33pub struct Storage {
34    pub db_path: PathBuf,
35    conn: Connection,
36    pub content_dim: usize,
37    pub trigger_dim: usize,
38    /// Pre-parsed in-memory caches for vector search; None = cold (not loaded or invalidated).
39    vec_content_cache: VectorCache,
40    vec_trigger_cache: VectorCache,
41    /// Last observed vector revision. Only vector writes advance this value.
42    vector_cache_revision: Cell<Option<i64>>,
43}
44
/// A claimed evolve request: its `id` and the `reason` recorded for it.
/// NOTE(review): produced by code outside this file — confirm field semantics there.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EvolveRequestClaim {
    pub id: String,
    pub reason: String,
}
50
51impl Storage {
52    pub fn open(db_path: impl AsRef<Path>, content_dim: usize, trigger_dim: usize) -> Result<Self> {
53        let db_path = db_path.as_ref().to_path_buf();
54        if let Some(parent) = db_path.parent() {
55            std::fs::create_dir_all(parent)?;
56        }
57        let conn = Connection::open(&db_path)?;
58        configure_pragmas(&conn)?;
59        let mut s = Self {
60            db_path,
61            conn,
62            content_dim,
63            trigger_dim,
64            vec_content_cache: RefCell::new(None),
65            vec_trigger_cache: RefCell::new(None),
66            vector_cache_revision: Cell::new(None),
67        };
68        s.init_schema()?;
69        Ok(s)
70    }
71
72    pub fn open_readonly(db_path: impl AsRef<Path>) -> Result<Self> {
73        let db_path = db_path.as_ref().to_path_buf();
74        let conn = Connection::open_with_flags(
75            &db_path,
76            rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
77        )?;
78        conn.pragma_update(None, "query_only", "ON")?;
79        conn.pragma_update(None, "foreign_keys", "ON")?;
80        let s = Self {
81            db_path,
82            conn,
83            content_dim: 1024,
84            trigger_dim: 256,
85            vec_content_cache: RefCell::new(None),
86            vec_trigger_cache: RefCell::new(None),
87            vector_cache_revision: Cell::new(None),
88        };
89        Ok(s)
90    }
91
92    fn init_schema(&mut self) -> Result<()> {
93        let has_meta: bool = self.conn.query_row(
94            "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='meta'",
95            [],
96            |r| r.get::<_, i64>(0),
97        )? > 0;
98
99        if !has_meta {
100            // Wrap schema creation in a transaction for atomicity.
101            self.conn.execute_batch("BEGIN IMMEDIATE")?;
102            let r = self.conn.execute_batch(SCHEMA_SQL);
103            if r.is_ok() {
104                self.conn.execute_batch("COMMIT")?;
105            } else {
106                let _ = self.conn.execute_batch("ROLLBACK");
107                r?;
108            }
109            return Ok(());
110        }
111
112        let current: Option<String> = self
113            .conn
114            .query_row(
115                "SELECT value FROM meta WHERE key='schema_version'",
116                [],
117                |r| r.get(0),
118            )
119            .optional()?;
120
121        let current = current
122            .ok_or_else(|| InnateError::Other("meta table missing schema_version".into()))?;
123
124        let cur = ver_tuple(&current);
125        let exp = ver_tuple(EXPECTED_SCHEMA_VERSION);
126
127        match cur.cmp(&exp) {
128            std::cmp::Ordering::Equal => Ok(()),
129            std::cmp::Ordering::Greater => {
130                // Forward-compat: newer schema, warn but allow.
131                eprintln!(
132                    "[innate] warning: db schema {current} > expected {EXPECTED_SCHEMA_VERSION}"
133                );
134                Ok(())
135            }
136            std::cmp::Ordering::Less => {
137                // Delegate to the proper migration chain which handles all steps atomically.
138                let applied = crate::migrate::run_migrations(&self.db_path)?;
139                if !applied.is_empty() {
140                    eprintln!("[innate] auto-migrated: {}", applied.join(", "));
141                }
142                Ok(())
143            }
144        }
145    }
146
147    // ------------------------------------------------------------------
148    // Transactions
149    // ------------------------------------------------------------------
150
151    pub fn begin_immediate(&self) -> Result<()> {
152        self.conn.execute_batch("BEGIN IMMEDIATE")?;
153        Ok(())
154    }
155
156    pub fn commit(&self) -> Result<()> {
157        self.conn.execute_batch("COMMIT")?;
158        Ok(())
159    }
160
161    pub fn rollback(&self) -> Result<()> {
162        self.conn.execute_batch("ROLLBACK")?;
163        // In-place cache upserts from the aborted transaction may not have
164        // persisted; drop caches so the next search reloads committed state.
165        self.invalidate_vector_caches();
166        Ok(())
167    }
168
169    // ------------------------------------------------------------------
170}
171
172// ------------------------------------------------------------------
173// Row types
174// ------------------------------------------------------------------
175
/// In-memory form of one chunk record, one field per column.
/// NOTE(review): column meanings come from `schema.sql`, which is not visible
/// here; field order is kept stable because `Debug` output follows it.
#[derive(Clone, Debug, Default)]
pub struct ChunkRow {
    pub id: String,
    pub skill_name: Option<String>,
    pub seq: i64,
    pub content: String,
    pub trigger_desc: Option<String>,
    pub anti_trigger_desc: Option<String>,
    pub content_hash: String,
    pub token_count: Option<i64>,
    pub origin: String,
    pub source: Option<String>,
    pub agent: Option<String>,
    pub maturity: Option<String>,
    pub related_ids: Option<String>,
    pub protected: i64,

    pub state: String,
    pub state_reason: Option<String>,
    pub state_updated_at: Option<String>,
    pub confidence: f64,
    pub confidence_reason: Option<String>,
    pub version: i64,

    pub distilled_from: Option<String>,
    pub distill_provider: Option<String>,
    pub distill_model: Option<String>,
    pub distill_prompt_version: Option<String>,
    pub parent_id: Option<String>,

    pub selected_count: i64,
    pub used_count: i64,
    pub used_success_count: i64,
    pub success_trace_ids_count: i64,
    pub last_success_at: Option<String>,
    pub last_agg_ts: Option<String>,
    pub embed_version: i64,

    pub created_at: String,
    pub updated_at: String,
    pub last_used_at: Option<String>,
}
214
/// In-memory form of one episodic-log record, one field per column.
/// NOTE(review): column meanings come from `schema.sql`, which is not visible
/// here; field order is kept stable because `Debug` output follows it.
#[derive(Default, Debug)]
pub struct EpisodicLogRow {
    pub id: String,
    pub trace_id: String,
    pub lib_id: String,
    pub ts: String,

    pub query: Option<String>,
    pub recall_snapshot: Option<String>,
    pub output: Option<String>,
    pub output_summary: Option<String>,
    pub outcome: Option<String>,
    pub event_source: String,
    pub agent: Option<String>,

    pub task_state: String,
    pub completed_at: Option<String>,

    pub usage_state: String,
    pub used_ids: Option<String>,
    pub used_attribution: Option<String>,
    pub used_complete: bool,

    pub context_key: Option<String>,
    pub nomination: Option<String>,
    pub priority: i64,
    pub distill_state: String,
    pub distill_note: Option<String>,
}
240
241// ------------------------------------------------------------------
242// Helpers
243// ------------------------------------------------------------------
244
245fn configure_pragmas(conn: &Connection) -> Result<()> {
246    conn.execute_batch(
247        "PRAGMA journal_mode=WAL;
248         PRAGMA foreign_keys=ON;
249         PRAGMA synchronous=NORMAL;
250         PRAGMA cache_size=-65536;
251         PRAGMA mmap_size=268435456;
252         PRAGMA busy_timeout=5000;
253         PRAGMA temp_store=memory;",
254    )?;
255    // Validate WAL mode was accepted (some VFS/filesystems silently downgrade).
256    let mode: String = conn.query_row("PRAGMA journal_mode", [], |r| r.get(0))?;
257    if mode != "wal" {
258        return Err(crate::errors::InnateError::Other(format!(
259            "WAL mode required but got '{mode}'; check filesystem support"
260        )));
261    }
262    Ok(())
263}
264
/// Parses a dotted version string into a `(major, minor, patch)` tuple for ordering.
///
/// Components are read positionally. Each of the first three dot-separated
/// parts contributes its leading ASCII digits, with surrounding whitespace
/// ignored. A part that is missing, has no leading digits, or overflows `u32`
/// counts as `0`, and extra parts are ignored. Reading positionally means one
/// odd component can no longer shift later ones into the wrong slot
/// (e.g. `"4.x.5"` is `(4, 0, 5)`, not `(4, 5, 0)`). It also means a suffixed
/// version such as `"4.17-beta"` compares as `(4, 17, 0)`.
fn ver_tuple(v: &str) -> (u32, u32, u32) {
    /// Leading decimal digits of `part` as a `u32`; `0` if absent or out of range.
    fn leading_number(part: &str) -> u32 {
        let part = part.trim();
        let end = part.find(|c: char| !c.is_ascii_digit()).unwrap_or(part.len());
        part[..end].parse().unwrap_or(0)
    }

    let mut parts = v.split('.').map(leading_number);
    let mut next = || parts.next().unwrap_or(0);
    // Tuple operands are evaluated left to right, so this yields parts 0, 1, 2.
    (next(), next(), next())
}
273
274/// Convert a rusqlite Row to serde_json::Value using column names from statement.
275fn row_to_json_with_names(row: &Row, names: &[String]) -> rusqlite::Result<Value> {
276    let mut map = serde_json::Map::new();
277    for (i, name) in names.iter().enumerate() {
278        let v = row_value_at(row, i);
279        map.insert(name.clone(), v);
280    }
281    Ok(Value::Object(map))
282}
283
284fn row_to_json(row: &Row) -> rusqlite::Result<Value> {
285    let count = row.as_ref().column_count();
286    let mut map = serde_json::Map::new();
287    for i in 0..count {
288        let name = row.as_ref().column_name(i)?.to_string();
289        let v = row_value_at(row, i);
290        map.insert(name, v);
291    }
292    Ok(Value::Object(map))
293}
294
295fn row_value_at(row: &Row, i: usize) -> Value {
296    // Try types in preference order
297    if let Ok(v) = row.get::<_, Option<String>>(i) {
298        return v.map(Value::String).unwrap_or(Value::Null);
299    }
300    if let Ok(v) = row.get::<_, Option<i64>>(i) {
301        return v.map(|n| Value::Number(n.into())).unwrap_or(Value::Null);
302    }
303    if let Ok(v) = row.get::<_, Option<f64>>(i) {
304        return v
305            .and_then(serde_json::Number::from_f64)
306            .map(Value::Number)
307            .unwrap_or(Value::Null);
308    }
309    Value::Null
310}
311
312trait OptionalExt<T> {
313    fn optional(self) -> rusqlite::Result<Option<T>>;
314}
315impl<T> OptionalExt<T> for rusqlite::Result<T> {
316    fn optional(self) -> rusqlite::Result<Option<T>> {
317        match self {
318            Ok(v) => Ok(Some(v)),
319            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
320            Err(e) => Err(e),
321        }
322    }
323}