use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use rusqlite::{params, Connection, Row};
use serde_json::Value;
use crate::errors::{InnateError, Result};
use crate::utils::{dot_product, l2_normalize, unpack_embedding};
// Private submodules of this storage layer. Their contents are not visible from this
// file — presumably each adds further `impl Storage` methods for one area; verify there.
mod chunks;
mod evolution;
mod meta;
mod raw;
mod traces;
/// Schema version this build expects; `Storage::init_schema` compares it against the
/// `schema_version` value stored in the `meta` table.
const EXPECTED_SCHEMA_VERSION: &str = "4.15";
/// Contents of `../schema.sql`, embedded at compile time and executed as one batch when
/// the database has no `meta` table yet.
const SCHEMA_SQL: &str = include_str!("../schema.sql");
/// List of `(String, Vec<f32>)` pairs.
/// NOTE(review): presumably `(chunk id, embedding vector)` — confirm against the code that fills it.
type VectorEntries = Vec<(String, Vec<f32>)>;
/// Interior-mutable, optional [`VectorEntries`]; `Storage::open` initialises it to `None`.
type VectorCache = RefCell<Option<VectorEntries>>;
/// Triple of two strings and an optional third string.
/// NOTE(review): element meaning (e.g. from / to / label) is not visible here — confirm at the use sites.
pub type DepEdge = (String, String, Option<String>);
/// Handle to one SQLite database file: the open connection plus configuration and
/// in-memory vector caches.
///
/// The cache fields use `Cell`/`RefCell`, so they can be reset through `&self`
/// (as `rollback(&self)` does via `invalidate_vector_caches`).
pub struct Storage {
/// Path the connection was opened from; also handed to the migration runner.
pub db_path: PathBuf,
/// The underlying rusqlite connection.
conn: Connection,
/// NOTE(review): presumably the length of content embedding vectors — confirm with callers.
pub content_dim: usize,
/// NOTE(review): presumably the length of trigger embedding vectors — confirm with callers.
pub trigger_dim: usize,
/// Cached vector entries; starts as `None`.
/// NOTE(review): presumably the content embeddings — confirm.
vec_content_cache: VectorCache,
/// Cached vector entries; starts as `None`.
/// NOTE(review): presumably the trigger embeddings — confirm.
vec_trigger_cache: VectorCache,
/// Starts as `None`.
/// NOTE(review): presumably the revision the caches were loaded at — confirm in the cache code.
vector_cache_revision: Cell<Option<i64>>,
}
/// Pair of an identifier and a reason string.
/// NOTE(review): presumably describes a claimed "evolve" request (see the `evolution`
/// submodule) — the producing code is not visible here; confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EvolveRequestClaim {
/// Identifier of the claimed request.
pub id: String,
/// Reason text attached to the request.
pub reason: String,
}
impl Storage {
    /// Opens the database at `db_path` for read/write use, creating it if needed.
    ///
    /// Missing parent directories are created, the connection pragmas are applied
    /// (WAL journaling is mandatory), and the schema is installed or migrated before
    /// the handle is returned. Both vector caches start empty.
    ///
    /// # Errors
    /// Returns an error if the parent directory cannot be created, the database cannot
    /// be opened, WAL mode is unavailable, or schema initialisation fails.
    pub fn open(db_path: impl AsRef<Path>, content_dim: usize, trigger_dim: usize) -> Result<Self> {
        let db_path = db_path.as_ref().to_path_buf();
        if let Some(parent) = db_path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        let conn = Connection::open(&db_path)?;
        configure_pragmas(&conn)?;
        let mut storage = Self {
            db_path,
            conn,
            content_dim,
            trigger_dim,
            vec_content_cache: RefCell::new(None),
            vec_trigger_cache: RefCell::new(None),
            vector_cache_revision: Cell::new(None),
        };
        storage.init_schema()?;
        Ok(storage)
    }

