1use std::cell::{Cell, RefCell};
7use std::collections::HashMap;
8use std::path::{Path, PathBuf};
9
10use rusqlite::{params, Connection, Row};
11use serde_json::Value;
12
13use crate::errors::{InnateError, Result};
14use crate::utils::{cosine_similarity, unpack_embedding};
15
16const EXPECTED_SCHEMA_VERSION: &str = "4.13";
17
18const SCHEMA_SQL: &str = include_str!("schema.sql");
20
21type VectorEntries = Vec<(String, Vec<f32>)>;
22type VectorCache = RefCell<Option<VectorEntries>>;
23
/// SQLite-backed store for chunks, their embeddings, usage traces, feedback
/// and the evolve/distill work queues.
///
/// Wraps a single `rusqlite::Connection`. The embedding caches live in
/// `RefCell`/`Cell`, which makes `Storage` `!Sync`; share it across threads
/// only by giving each thread its own handle.
pub struct Storage {
    /// Path the connection was opened with (also handed to the migrator).
    pub db_path: PathBuf,
    conn: Connection,
    /// Content-embedding dimension supplied at open time (fixed at 1024 for read-only handles).
    pub content_dim: usize,
    /// Trigger-embedding dimension supplied at open time (fixed at 256 for read-only handles).
    pub trigger_dim: usize,
    /// Decoded rows of `vec_content`, loaded on first search; `None` = not loaded or invalidated.
    vec_content_cache: VectorCache,
    /// Decoded rows of `vec_trigger`; same lifecycle as `vec_content_cache`.
    vec_trigger_cache: VectorCache,
    /// `meta.vector_revision` seen at the last search; a different value drops both caches.
    vector_cache_revision: Cell<Option<i64>>,
}
35
36impl Storage {
37 pub fn open(db_path: impl AsRef<Path>, content_dim: usize, trigger_dim: usize) -> Result<Self> {
38 let db_path = db_path.as_ref().to_path_buf();
39 if let Some(parent) = db_path.parent() {
40 std::fs::create_dir_all(parent)?;
41 }
42 let conn = Connection::open(&db_path)?;
43 configure_pragmas(&conn)?;
44 let mut s = Self {
45 db_path,
46 conn,
47 content_dim,
48 trigger_dim,
49 vec_content_cache: RefCell::new(None),
50 vec_trigger_cache: RefCell::new(None),
51 vector_cache_revision: Cell::new(None),
52 };
53 s.init_schema()?;
54 Ok(s)
55 }
56
57 pub fn open_readonly(db_path: impl AsRef<Path>) -> Result<Self> {
58 let db_path = db_path.as_ref().to_path_buf();
59 let conn = Connection::open_with_flags(
60 &db_path,
61 rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
62 )?;
63 conn.pragma_update(None, "query_only", "ON")?;
64 conn.pragma_update(None, "foreign_keys", "ON")?;
65 let s = Self {
66 db_path,
67 conn,
68 content_dim: 1024,
69 trigger_dim: 256,
70 vec_content_cache: RefCell::new(None),
71 vec_trigger_cache: RefCell::new(None),
72 vector_cache_revision: Cell::new(None),
73 };
74 Ok(s)
75 }
76
77 fn init_schema(&mut self) -> Result<()> {
78 let has_meta: bool = self.conn.query_row(
79 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='meta'",
80 [],
81 |r| r.get::<_, i64>(0),
82 )? > 0;
83
84 if !has_meta {
85 self.conn.execute_batch("BEGIN IMMEDIATE")?;
87 let r = self.conn.execute_batch(SCHEMA_SQL);
88 if r.is_ok() {
89 self.conn.execute_batch("COMMIT")?;
90 } else {
91 let _ = self.conn.execute_batch("ROLLBACK");
92 r?;
93 }
94 return Ok(());
95 }
96
97 let current: Option<String> = self
98 .conn
99 .query_row(
100 "SELECT value FROM meta WHERE key='schema_version'",
101 [],
102 |r| r.get(0),
103 )
104 .optional()?;
105
106 let current = current
107 .ok_or_else(|| InnateError::Other("meta table missing schema_version".into()))?;
108
109 let cur = ver_tuple(¤t);
110 let exp = ver_tuple(EXPECTED_SCHEMA_VERSION);
111
112 match cur.cmp(&exp) {
113 std::cmp::Ordering::Equal => Ok(()),
114 std::cmp::Ordering::Greater => {
115 eprintln!(
117 "[innate] warning: db schema {current} > expected {EXPECTED_SCHEMA_VERSION}"
118 );
119 Ok(())
120 }
121 std::cmp::Ordering::Less => {
122 let applied = crate::migrate::run_migrations(&self.db_path)?;
124 if !applied.is_empty() {
125 eprintln!("[innate] auto-migrated: {}", applied.join(", "));
126 }
127 Ok(())
128 }
129 }
130 }
131
132 pub fn begin_immediate(&self) -> Result<()> {
137 self.conn.execute_batch("BEGIN IMMEDIATE")?;
138 Ok(())
139 }
140
141 pub fn commit(&self) -> Result<()> {
142 self.conn.execute_batch("COMMIT")?;
143 Ok(())
144 }
145
146 pub fn rollback(&self) -> Result<()> {
147 self.conn.execute_batch("ROLLBACK")?;
148 Ok(())
149 }
150
151 pub fn get_meta(&self, key: &str) -> Result<Option<String>> {
156 Ok(self
157 .conn
158 .query_row("SELECT value FROM meta WHERE key=?", [key], |r| r.get(0))
159 .optional()?)
160 }
161
162 pub fn set_meta(&self, key: &str, value: &str) -> Result<()> {
163 self.conn.execute(
164 "INSERT OR REPLACE INTO meta(key, value) VALUES (?,?)",
165 params![key, value],
166 )?;
167 Ok(())
168 }
169
170 pub fn get_meta_or(&self, key: &str, default: &str) -> String {
171 self.get_meta(key)
172 .ok()
173 .flatten()
174 .unwrap_or_else(|| default.to_string())
175 }
176
177 pub fn insert_chunk(&self, c: &ChunkRow) -> Result<()> {
182 self.conn.execute(
183 "INSERT INTO chunks (
184 id, skill_name, seq, content, trigger_desc, anti_trigger_desc,
185 content_hash, token_count, origin, source, maturity, related_ids,
186 protected, state, state_reason, state_updated_at,
187 confidence, confidence_base, confidence_reason, version, distilled_from,
188 distill_provider, distill_model, distill_prompt_version, parent_id,
189 selected_count, used_count, used_success_count,
190 success_trace_ids_count, last_success_at, last_agg_ts,
191 embed_version, created_at, updated_at, last_used_at
192 ) VALUES (
193 ?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,
194 ?13,?14,?15,?16,?17,?18,?19,?20,?21,?22,?23,?24,?25,
195 ?26,?27,?28,?29,?30,?31,?32,?33,?34,?35
196 )",
197 params![
198 c.id,
199 c.skill_name,
200 c.seq,
201 c.content,
202 c.trigger_desc,
203 c.anti_trigger_desc,
204 c.content_hash,
205 c.token_count,
206 c.origin,
207 c.source,
208 c.maturity,
209 c.related_ids,
210 c.protected,
211 c.state,
212 c.state_reason,
213 c.state_updated_at,
214 c.confidence,
215 c.confidence,
216 c.confidence_reason,
217 c.version,
218 c.distilled_from,
219 c.distill_provider,
220 c.distill_model,
221 c.distill_prompt_version,
222 c.parent_id,
223 c.selected_count,
224 c.used_count,
225 c.used_success_count,
226 c.success_trace_ids_count,
227 c.last_success_at,
228 c.last_agg_ts,
229 c.embed_version,
230 c.created_at,
231 c.updated_at,
232 c.last_used_at
233 ],
234 )?;
235 Ok(())
236 }
237
238 pub fn insert_vec_content(&self, chunk_id: &str, emb: &[u8]) -> Result<()> {
239 self.conn.execute(
240 "INSERT OR REPLACE INTO vec_content(chunk_id, embedding) VALUES (?,?)",
241 params![chunk_id, emb],
242 )?;
243 *self.vec_content_cache.borrow_mut() = None;
244 Ok(())
245 }
246
247 pub fn insert_vec_trigger(&self, chunk_id: &str, emb: &[u8]) -> Result<()> {
248 self.conn.execute(
249 "INSERT OR REPLACE INTO vec_trigger(chunk_id, embedding) VALUES (?,?)",
250 params![chunk_id, emb],
251 )?;
252 self.conn.execute(
253 "INSERT INTO meta(key, value) VALUES ('vector_revision', '1')
254 ON CONFLICT(key) DO UPDATE SET value=CAST(value AS INTEGER)+1",
255 [],
256 )?;
257 *self.vec_trigger_cache.borrow_mut() = None;
258 Ok(())
259 }
260
261 pub fn get_chunk(&self, id: &str) -> Result<Option<Value>> {
262 let row = self
263 .conn
264 .query_row("SELECT * FROM chunks WHERE id=?", [id], row_to_json);
265 match row {
266 Ok(v) => Ok(Some(v)),
267 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
268 Err(e) => Err(e.into()),
269 }
270 }
271
272 pub fn update_chunk_state(
273 &self,
274 id: &str,
275 state: &str,
276 reason: Option<&str>,
277 now: &str,
278 ) -> Result<()> {
279 self.conn.execute(
280 "UPDATE chunks SET state=?, state_reason=?, state_updated_at=?, updated_at=? WHERE id=?",
281 params![state, reason, now, now, id],
282 )?;
283 Ok(())
284 }
285
286 pub fn update_chunk_confidence(
287 &self,
288 id: &str,
289 conf: f64,
290 reason: Option<&str>,
291 now: &str,
292 ) -> Result<()> {
293 self.conn.execute(
294 "UPDATE chunks
295 SET confidence=?, confidence_base=?, confidence_reason=?, updated_at=?
296 WHERE id=?",
297 params![conf, conf, reason, now, id],
298 )?;
299 Ok(())
300 }
301
302 pub fn update_chunk_last_used(&self, id: &str, now: &str) -> Result<()> {
303 self.conn.execute(
304 "UPDATE chunks SET last_used_at=?, updated_at=? WHERE id=?",
305 params![now, now, id],
306 )?;
307 Ok(())
308 }
309
310 pub fn get_chunk_by_hash(&self, hash: &str) -> Result<Option<Value>> {
311 let row = self.conn.query_row(
312 "SELECT * FROM chunks WHERE content_hash=? LIMIT 1",
313 [hash],
314 row_to_json,
315 );
316 match row {
317 Ok(v) => Ok(Some(v)),
318 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
319 Err(e) => Err(e.into()),
320 }
321 }
322
323 pub fn search_vec_content(&self, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
328 self.search_vec(&self.vec_content_cache, "vec_content", query, limit)
329 }
330
331 pub fn search_vec_trigger(&self, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
332 self.search_vec(&self.vec_trigger_cache, "vec_trigger", query, limit)
333 }
334
335 fn search_vec(
336 &self,
337 cache_cell: &VectorCache,
338 table: &str,
339 query: &[f32],
340 limit: usize,
341 ) -> Result<Vec<(String, f32)>> {
342 if limit == 0 {
343 return Ok(Vec::new());
344 }
345 self.refresh_vector_caches_if_changed()?;
346
347 if cache_cell.borrow().is_none() {
349 let sql = format!("SELECT chunk_id, embedding FROM {table}");
350 let mut stmt = self.conn.prepare(&sql)?;
351 let entries: Vec<(String, Vec<f32>)> = stmt
352 .query_map([], |r| {
353 let id: String = r.get(0)?;
354 let blob: Vec<u8> = r.get(1)?;
355 Ok((id, blob))
356 })?
357 .filter_map(|r| r.ok())
358 .map(|(id, blob)| (id, unpack_embedding(&blob)))
359 .collect();
360 *cache_cell.borrow_mut() = Some(entries);
361 }
362
363 let cache = cache_cell.borrow();
364 let entries = cache.as_ref().unwrap();
365
366 let mut results: Vec<(String, f32)> = entries
368 .iter()
369 .map(|(id, v)| (id.clone(), cosine_similarity(query, v)))
370 .collect();
371 if results.len() > limit {
372 results.select_nth_unstable_by(limit - 1, |a, b| {
373 b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)
374 });
375 results.truncate(limit);
376 }
377 results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
378 Ok(results)
379 }
380
381 fn refresh_vector_caches_if_changed(&self) -> Result<()> {
382 let current = self
383 .get_meta("vector_revision")?
384 .and_then(|value| value.parse::<i64>().ok())
385 .unwrap_or(0);
386 let previous = self.vector_cache_revision.replace(Some(current));
387 if previous.is_some_and(|revision| revision != current) {
388 *self.vec_content_cache.borrow_mut() = None;
389 *self.vec_trigger_cache.borrow_mut() = None;
390 }
391 Ok(())
392 }
393
394 pub fn get_chunks_by_ids(&self, ids: &[&str]) -> Result<HashMap<String, Value>> {
396 if ids.is_empty() {
397 return Ok(HashMap::new());
398 }
399 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
400 let sql = format!("SELECT * FROM chunks WHERE id IN ({placeholders})");
401 let mut stmt = self.conn.prepare(&sql)?;
402 let names: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();
403 let rows = stmt.query_map(rusqlite::params_from_iter(ids.iter()), |r| {
404 row_to_json_with_names(r, &names)
405 })?;
406 let mut map = HashMap::with_capacity(ids.len());
407 for row in rows.filter_map(|r| r.ok()) {
408 if let Some(id) = row.get("id").and_then(Value::as_str) {
409 map.insert(id.to_string(), row);
410 }
411 }
412 Ok(map)
413 }
414
415 pub fn is_hash_invalidated(&self, hash: &str) -> Result<bool> {
420 let count: i64 = self.conn.query_row(
421 "SELECT count(*) FROM invalidated_hashes WHERE content_hash=?",
422 [hash],
423 |r| r.get(0),
424 )?;
425 Ok(count > 0)
426 }
427
428 pub fn insert_invalidated_hash(
429 &self,
430 hash: &str,
431 reason: Option<&str>,
432 ts: &str,
433 ) -> Result<()> {
434 self.conn.execute(
435 "INSERT OR IGNORE INTO invalidated_hashes(content_hash, reason, ts) VALUES (?,?,?)",
436 params![hash, reason, ts],
437 )?;
438 Ok(())
439 }
440
441 #[allow(clippy::too_many_arguments)]
446 pub fn insert_usage_trace(
447 &self,
448 trace_id: &str,
449 chunk_id: Option<&str>,
450 event: &str,
451 strength: f64,
452 similarity: Option<f64>,
453 refine_mode: Option<&str>,
454 tokens: Option<i64>,
455 rank: Option<i64>,
456 attribution: Option<&str>,
457 source: &str,
458 ts: &str,
459 ) -> Result<usize> {
460 Ok(self.conn.execute(
461 "INSERT OR IGNORE INTO usage_trace
462 (trace_id, chunk_id, event, strength, similarity, refine_mode, tokens, rank, attribution, source, ts)
463 VALUES (?,?,?,?,?,?,?,?,?,?,?)",
464 params![trace_id, chunk_id, event, strength, similarity, refine_mode, tokens, rank, attribution, source, ts],
465 )?)
466 }
467
468 pub fn replace_used_trace(
469 &self,
470 trace_id: &str,
471 used_ids: &[String],
472 strength: f64,
473 attribution: &str,
474 source: &str,
475 ts: &str,
476 ) -> Result<()> {
477 self.conn.execute(
478 "DELETE FROM usage_trace WHERE trace_id=? AND event='used'",
479 [trace_id],
480 )?;
481 for chunk_id in used_ids {
482 self.insert_usage_trace(
483 trace_id,
484 Some(chunk_id),
485 "used",
486 strength,
487 None,
488 None,
489 None,
490 None,
491 Some(attribution),
492 source,
493 ts,
494 )?;
495 }
496 Ok(())
497 }
498
499 pub fn merge_used_trace(
500 &self,
501 trace_id: &str,
502 used_ids: &[String],
503 strength: f64,
504 attribution: &str,
505 source: &str,
506 ts: &str,
507 ) -> Result<()> {
508 let attribution_rank = |value: &str| match value {
509 "explicit" => 3,
510 "cited" => 2,
511 "inferred" => 1,
512 _ => 0,
513 };
514 for chunk_id in used_ids {
515 let existing = self
516 .query_chunks_params(
517 "SELECT attribution, strength FROM usage_trace
518 WHERE trace_id=? AND chunk_id=? AND event='used'",
519 params![trace_id, chunk_id],
520 )?
521 .into_iter()
522 .next();
523 if let Some(row) = existing {
524 let existing_attribution = row
525 .get("attribution")
526 .and_then(Value::as_str)
527 .unwrap_or("inferred");
528 if attribution_rank(attribution) > attribution_rank(existing_attribution) {
529 self.conn.execute(
530 "UPDATE usage_trace
531 SET strength=?, attribution=?, source=?, ts=?
532 WHERE trace_id=? AND chunk_id=? AND event='used'",
533 params![
534 strength,
535 attribution,
536 source,
537 ts,
538 trace_id,
539 chunk_id
540 ],
541 )?;
542 }
543 } else {
544 self.insert_usage_trace(
545 trace_id,
546 Some(chunk_id),
547 "used",
548 strength,
549 None,
550 None,
551 None,
552 None,
553 Some(attribution),
554 source,
555 ts,
556 )?;
557 }
558 }
559 Ok(())
560 }
561
562 pub fn refresh_chunk_last_used(&self, chunk_id: &str, now: &str) -> Result<()> {
563 self.conn.execute(
564 "UPDATE chunks
565 SET last_used_at=COALESCE(
566 (SELECT MAX(ts) FROM usage_trace
567 WHERE chunk_id=? AND event='used'),
568 last_used_base
569 ),
570 updated_at=?
571 WHERE id=?",
572 params![chunk_id, now, chunk_id],
573 )?;
574 Ok(())
575 }
576
577 pub fn get_outcome_for_trace(&self, trace_id: &str) -> Result<Option<String>> {
578 let row = self.conn.query_row(
579 "SELECT event FROM usage_trace
580 WHERE trace_id=? AND event IN ('task_ok','task_fail') AND chunk_id IS NULL
581 LIMIT 1",
582 [trace_id],
583 |r| r.get::<_, String>(0),
584 );
585 match row {
586 Ok(v) => Ok(Some(v)),
587 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
588 Err(e) => Err(e.into()),
589 }
590 }
591
592 pub fn purge_usage_trace(&self, before_ts: &str) -> Result<usize> {
593 let n = self.conn.execute(
595 "DELETE FROM usage_trace
596 WHERE ts < ?
597 AND event IN ('retrieved','refined')
598 AND NOT (event = 'retrieved'
599 AND chunk_id IN (SELECT id FROM chunks WHERE origin='spark'))",
600 [before_ts],
601 )?;
602 Ok(n)
603 }
604
605 pub fn upsert_episodic_log(&self, log: &EpisodicLogRow) -> Result<()> {
610 self.conn.execute(
611 "INSERT OR REPLACE INTO episodic_log
612 (id, trace_id, lib_id, ts, query, recall_snapshot, output,
613 output_summary, outcome, event_source, task_state, completed_at,
614 usage_state, used_ids, used_attribution, used_complete, context_key, nomination, priority,
615 distill_state, distill_note, distill_attempts, distill_last_failed_at)
616 VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,?18,?19,?20,?21,0,NULL)",
617 params![
618 log.id,
619 log.trace_id,
620 log.lib_id,
621 log.ts,
622 log.query,
623 log.recall_snapshot,
624 log.output,
625 log.output_summary,
626 log.outcome,
627 log.event_source,
628 log.task_state,
629 log.completed_at,
630 log.usage_state,
631 log.used_ids,
632 log.used_attribution,
633 i64::from(log.used_complete),
634 log.context_key,
635 log.nomination,
636 log.priority,
637 log.distill_state,
638 log.distill_note
639 ],
640 )?;
641 Ok(())
642 }
643
644 pub fn get_episodic_log(&self, trace_id: &str) -> Result<Option<Value>> {
645 let row = self.conn.query_row(
646 "SELECT * FROM episodic_log WHERE trace_id=?",
647 [trace_id],
648 row_to_json,
649 );
650 match row {
651 Ok(v) => Ok(Some(v)),
652 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
653 Err(e) => Err(e.into()),
654 }
655 }
656
657 pub fn update_episodic_log_state(
658 &self,
659 trace_id: &str,
660 state: &str,
661 note: Option<&str>,
662 outcome: Option<&str>,
663 ) -> Result<()> {
664 self.conn.execute(
665 "UPDATE episodic_log
666 SET distill_state=?, distill_note=COALESCE(?,distill_note),
667 outcome=COALESCE(?,outcome),
668 distill_run_id=NULL, distill_locked_at=NULL
669 WHERE trace_id=?",
670 params![state, note, outcome, trace_id],
671 )?;
672 Ok(())
673 }
674
675 pub fn patch_episodic_log_content(
677 &self,
678 trace_id: &str,
679 query: Option<&str>,
680 output: Option<&str>,
681 output_summary: Option<&str>,
682 nomination: Option<&str>,
683 priority: i64,
684 ) -> Result<()> {
685 self.conn.execute(
686 "UPDATE episodic_log
687 SET output_summary = COALESCE(?, output_summary),
688 nomination = COALESCE(?, nomination),
689 output = COALESCE(?, output),
690 query = COALESCE(?, query),
691 priority = MAX(priority, ?)
692 WHERE trace_id = ?",
693 params![
694 output_summary,
695 nomination,
696 output,
697 query,
698 priority,
699 trace_id
700 ],
701 )?;
702 Ok(())
703 }
704
705 #[allow(clippy::too_many_arguments)]
706 pub fn update_trace_lifecycle(
707 &self,
708 trace_id: &str,
709 task_state: &str,
710 completed_at: Option<&str>,
711 usage_state: Option<&str>,
712 used_ids: Option<&str>,
713 used_attribution: Option<&str>,
714 used_complete: Option<bool>,
715 ) -> Result<()> {
716 self.conn.execute(
717 "UPDATE episodic_log
718 SET task_state=?,
719 completed_at=COALESCE(?, completed_at),
720 usage_state=COALESCE(?, usage_state),
721 used_ids=COALESCE(?, used_ids),
722 used_attribution=COALESCE(?, used_attribution),
723 used_complete=COALESCE(?, used_complete)
724 WHERE trace_id=?",
725 params![
726 task_state,
727 completed_at,
728 usage_state,
729 used_ids,
730 used_attribution,
731 used_complete.map(i64::from),
732 trace_id
733 ],
734 )?;
735 Ok(())
736 }
737
738 #[allow(clippy::too_many_arguments)]
739 pub fn upsert_confidence_evidence(
740 &self,
741 id: &str,
742 trace_id: Option<&str>,
743 chunk_id: &str,
744 kind: &str,
745 target: f64,
746 alpha: f64,
747 reason: &str,
748 context_key: Option<&str>,
749 ts: &str,
750 ) -> Result<()> {
751 self.conn.execute(
752 "INSERT INTO confidence_evidence
753 (id, trace_id, chunk_id, kind, target, alpha, reason, context_key, ts)
754 VALUES (?,?,?,?,?,?,?,?,?)
755 ON CONFLICT(trace_id, chunk_id, kind) WHERE trace_id IS NOT NULL
756 DO UPDATE SET target=excluded.target, alpha=excluded.alpha,
757 reason=excluded.reason, context_key=excluded.context_key",
758 params![id, trace_id, chunk_id, kind, target, alpha, reason, context_key, ts],
759 )?;
760 Ok(())
761 }
762
763 pub fn delete_trace_confidence_evidence(&self, trace_id: &str, kinds: &[&str]) -> Result<()> {
764 for kind in kinds {
765 self.conn.execute(
766 "DELETE FROM confidence_evidence WHERE trace_id=? AND kind=?",
767 params![trace_id, kind],
768 )?;
769 }
770 Ok(())
771 }
772
773 pub fn delete_chunk_trace_confidence_evidence(
774 &self,
775 trace_id: &str,
776 chunk_id: &str,
777 kind: &str,
778 ) -> Result<()> {
779 self.conn.execute(
780 "DELETE FROM confidence_evidence
781 WHERE trace_id=? AND chunk_id=? AND kind=?",
782 params![trace_id, chunk_id, kind],
783 )?;
784 Ok(())
785 }
786
787 pub fn confidence_evidence_for_chunk(&self, chunk_id: &str) -> Result<Vec<Value>> {
788 self.query_json(
789 "SELECT target, alpha, reason, ts, id
790 FROM confidence_evidence WHERE chunk_id=?
791 ORDER BY ts ASC,
792 CASE kind
793 WHEN 'outcome_ok' THEN 1
794 WHEN 'outcome_fail' THEN 1
795 WHEN 'selected_unused' THEN 2
796 WHEN 'feedback_up' THEN 3
797 WHEN 'feedback_down' THEN 3
798 WHEN 'decay' THEN 4
799 ELSE 5
800 END ASC,
801 kind ASC, id ASC",
802 [chunk_id],
803 )
804 }
805
806 #[allow(clippy::too_many_arguments)]
807 pub fn insert_feedback_event(
808 &self,
809 id: &str,
810 trace_id: &str,
811 chunk_id: &str,
812 signal: &str,
813 strength: f64,
814 source: &str,
815 actor: Option<&str>,
816 reason: Option<&str>,
817 context_key: Option<&str>,
818 ts: &str,
819 ) -> Result<usize> {
820 Ok(self.conn.execute(
821 "INSERT OR IGNORE INTO feedback_events
822 (id, trace_id, chunk_id, signal, strength, source, actor, reason, context_key, ts)
823 VALUES (?,?,?,?,?,?,?,?,?,?)",
824 params![
825 id,
826 trace_id,
827 chunk_id,
828 signal,
829 strength,
830 source,
831 actor,
832 reason,
833 context_key,
834 ts
835 ],
836 )?)
837 }
838
839 pub fn delete_feedback_event(
840 &self,
841 trace_id: &str,
842 chunk_id: &str,
843 signal: &str,
844 ) -> Result<usize> {
845 Ok(self.conn.execute(
846 "DELETE FROM feedback_events
847 WHERE trace_id=? AND chunk_id=? AND signal=?",
848 params![trace_id, chunk_id, signal],
849 )?)
850 }
851
852 pub fn update_chunk_last_decayed_at(&self, id: &str, now: &str) -> Result<()> {
853 self.conn.execute(
854 "UPDATE chunks SET last_decayed_at=?, updated_at=? WHERE id=?",
855 params![now, now, id],
856 )?;
857 Ok(())
858 }
859
860 #[allow(clippy::too_many_arguments)]
861 pub fn update_context_stat(
862 &self,
863 chunk_id: &str,
864 context_key: &str,
865 success: i64,
866 failure: i64,
867 positive: i64,
868 negative: i64,
869 now: &str,
870 ) -> Result<()> {
871 self.conn.execute(
872 "INSERT INTO chunk_context_stats
873 (chunk_id, context_key, success_count, failure_count,
874 positive_feedback, negative_feedback, last_updated_at)
875 VALUES (?,?,?,?,?,?,?)
876 ON CONFLICT(chunk_id, context_key) DO UPDATE SET
877 success_count=success_count+excluded.success_count,
878 failure_count=failure_count+excluded.failure_count,
879 positive_feedback=positive_feedback+excluded.positive_feedback,
880 negative_feedback=negative_feedback+excluded.negative_feedback,
881 last_updated_at=excluded.last_updated_at",
882 params![
883 chunk_id,
884 context_key,
885 success,
886 failure,
887 positive,
888 negative,
889 now
890 ],
891 )?;
892 Ok(())
893 }
894
895 pub fn context_score(&self, chunk_id: &str, context_key: &str) -> Result<f64> {
896 let row = self
897 .conn
898 .query_row(
899 "SELECT success_count, failure_count, positive_feedback, negative_feedback
900 FROM chunk_context_stats WHERE chunk_id=? AND context_key=?",
901 params![chunk_id, context_key],
902 |row| {
903 Ok((
904 row.get::<_, i64>(0)?,
905 row.get::<_, i64>(1)?,
906 row.get::<_, i64>(2)?,
907 row.get::<_, i64>(3)?,
908 ))
909 },
910 )
911 .optional()?;
912 let Some((success, failure, positive, negative)) = row else {
913 return Ok(0.0);
914 };
915 let wins = success as f64 + positive as f64 * 2.0;
916 let losses = failure as f64 + negative as f64 * 2.0;
917 let evidence = wins + losses;
918 let posterior = (wins + 1.0) / (evidence + 2.0);
919 let evidence_weight = (evidence / 5.0).min(1.0);
920 Ok((posterior - 0.5) * 2.0 * evidence_weight)
921 }
922
923 #[allow(clippy::too_many_arguments)]
924 pub fn upsert_governance_proposal(
925 &self,
926 id: &str,
927 chunk_id: &str,
928 proposal_type: &str,
929 reason: &str,
930 evidence_count: i64,
931 evidence_score: f64,
932 actor_count: i64,
933 now: &str,
934 ) -> Result<()> {
935 self.conn.execute(
936 "INSERT INTO governance_proposals
937 (id, chunk_id, proposal_type, reason, evidence_count,
938 evidence_score, actor_count, state, created_at, updated_at)
939 VALUES (?,?,?,?,?,?,?,'pending',?,?)
940 ON CONFLICT(chunk_id, proposal_type) WHERE state='pending'
941 DO UPDATE SET reason=excluded.reason,
942 evidence_count=excluded.evidence_count,
943 evidence_score=excluded.evidence_score,
944 actor_count=excluded.actor_count,
945 updated_at=excluded.updated_at",
946 params![
947 id,
948 chunk_id,
949 proposal_type,
950 reason,
951 evidence_count,
952 evidence_score,
953 actor_count,
954 now,
955 now
956 ],
957 )?;
958 Ok(())
959 }
960
961 pub fn request_evolve(&self, id: &str, reason: &str, now: &str) -> Result<()> {
962 let priority = match reason {
963 "governance_ready" => 100,
964 "governance" => 80,
965 "threshold" => 60,
966 "batch_continue" => 40,
967 _ => 20,
968 };
969 self.conn.execute(
970 "INSERT INTO evolve_requests(id, reason, state, requested_at, priority)
971 VALUES (?,?,'pending',?,?)
972 ON CONFLICT(reason) WHERE state='pending'
973 DO UPDATE SET priority=MAX(priority, excluded.priority)",
974 params![id, reason, now, priority],
975 )?;
976 Ok(())
977 }
978
979 pub fn claim_evolve_request(&self, now: &str, stale_before: &str) -> Result<Option<String>> {
980 self.conn.execute(
981 "UPDATE evolve_requests
982 SET state='pending', leased_at=NULL, note='lease_recovered'
983 WHERE state='running' AND leased_at < ?",
984 [stale_before],
985 )?;
986 self.conn.execute(
987 "UPDATE evolve_requests
988 SET state='pending', leased_at=NULL, note='retry_failed'
989 WHERE state='failed' AND attempts < 3
990 AND COALESCE(next_retry_at, completed_at) < ?",
991 [now],
992 )?;
993 Ok(self
994 .conn
995 .query_row(
996 "UPDATE evolve_requests
997 SET state='running', leased_at=?, attempts=attempts+1
998 WHERE id=(
999 SELECT id FROM evolve_requests
1000 WHERE state='pending' ORDER BY priority DESC, requested_at ASC LIMIT 1
1001 ) AND state='pending'
1002 RETURNING id",
1003 [now],
1004 |row| row.get(0),
1005 )
1006 .optional()?)
1007 }
1008
1009 pub fn finish_evolve_request(
1010 &self,
1011 id: &str,
1012 state: &str,
1013 note: Option<&str>,
1014 now: &str,
1015 ) -> Result<()> {
1016 self.conn.execute(
1017 "UPDATE evolve_requests
1018 SET state=?, completed_at=?, note=?,
1019 last_failed_at=CASE
1020 WHEN ?='failed' THEN ?
1021 ELSE last_failed_at
1022 END,
1023 next_retry_at=CASE WHEN ?='failed'
1024 THEN strftime('%Y-%m-%dT%H:%M:%fZ', ?, '+5 minutes')
1025 ELSE NULL END
1026 WHERE id=?",
1027 params![state, now, note, state, now, state, now, id],
1028 )?;
1029 Ok(())
1030 }
1031
1032 pub fn finish_covered_evolve_requests(&self, requested_before: &str, now: &str) -> Result<()> {
1033 self.conn.execute(
1034 "UPDATE evolve_requests
1035 SET state='completed', completed_at=?, note='covered_by_evolve', next_retry_at=NULL
1036 WHERE state='pending' AND requested_at <= ?",
1037 params![now, requested_before],
1038 )?;
1039 Ok(())
1040 }
1041
1042 pub fn update_episodic_log_state_by_id(
1044 &self,
1045 id: &str,
1046 state: &str,
1047 note: Option<&str>,
1048 outcome: Option<&str>,
1049 ) -> Result<()> {
1050 self.conn.execute(
1051 "UPDATE episodic_log
1052 SET distill_state=?, distill_note=COALESCE(?,distill_note),
1053 outcome=COALESCE(?,outcome),
1054 distill_run_id=NULL, distill_locked_at=NULL
1055 WHERE id=?",
1056 params![state, note, outcome, id],
1057 )?;
1058 Ok(())
1059 }
1060
1061 #[allow(clippy::too_many_arguments)]
1062 pub fn finish_distill_log(
1063 &self,
1064 id: &str,
1065 state: &str,
1066 note: Option<&str>,
1067 prompt_tokens: i64,
1068 completion_tokens: i64,
1069 accounted_at: &str,
1070 ) -> Result<()> {
1071 self.conn.execute(
1072 "UPDATE episodic_log
1073 SET distill_state=?, distill_note=?,
1074 distill_prompt_tokens=?, distill_completion_tokens=?,
1075 distill_accounted_at=?,
1076 distill_attempts=distill_attempts
1077 + CASE WHEN ?='failed' THEN 1 ELSE 0 END,
1078 distill_last_failed_at=CASE
1079 WHEN ?='failed' THEN ?
1080 ELSE distill_last_failed_at
1081 END,
1082 distill_run_id=NULL, distill_locked_at=NULL
1083 WHERE id=?",
1084 params![
1085 state,
1086 note,
1087 prompt_tokens,
1088 completion_tokens,
1089 accounted_at,
1090 state,
1091 state,
1092 accounted_at,
1093 id
1094 ],
1095 )?;
1096 Ok(())
1097 }
1098
1099 pub fn claim_distill_batch(
1102 &self,
1103 run_id: &str,
1104 limit: usize,
1105 locked_at: &str,
1106 ) -> Result<Vec<Value>> {
1107 self.conn.execute(
1109 "UPDATE episodic_log
1110 SET distill_state='screening', distill_run_id=?, distill_locked_at=?
1111 WHERE id IN (
1112 SELECT id FROM episodic_log
1113 WHERE distill_state='new'
1114 ORDER BY priority DESC, ts ASC
1115 LIMIT ?
1116 )",
1117 params![run_id, locked_at, limit as i64],
1118 )?;
1119 self.query_json(
1120 "SELECT * FROM episodic_log WHERE distill_run_id=? AND distill_state='screening'",
1121 params![run_id],
1122 )
1123 }
1124
1125 pub fn query_episodic_logs_open(&self, limit: usize) -> Result<Vec<Value>> {
1126 self.query_json(
1127 "SELECT * FROM episodic_log WHERE distill_state='new' ORDER BY priority DESC, ts ASC LIMIT ?",
1128 params![limit as i64],
1129 )
1130 }
1131
1132 pub fn query_chunks(&self, sql: &str) -> Result<Vec<Value>> {
1137 self.query_json(sql, params![])
1138 }
1139
1140 pub fn query_chunks_params<P: rusqlite::Params>(&self, sql: &str, p: P) -> Result<Vec<Value>> {
1141 self.query_json(sql, p)
1142 }
1143
1144 pub fn get_deps(&self, chunk_id: &str) -> Result<Vec<(String, String, Option<String>)>> {
1149 let mut stmt = self
1150 .conn
1151 .prepare("SELECT dst, kind, dst_lib FROM deps WHERE src=?")?;
1152 let rows = stmt.query_map([chunk_id], |r| {
1153 Ok((
1154 r.get::<_, String>(0)?,
1155 r.get::<_, String>(1)?,
1156 r.get::<_, Option<String>>(2)?,
1157 ))
1158 })?;
1159 Ok(rows.filter_map(|r| r.ok()).collect())
1160 }
1161
1162 pub fn get_reverse_deps(&self, chunk_id: &str) -> Result<Vec<String>> {
1163 let mut stmt = self.conn.prepare("SELECT src FROM deps WHERE dst=?")?;
1164 let rows = stmt.query_map([chunk_id], |r| r.get::<_, String>(0))?;
1165 Ok(rows.filter_map(|r| r.ok()).collect())
1166 }
1167
1168 pub fn insert_dep(
1169 &self,
1170 src: &str,
1171 dst: &str,
1172 kind: &str,
1173 dst_lib: Option<&str>,
1174 ) -> Result<()> {
1175 self.conn.execute(
1176 "INSERT OR IGNORE INTO deps(src,dst,kind,dst_lib) VALUES (?,?,?,?)",
1177 params![src, dst, kind, dst_lib],
1178 )?;
1179 Ok(())
1180 }
1181
1182 pub fn upsert_chunk_success_trace(
1187 &self,
1188 chunk_id: &str,
1189 trace_id: &str,
1190 ts: &str,
1191 ) -> Result<()> {
1192 self.conn.execute(
1193 "INSERT OR IGNORE INTO chunk_success_traces(chunk_id, trace_id, ts) VALUES (?,?,?)",
1194 params![chunk_id, trace_id, ts],
1195 )?;
1196 Ok(())
1197 }
1198
1199 pub fn attach_shared(&self, path: &str, alias: &str) -> Result<()> {
1204 self.conn.execute_batch(&format!(
1205 "ATTACH DATABASE '{}' AS '{alias}'",
1206 path.replace('\'', "''")
1207 ))?;
1208 Ok(())
1209 }
1210
1211 pub fn lib_id(&self) -> Result<String> {
1212 Ok(self
1213 .get_meta("lib_id")?
1214 .unwrap_or_else(|| "unknown".to_string()))
1215 }
1216
1217 fn query_json<P: rusqlite::Params>(&self, sql: &str, p: P) -> Result<Vec<Value>> {
1222 let mut stmt = self.conn.prepare(sql)?;
1223 let names: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();
1224 let rows = stmt.query_map(p, |r| row_to_json_with_names(r, &names))?;
1225 Ok(rows.filter_map(|r| r.ok()).collect())
1226 }
1227
1228 pub fn execute(&self, sql: &str) -> Result<()> {
1229 self.conn.execute_batch(sql)?;
1230 Ok(())
1231 }
1232
1233 pub fn conn_execute<P: rusqlite::Params>(&self, sql: &str, p: P) -> Result<()> {
1235 self.conn.execute(sql, p)?;
1236 Ok(())
1237 }
1238}
1239
/// Owned column values for one `chunks` row, bound positionally by
/// `Storage::insert_chunk`.
///
/// Timestamp fields are plain strings stored as given; this type does not
/// enforce a format (presumably RFC 3339 — TODO confirm with callers).
#[derive(Debug, Default, Clone)]
pub struct ChunkRow {
    pub id: String,
    pub skill_name: Option<String>,
    pub seq: i64,
    pub content: String,
    pub trigger_desc: Option<String>,
    pub anti_trigger_desc: Option<String>,
    pub content_hash: String,
    pub token_count: Option<i64>,
    pub origin: String,
    pub source: Option<String>,
    pub maturity: Option<String>,
    pub related_ids: Option<String>,
    // NOTE(review): integer column; presumably a 0/1 flag — confirm against schema.sql.
    pub protected: i64,
    pub state: String,
    pub state_reason: Option<String>,
    pub state_updated_at: Option<String>,
    /// Initial confidence; `insert_chunk` writes it to both `confidence` and `confidence_base`.
    pub confidence: f64,
    pub confidence_reason: Option<String>,
    pub version: i64,
    pub distilled_from: Option<String>,
    pub distill_provider: Option<String>,
    pub distill_model: Option<String>,
    pub distill_prompt_version: Option<String>,
    pub parent_id: Option<String>,
    pub selected_count: i64,
    pub used_count: i64,
    pub used_success_count: i64,
    pub success_trace_ids_count: i64,
    pub last_success_at: Option<String>,
    pub last_agg_ts: Option<String>,
    pub embed_version: i64,
    pub created_at: String,
    pub updated_at: String,
    pub last_used_at: Option<String>,
}
1281
/// Owned column values for one `episodic_log` row, written by
/// `Storage::upsert_episodic_log`.
///
/// The distill bookkeeping columns not listed here (attempt counter,
/// last-failure time) are reset by that upsert rather than carried on this type.
#[derive(Debug, Default)]
pub struct EpisodicLogRow {
    pub id: String,
    pub trace_id: String,
    pub lib_id: String,
    pub ts: String,
    pub query: Option<String>,
    pub recall_snapshot: Option<String>,
    pub output: Option<String>,
    pub output_summary: Option<String>,
    pub outcome: Option<String>,
    pub event_source: String,
    pub task_state: String,
    pub completed_at: Option<String>,
    pub usage_state: String,
    pub used_ids: Option<String>,
    pub used_attribution: Option<String>,
    /// Stored as 0/1 in SQLite.
    pub used_complete: bool,
    pub context_key: Option<String>,
    pub nomination: Option<String>,
    /// Distill scheduling priority; larger values are claimed first.
    pub priority: i64,
    pub distill_state: String,
    pub distill_note: Option<String>,
}
1306
1307fn configure_pragmas(conn: &Connection) -> Result<()> {
1312 conn.execute_batch(
1313 "PRAGMA journal_mode=WAL;
1314 PRAGMA foreign_keys=ON;
1315 PRAGMA synchronous=NORMAL;
1316 PRAGMA cache_size=-65536;
1317 PRAGMA mmap_size=268435456;
1318 PRAGMA temp_store=memory;",
1319 )?;
1320 let mode: String = conn.query_row("PRAGMA journal_mode", [], |r| r.get(0))?;
1322 if mode != "wal" {
1323 return Err(crate::errors::InnateError::Other(format!(
1324 "WAL mode required but got '{mode}'; check filesystem support"
1325 )));
1326 }
1327 Ok(())
1328}
1329
/// Parses a dotted version string into a comparable `(major, minor, patch)`.
///
/// Components are positional. Each takes the leading ASCII digits of its
/// whitespace-trimmed dot-separated part. A part without leading digits (or
/// one that overflows `u32`) counts as `0`, and missing parts are `0`.
/// Examples: `"4.13"` → `(4, 13, 0)`, `"4.13.0-rc1"` → `(4, 13, 0)`,
/// `"4.x.3"` → `(4, 0, 3)`. Positions never shift when a part is malformed.
fn ver_tuple(v: &str) -> (u32, u32, u32) {
    let mut parts = v.split('.').map(|part| {
        let part = part.trim();
        let digits_end = part
            .find(|c: char| !c.is_ascii_digit())
            .unwrap_or(part.len());
        part[..digits_end].parse::<u32>().unwrap_or(0)
    });
    (
        parts.next().unwrap_or(0),
        parts.next().unwrap_or(0),
        parts.next().unwrap_or(0),
    )
}
1338
1339fn row_to_json_with_names(row: &Row, names: &[String]) -> rusqlite::Result<Value> {
1341 let mut map = serde_json::Map::new();
1342 for (i, name) in names.iter().enumerate() {
1343 let v = row_value_at(row, i);
1344 map.insert(name.clone(), v);
1345 }
1346 Ok(Value::Object(map))
1347}
1348
1349fn row_to_json(row: &Row) -> rusqlite::Result<Value> {
1350 let count = row.as_ref().column_count();
1351 let mut map = serde_json::Map::new();
1352 for i in 0..count {
1353 let name = row.as_ref().column_name(i)?.to_string();
1354 let v = row_value_at(row, i);
1355 map.insert(name, v);
1356 }
1357 Ok(Value::Object(map))
1358}
1359
1360fn row_value_at(row: &Row, i: usize) -> Value {
1361 if let Ok(v) = row.get::<_, Option<String>>(i) {
1363 return v.map(Value::String).unwrap_or(Value::Null);
1364 }
1365 if let Ok(v) = row.get::<_, Option<i64>>(i) {
1366 return v.map(|n| Value::Number(n.into())).unwrap_or(Value::Null);
1367 }
1368 if let Ok(v) = row.get::<_, Option<f64>>(i) {
1369 return v
1370 .and_then(serde_json::Number::from_f64)
1371 .map(Value::Number)
1372 .unwrap_or(Value::Null);
1373 }
1374 Value::Null
1375}
1376
1377trait OptionalExt<T> {
1378 fn optional(self) -> rusqlite::Result<Option<T>>;
1379}
1380impl<T> OptionalExt<T> for rusqlite::Result<T> {
1381 fn optional(self) -> rusqlite::Result<Option<T>> {
1382 match self {
1383 Ok(v) => Ok(Some(v)),
1384 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1385 Err(e) => Err(e),
1386 }
1387 }
1388}