1use std::cell::{Cell, RefCell};
7use std::collections::HashMap;
8use std::path::{Path, PathBuf};
9
10use rusqlite::{params, Connection, Row};
11use serde_json::Value;
12
13use crate::errors::{InnateError, Result};
14use crate::utils::{dot_product, l2_normalize, unpack_embedding};
15
16mod chunks;
17mod evolution;
18mod meta;
19mod raw;
20mod traces;
21
/// Schema version this build expects to find in `meta.schema_version`.
///
/// On a read-write open the stored value is compared against this one component-wise
/// (major, minor, patch): an older database is auto-migrated, a newer one only triggers a
/// warning on stderr.
const EXPECTED_SCHEMA_VERSION: &str = "4.16";
23
/// Complete schema script, embedded at compile time from `schema.sql` one directory above
/// this source file (`include_str!` resolves relative to the current file).
///
/// Applied when a read-write open finds no `meta` table in the database.
const SCHEMA_SQL: &str = include_str!("../schema.sql");
26
/// Dependency edge as a `(String, String, Option<String>)` triple.
///
/// NOTE(review): the meaning of each slot is not visible here (presumably source id, target
/// id and an optional edge label) — confirm against the code that produces these.
pub type DepEdge = (String, String, Option<String>);

/// Flat list of `(key, vector)` pairs. NOTE(review): the key is presumably a row id — confirm.
type VectorEntries = Vec<(String, Vec<f32>)>;