    /// Opens an existing database strictly for reading.
    ///
    /// The connection is opened with `SQLITE_OPEN_READ_ONLY | SQLITE_OPEN_NO_MUTEX` and
    /// `query_only` is switched on. No directories are created and the schema is
    /// neither installed, checked nor migrated.
    ///
    /// The dimensions are fixed at `content_dim = 1024` and `trigger_dim = 256`.
    /// NOTE(review): callers whose database was built with other dimensions get these
    /// values regardless — confirm that is intended.
    ///
    /// # Errors
    /// Returns an error if the file cannot be opened or a pragma cannot be applied.
    pub fn open_readonly(db_path: impl AsRef<Path>) -> Result<Self> {
        let db_path = db_path.as_ref().to_path_buf();
        let conn = Connection::open_with_flags(
            &db_path,
            rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
        )?;
        conn.pragma_update(None, "query_only", "ON")?;
        conn.pragma_update(None, "foreign_keys", "ON")?;
        Ok(Self {
            db_path,
            conn,
            content_dim: 1024,
            trigger_dim: 256,
            vec_content_cache: RefCell::new(None),
            vec_trigger_cache: RefCell::new(None),
            vector_cache_revision: Cell::new(None),
        })
    }

    /// Installs the schema on a fresh database, or reconciles an existing one with
    /// `EXPECTED_SCHEMA_VERSION`.
    ///
    /// * No `meta` table: `SCHEMA_SQL` is executed inside a `BEGIN IMMEDIATE`
    ///   transaction, which is rolled back if the batch fails.
    /// * Stored version equals the expected one: nothing to do.
    /// * Stored version is newer: a warning is printed to stderr and the database is
    ///   used as is.
    /// * Stored version is older: `crate::migrate::run_migrations` is run against
    ///   `db_path` and the applied migrations are reported on stderr.
    ///
    /// # Errors
    /// Returns an error if any SQL statement fails, if `meta` exists but has no
    /// `schema_version` row, or if the migration runner fails.
    fn init_schema(&mut self) -> Result<()> {
        let meta_tables = self.conn.query_row(
            "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='meta'",
            [],
            |r| r.get::<_, i64>(0),
        )?;
        if meta_tables == 0 {
            self.conn.execute_batch("BEGIN IMMEDIATE")?;
            return match self.conn.execute_batch(SCHEMA_SQL) {
                Ok(()) => {
                    self.conn.execute_batch("COMMIT")?;
                    Ok(())
                }
                Err(err) => {
                    // Best effort: the schema error is the one worth reporting, so a
                    // failure of the ROLLBACK itself is deliberately ignored.
                    let _ = self.conn.execute_batch("ROLLBACK");
                    Err(err.into())
                }
            };
        }
        let current: Option<String> = self
            .conn
            .query_row(
                "SELECT value FROM meta WHERE key='schema_version'",
                [],
                |r| r.get(0),
            )
            .optional()?;
        let current = current
            .ok_or_else(|| InnateError::Other("meta table missing schema_version".into()))?;
        let cur = ver_tuple(&current);
        let exp = ver_tuple(EXPECTED_SCHEMA_VERSION);
        match cur.cmp(&exp) {
            std::cmp::Ordering::Equal => Ok(()),
            std::cmp::Ordering::Greater => {
                // A newer on-disk schema is tolerated; only warn.
                eprintln!(
                    "[innate] warning: db schema {current} > expected {EXPECTED_SCHEMA_VERSION}"
                );
                Ok(())
            }
            std::cmp::Ordering::Less => {
                let applied = crate::migrate::run_migrations(&self.db_path)?;
                if !applied.is_empty() {
                    eprintln!("[innate] auto-migrated: {}", applied.join(", "));
                }
                Ok(())
            }
        }
    }

    /// Starts a transaction with `BEGIN IMMEDIATE`.
    ///
    /// # Errors
    /// Returns an error if SQLite rejects the statement.
    pub fn begin_immediate(&self) -> Result<()> {
        self.conn.execute_batch("BEGIN IMMEDIATE")?;
        Ok(())
    }

    /// Commits the current transaction.
    ///
    /// # Errors
    /// Returns an error if SQLite rejects the `COMMIT`.
    pub fn commit(&self) -> Result<()> {
        self.conn.execute_batch("COMMIT")?;
        Ok(())
    }

    /// Rolls back the current transaction and drops the in-memory vector caches.
    ///
    /// The caches are invalidated even when `ROLLBACK` itself reports an error.
    /// SQLite rolls a transaction back automatically after certain failures, and a
    /// later explicit `ROLLBACK` then errors with "no transaction is active"; the
    /// uncommitted data is gone either way, so cached vectors must not outlive it.
    ///
    /// # Errors
    /// Returns the error reported by `ROLLBACK`, if any.
    pub fn rollback(&self) -> Result<()> {
        let outcome = self.conn.execute_batch("ROLLBACK");
        self.invalidate_vector_caches();
        outcome?;
        Ok(())
    }
}
/// Plain-data record describing one chunk.
///
/// `Default` is derived, so a fresh value has empty strings, zero numbers and `None`
/// for every optional field.
/// NOTE(review): presumably one field per column of the chunk table in `schema.sql`;
/// the row-mapping code is not visible here — confirm field meanings there.
#[derive(Debug, Default, Clone)]
pub struct ChunkRow {
// --- identity and content ---
pub id: String,
pub skill_name: Option<String>,
pub seq: i64,
pub content: String,
pub trigger_desc: Option<String>,
pub anti_trigger_desc: Option<String>,
pub content_hash: String,
pub token_count: Option<i64>,
// --- provenance ---
pub origin: String,
pub source: Option<String>,
pub maturity: Option<String>,
// NOTE(review): stored as a string; presumably a serialized list of ids — confirm format.
pub related_ids: Option<String>,
// Integer rather than bool. NOTE(review): presumably a 0/1 flag — confirm.
pub protected: i64,
// --- lifecycle state ---
pub state: String,
pub state_reason: Option<String>,
pub state_updated_at: Option<String>,
pub confidence: f64,
pub confidence_reason: Option<String>,
pub version: i64,
// --- distillation metadata ---
pub distilled_from: Option<String>,
pub distill_provider: Option<String>,
pub distill_model: Option<String>,
pub distill_prompt_version: Option<String>,
pub parent_id: Option<String>,
// --- usage counters ---
pub selected_count: i64,
pub used_count: i64,
pub used_success_count: i64,
pub success_trace_ids_count: i64,
pub last_success_at: Option<String>,
pub last_agg_ts: Option<String>,
pub embed_version: i64,
// --- timestamps (kept as strings; format not visible here) ---
pub created_at: String,
pub updated_at: String,
pub last_used_at: Option<String>,
}
/// Plain-data record describing one episodic log entry.
///
/// `Default` is derived; unlike `ChunkRow`, this type does not derive `Clone`.
/// NOTE(review): presumably one field per column of an episodic-log table in
/// `schema.sql`; the row-mapping code is not visible here — confirm field meanings there.
#[derive(Debug, Default)]
pub struct EpisodicLogRow {
// --- identity ---
pub id: String,
pub trace_id: String,
pub lib_id: String,
// Timestamp kept as a string; format not visible here.
pub ts: String,
// --- recorded interaction ---
pub query: Option<String>,
pub recall_snapshot: Option<String>,
pub output: Option<String>,
pub output_summary: Option<String>,
pub outcome: Option<String>,
pub event_source: String,
// --- task / usage state ---
pub task_state: String,
pub completed_at: Option<String>,
pub usage_state: String,
// NOTE(review): stored as a string; presumably a serialized list of ids — confirm format.
pub used_ids: Option<String>,
pub used_attribution: Option<String>,
pub used_complete: bool,
pub context_key: Option<String>,
// --- distillation bookkeeping ---
pub nomination: Option<String>,
pub priority: i64,
pub distill_state: String,
pub distill_note: Option<String>,
}
/// Applies the connection-level pragmas used for read/write databases and verifies
/// that WAL journaling is actually in effect.
///
/// `busy_timeout` is set first so that every later statement — in particular the
/// switch to WAL, which needs a lock on the database — waits for a competing
/// connection instead of failing straight away with `SQLITE_BUSY`.
///
/// # Errors
/// Returns an error if any pragma fails, or `InnateError::Other` if the journal mode
/// reported afterwards is not `wal`.
fn configure_pragmas(conn: &Connection) -> Result<()> {
    conn.execute_batch(
        "PRAGMA busy_timeout=5000;
         PRAGMA journal_mode=WAL;
         PRAGMA foreign_keys=ON;
         PRAGMA synchronous=NORMAL;
         PRAGMA cache_size=-65536;
         PRAGMA mmap_size=268435456;
         PRAGMA temp_store=memory;",
    )?;
    // Setting the journal mode can silently leave a different mode in place, so read
    // the effective mode back rather than trusting the request.
    let mode: String = conn.query_row("PRAGMA journal_mode", [], |r| r.get(0))?;
    if mode != "wal" {
        return Err(crate::errors::InnateError::Other(format!(
            "WAL mode required but got '{mode}'; check filesystem support"
        )));
    }
    Ok(())
}
/// Parses a dotted version string into a `(major, minor, patch)` tuple for ordering.
///
/// Components are read positionally: a missing, empty or non-numeric component counts
/// as `0` and never shifts later components into its slot. A component with a
/// trailing suffix (for example `15-beta`) contributes its leading digits. Anything
/// past the third component is ignored.
fn ver_tuple(v: &str) -> (u32, u32, u32) {
    /// Numeric value of one dot-separated component, or `0` if it has none.
    fn component(part: &str) -> u32 {
        let part = part.trim();
        part.parse::<u32>().unwrap_or_else(|_| {
            // Fall back to the leading run of ASCII digits so "15-beta" reads as 15.
            let digits_end = part
                .find(|c: char| !c.is_ascii_digit())
                .unwrap_or(part.len());
            part[..digits_end].parse::<u32>().unwrap_or(0)
        })
    }

    let mut parts = v.split('.').map(component);
    let mut next = || parts.next().unwrap_or(0);
    (next(), next(), next())
}
/// Builds a JSON object from `row`, using `names[i]` as the key for column `i`.
///
/// Only the first `names.len()` columns are read; each value is converted with
/// `row_value_at`. This function itself never returns `Err`.
fn row_to_json_with_names(row: &Row, names: &[String]) -> rusqlite::Result<Value> {
    let fields: serde_json::Map<String, Value> = names
        .iter()
        .enumerate()
        .map(|(idx, key)| (key.clone(), row_value_at(row, idx)))
        .collect();
    Ok(Value::Object(fields))
}
/// Builds a JSON object from every column of `row`, keyed by the column names
/// reported by the statement.
///
/// # Errors
/// Returns the rusqlite error if a column name cannot be looked up.
fn row_to_json(row: &Row) -> rusqlite::Result<Value> {
    let column_total = row.as_ref().column_count();
    let fields = (0..column_total)
        .map(|idx| -> rusqlite::Result<(String, Value)> {
            let key = row.as_ref().column_name(idx)?.to_string();
            Ok((key, row_value_at(row, idx)))
        })
        .collect::<rusqlite::Result<serde_json::Map<String, Value>>>()?;
    Ok(Value::Object(fields))
}
/// Converts column `i` of `row` into a JSON value.
///
/// The column is tried as text, then as a 64-bit integer, then as a 64-bit float; the
/// first successful read wins. SQL `NULL`, a float that JSON cannot represent
/// (`Number::from_f64` returns `None`), and any column that none of the three reads
/// accept all yield `Value::Null`.
fn row_value_at(row: &Row, i: usize) -> Value {
    match row.get::<_, Option<String>>(i) {
        Ok(Some(text)) => return Value::String(text),
        Ok(None) => return Value::Null,
        Err(_) => {}
    }
    match row.get::<_, Option<i64>>(i) {
        Ok(Some(int)) => return Value::Number(int.into()),
        Ok(None) => return Value::Null,
        Err(_) => {}
    }
    match row.get::<_, Option<f64>>(i) {
        Ok(Some(real)) => match serde_json::Number::from_f64(real) {
            Some(number) => Value::Number(number),
            None => Value::Null,
        },
        Ok(None) | Err(_) => Value::Null,
    }
}
/// Extension trait that turns a fallible single-row lookup into an optional one.
trait OptionalExt<T> {
/// Wraps a successful value in `Some`; the implementation in this file maps
/// `rusqlite::Error::QueryReturnedNoRows` to `Ok(None)` and passes other errors through.
fn optional(self) -> rusqlite::Result<Option<T>>;
}
impl<T> OptionalExt<T> for rusqlite::Result<T> {
fn optional(self) -> rusqlite::Result<Option<T>> {
match self {
Ok(v) => Ok(Some(v)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(e),
}
}
}