/// Interior-mutable slot for [`VectorEntries`]; `None` is the state the constructors set.
type VectorCache = RefCell<Option<VectorEntries>>;
32
/// Handle to one SQLite database file plus interior-mutable caches.
///
/// Constructed by `Storage::open` (read-write: pragmas, schema creation or version check /
/// migration) or `Storage::open_readonly`. The cache fields use `RefCell`/`Cell`, so a
/// `Storage` is not `Sync`.
pub struct Storage {
    /// Filesystem path the connection was opened with.
    pub db_path: PathBuf,
    /// The underlying SQLite connection.
    conn: Connection,
    /// Dimensionality for content vectors (caller-supplied in `open`, 1024 in `open_readonly`).
    pub content_dim: usize,
    /// Dimensionality for trigger vectors (caller-supplied in `open`, 256 in `open_readonly`).
    pub trigger_dim: usize,
    /// Cached content `(key, vector)` entries; both constructors start it as `None`.
    /// NOTE(review): presumably filled lazily elsewhere in `impl Storage` — confirm.
    vec_content_cache: VectorCache,
    /// Cached trigger `(key, vector)` entries; both constructors start it as `None`.
    vec_trigger_cache: VectorCache,
    /// Starts as `None`. NOTE(review): presumably the database revision the vector caches were
    /// loaded at, used to detect staleness — confirm.
    vector_cache_revision: Cell<Option<i64>>,
}
44
/// An `(id, reason)` pair describing a claimed evolve request.
///
/// NOTE(review): "claimed" is inferred from the type name; presumably produced by the
/// `evolution` submodule when a pending request is taken — confirm there.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EvolveRequestClaim {
    /// Identifier of the request (format not enforced by this type).
    pub id: String,
    /// Reason string carried with the claim.
    pub reason: String,
}
50
51impl Storage {
52 pub fn open(db_path: impl AsRef<Path>, content_dim: usize, trigger_dim: usize) -> Result<Self> {
53 let db_path = db_path.as_ref().to_path_buf();
54 if let Some(parent) = db_path.parent() {
55 std::fs::create_dir_all(parent)?;
56 }
57 let conn = Connection::open(&db_path)?;
58 configure_pragmas(&conn)?;
59 let mut s = Self {
60 db_path,
61 conn,
62 content_dim,
63 trigger_dim,
64 vec_content_cache: RefCell::new(None),
65 vec_trigger_cache: RefCell::new(None),
66 vector_cache_revision: Cell::new(None),
67 };
68 s.init_schema()?;
69 Ok(s)
70 }
71
72 pub fn open_readonly(db_path: impl AsRef<Path>) -> Result<Self> {
73 let db_path = db_path.as_ref().to_path_buf();
74 let conn = Connection::open_with_flags(
75 &db_path,
76 rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
77 )?;
78 conn.pragma_update(None, "query_only", "ON")?;
79 conn.pragma_update(None, "foreign_keys", "ON")?;
80 let s = Self {
81 db_path,
82 conn,
83 content_dim: 1024,
84 trigger_dim: 256,
85 vec_content_cache: RefCell::new(None),
86 vec_trigger_cache: RefCell::new(None),
87 vector_cache_revision: Cell::new(None),
88 };
89 Ok(s)
90 }
91
92 fn init_schema(&mut self) -> Result<()> {
93 let has_meta: bool = self.conn.query_row(
94 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='meta'",
95 [],
96 |r| r.get::<_, i64>(0),
97 )? > 0;
98
99 if !has_meta {
100 self.conn.execute_batch("BEGIN IMMEDIATE")?;
102 let r = self.conn.execute_batch(SCHEMA_SQL);
103 if r.is_ok() {
104 self.conn.execute_batch("COMMIT")?;
105 } else {
106 let _ = self.conn.execute_batch("ROLLBACK");
107 r?;
108 }
109 return Ok(());
110 }
111
112 let current: Option<String> = self
113 .conn
114 .query_row(
115 "SELECT value FROM meta WHERE key='schema_version'",
116 [],
117 |r| r.get(0),
118 )
119 .optional()?;
120
121 let current = current
122 .ok_or_else(|| InnateError::Other("meta table missing schema_version".into()))?;
123
124 let cur = ver_tuple(¤t);
125 let exp = ver_tuple(EXPECTED_SCHEMA_VERSION);
126
127 match cur.cmp(&exp) {
128 std::cmp::Ordering::Equal => Ok(()),
129 std::cmp::Ordering::Greater => {
130 eprintln!(
132 "[innate] warning: db schema {current} > expected {EXPECTED_SCHEMA_VERSION}"
133 );
134 Ok(())
135 }
136 std::cmp::Ordering::Less => {
137 let applied = crate::migrate::run_migrations(&self.db_path)?;
139 if !applied.is_empty() {
140 eprintln!("[innate] auto-migrated: {}", applied.join(", "));
141 }
142 Ok(())
143 }
144 }
145 }
146
147 pub fn begin_immediate(&self) -> Result<()> {
152 self.conn.execute_batch("BEGIN IMMEDIATE")?;
153 Ok(())
154 }
155
156 pub fn commit(&self) -> Result<()> {
157 self.conn.execute_batch("COMMIT")?;
158 Ok(())
159 }
160
161 pub fn rollback(&self) -> Result<()> {
162 self.conn.execute_batch("ROLLBACK")?;
163 self.invalidate_vector_caches();
166 Ok(())
167 }
168
169 }
171
/// Owned snapshot of one chunk record.
///
/// NOTE(review): the field list looks like a column-for-column mirror of a chunks table in
/// `schema.sql`; confirm against the schema and the `chunks` submodule before relying on it.
/// The derived `Default` yields empty strings, `None`s and zeros.
#[derive(Debug, Default, Clone)]
pub struct ChunkRow {
    pub id: String,
    pub skill_name: Option<String>,
    /// NOTE(review): presumably the ordinal of this chunk within its skill — confirm.
    pub seq: i64,
    pub content: String,
    pub trigger_desc: Option<String>,
    pub anti_trigger_desc: Option<String>,
    pub content_hash: String,
    pub token_count: Option<i64>,
    pub origin: String,
    pub source: Option<String>,
    pub maturity: Option<String>,
    /// NOTE(review): presumably a serialized list of ids (e.g. JSON) — confirm encoding.
    pub related_ids: Option<String>,
    /// Stored as an integer. NOTE(review): presumably a 0/1 flag — confirm.
    pub protected: i64,
    pub state: String,
    pub state_reason: Option<String>,
    /// Timestamps in this struct are plain strings; no format is enforced by the type.
    pub state_updated_at: Option<String>,
    pub confidence: f64,
    pub confidence_reason: Option<String>,
    pub version: i64,
    /// NOTE(review): presumably a serialized reference to source records — confirm encoding.
    pub distilled_from: Option<String>,
    pub distill_provider: Option<String>,
    pub distill_model: Option<String>,
    pub distill_prompt_version: Option<String>,
    pub parent_id: Option<String>,
    // Usage counters.
    pub selected_count: i64,
    pub used_count: i64,
    pub used_success_count: i64,
    pub success_trace_ids_count: i64,
    pub last_success_at: Option<String>,
    pub last_agg_ts: Option<String>,
    pub embed_version: i64,
    pub created_at: String,
    pub updated_at: String,
    pub last_used_at: Option<String>,
}
213
/// Owned snapshot of one episodic-log record.
///
/// NOTE(review): presumably a column-for-column mirror of an episodic log table in
/// `schema.sql` — confirm against the schema and the `traces` submodule.
/// Derives `Debug` and `Default` (empty strings, `None`s, zero, `false`) but not `Clone`.
#[derive(Debug, Default)]
pub struct EpisodicLogRow {
    pub id: String,
    pub trace_id: String,
    pub lib_id: String,
    /// Timestamp kept as a plain string; no format is enforced by the type.
    pub ts: String,
    pub query: Option<String>,
    /// NOTE(review): presumably a serialized (e.g. JSON) snapshot — confirm encoding.
    pub recall_snapshot: Option<String>,
    pub output: Option<String>,
    pub output_summary: Option<String>,
    pub outcome: Option<String>,
    pub event_source: String,
    pub task_state: String,
    pub completed_at: Option<String>,
    pub usage_state: String,
    /// NOTE(review): presumably a serialized list of chunk ids — confirm encoding.
    pub used_ids: Option<String>,
    pub used_attribution: Option<String>,
    pub used_complete: bool,
    pub context_key: Option<String>,
    pub nomination: Option<String>,
    pub priority: i64,
    pub distill_state: String,
    pub distill_note: Option<String>,
}
238
239fn configure_pragmas(conn: &Connection) -> Result<()> {
244 conn.execute_batch(
245 "PRAGMA journal_mode=WAL;
246 PRAGMA foreign_keys=ON;
247 PRAGMA synchronous=NORMAL;
248 PRAGMA cache_size=-65536;
249 PRAGMA mmap_size=268435456;
250 PRAGMA busy_timeout=5000;
251 PRAGMA temp_store=memory;",
252 )?;
253 let mode: String = conn.query_row("PRAGMA journal_mode", [], |r| r.get(0))?;
255 if mode != "wal" {
256 return Err(crate::errors::InnateError::Other(format!(
257 "WAL mode required but got '{mode}'; check filesystem support"
258 )));
259 }
260 Ok(())
261}
262
/// Parses a dotted version string into a `(major, minor, patch)` tuple for ordering.
///
/// Components are read positionally. A missing component, or one that is not a plain
/// unsigned integer after trimming whitespace, counts as `0` in its own slot, so `"4.x.2"`
/// becomes `(4, 0, 2)` rather than shifting `2` into the minor position. Components past the
/// third are ignored.
fn ver_tuple(v: &str) -> (u32, u32, u32) {
    let mut components = v
        .trim()
        .split('.')
        .map(|part| part.trim().parse::<u32>().unwrap_or(0));
    let major = components.next().unwrap_or(0);
    let minor = components.next().unwrap_or(0);
    let patch = components.next().unwrap_or(0);
    (major, minor, patch)
}
271
272fn row_to_json_with_names(row: &Row, names: &[String]) -> rusqlite::Result<Value> {
274 let mut map = serde_json::Map::new();
275 for (i, name) in names.iter().enumerate() {
276 let v = row_value_at(row, i);
277 map.insert(name.clone(), v);
278 }
279 Ok(Value::Object(map))
280}
281
282fn row_to_json(row: &Row) -> rusqlite::Result<Value> {
283 let count = row.as_ref().column_count();
284 let mut map = serde_json::Map::new();
285 for i in 0..count {
286 let name = row.as_ref().column_name(i)?.to_string();
287 let v = row_value_at(row, i);
288 map.insert(name, v);
289 }
290 Ok(Value::Object(map))
291}
292
293fn row_value_at(row: &Row, i: usize) -> Value {
294 if let Ok(v) = row.get::<_, Option<String>>(i) {
296 return v.map(Value::String).unwrap_or(Value::Null);
297 }
298 if let Ok(v) = row.get::<_, Option<i64>>(i) {
299 return v.map(|n| Value::Number(n.into())).unwrap_or(Value::Null);
300 }
301 if let Ok(v) = row.get::<_, Option<f64>>(i) {
302 return v
303 .and_then(serde_json::Number::from_f64)
304 .map(Value::Number)
305 .unwrap_or(Value::Null);
306 }
307 Value::Null
308}
309
310trait OptionalExt<T> {
311 fn optional(self) -> rusqlite::Result<Option<T>>;
312}
313impl<T> OptionalExt<T> for rusqlite::Result<T> {
314 fn optional(self) -> rusqlite::Result<Option<T>> {
315 match self {
316 Ok(v) => Ok(Some(v)),
317 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
318 Err(e) => Err(e),
319 }
320 }
321}