1use std::cell::{Cell, RefCell};
7use std::collections::HashMap;
8use std::path::{Path, PathBuf};
9
10use rusqlite::{params, Connection, Row};
11use serde_json::Value;
12
13use crate::errors::{InnateError, Result};
14use crate::utils::{cosine_similarity, unpack_embedding};
15
/// Schema version this build expects; `Storage::init_schema` compares it with
/// `meta.schema_version` to decide whether to warn, do nothing, or migrate.
const EXPECTED_SCHEMA_VERSION: &str = "4.14";

/// DDL applied to a fresh database (one without a `meta` table), embedded at compile time.
const SCHEMA_SQL: &str = include_str!("schema.sql");

/// Decoded copy of a vector table: `(chunk_id, embedding)` pairs.
type VectorEntries = Vec<(String, Vec<f32>)>;
/// Lazily filled vector-table cache; `None` means "not loaded yet" or "invalidated".
type VectorCache = RefCell<Option<VectorEntries>>;
23
/// SQLite-backed store for chunks, usage traces, evidence, and evolve/distill queues.
///
/// Vector searches are answered from per-table in-memory caches. A cache is filled on
/// first use and dropped either after a local vector insert or when the
/// `vector_revision` value in `meta` changes. The caches use `RefCell`/`Cell`, so a
/// `Storage` is not `Sync`.
pub struct Storage {
    /// Database file this store was opened from; also handed to the migrator.
    pub db_path: PathBuf,
    /// Underlying SQLite connection.
    conn: Connection,
    /// Content-embedding dimension passed to `open` (fixed to 1024 by `open_readonly`).
    pub content_dim: usize,
    /// Trigger-embedding dimension passed to `open` (fixed to 256 by `open_readonly`).
    pub trigger_dim: usize,
    /// Decoded rows of `vec_content`; `None` until loaded or after invalidation.
    vec_content_cache: VectorCache,
    /// Decoded rows of `vec_trigger`; `None` until loaded or after invalidation.
    vec_trigger_cache: VectorCache,
    /// `vector_revision` seen by the most recent search; `None` before the first search.
    vector_cache_revision: Cell<Option<i64>>,
}
35
/// An evolve request leased by `Storage::claim_evolve_request_with_reason`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EvolveRequestClaim {
    /// `evolve_requests.id` of the claimed row.
    pub id: String,
    /// `evolve_requests.reason` of the claimed row (e.g. `"threshold"`).
    pub reason: String,
}
41
42impl Storage {
43 pub fn open(db_path: impl AsRef<Path>, content_dim: usize, trigger_dim: usize) -> Result<Self> {
44 let db_path = db_path.as_ref().to_path_buf();
45 if let Some(parent) = db_path.parent() {
46 std::fs::create_dir_all(parent)?;
47 }
48 let conn = Connection::open(&db_path)?;
49 configure_pragmas(&conn)?;
50 let mut s = Self {
51 db_path,
52 conn,
53 content_dim,
54 trigger_dim,
55 vec_content_cache: RefCell::new(None),
56 vec_trigger_cache: RefCell::new(None),
57 vector_cache_revision: Cell::new(None),
58 };
59 s.init_schema()?;
60 Ok(s)
61 }
62
63 pub fn open_readonly(db_path: impl AsRef<Path>) -> Result<Self> {
64 let db_path = db_path.as_ref().to_path_buf();
65 let conn = Connection::open_with_flags(
66 &db_path,
67 rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
68 )?;
69 conn.pragma_update(None, "query_only", "ON")?;
70 conn.pragma_update(None, "foreign_keys", "ON")?;
71 let s = Self {
72 db_path,
73 conn,
74 content_dim: 1024,
75 trigger_dim: 256,
76 vec_content_cache: RefCell::new(None),
77 vec_trigger_cache: RefCell::new(None),
78 vector_cache_revision: Cell::new(None),
79 };
80 Ok(s)
81 }
82
83 fn init_schema(&mut self) -> Result<()> {
84 let has_meta: bool = self.conn.query_row(
85 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='meta'",
86 [],
87 |r| r.get::<_, i64>(0),
88 )? > 0;
89
90 if !has_meta {
91 self.conn.execute_batch("BEGIN IMMEDIATE")?;
93 let r = self.conn.execute_batch(SCHEMA_SQL);
94 if r.is_ok() {
95 self.conn.execute_batch("COMMIT")?;
96 } else {
97 let _ = self.conn.execute_batch("ROLLBACK");
98 r?;
99 }
100 return Ok(());
101 }
102
103 let current: Option<String> = self
104 .conn
105 .query_row(
106 "SELECT value FROM meta WHERE key='schema_version'",
107 [],
108 |r| r.get(0),
109 )
110 .optional()?;
111
112 let current = current
113 .ok_or_else(|| InnateError::Other("meta table missing schema_version".into()))?;
114
115 let cur = ver_tuple(¤t);
116 let exp = ver_tuple(EXPECTED_SCHEMA_VERSION);
117
118 match cur.cmp(&exp) {
119 std::cmp::Ordering::Equal => Ok(()),
120 std::cmp::Ordering::Greater => {
121 eprintln!(
123 "[innate] warning: db schema {current} > expected {EXPECTED_SCHEMA_VERSION}"
124 );
125 Ok(())
126 }
127 std::cmp::Ordering::Less => {
128 let applied = crate::migrate::run_migrations(&self.db_path)?;
130 if !applied.is_empty() {
131 eprintln!("[innate] auto-migrated: {}", applied.join(", "));
132 }
133 Ok(())
134 }
135 }
136 }
137
138 pub fn begin_immediate(&self) -> Result<()> {
143 self.conn.execute_batch("BEGIN IMMEDIATE")?;
144 Ok(())
145 }
146
147 pub fn commit(&self) -> Result<()> {
148 self.conn.execute_batch("COMMIT")?;
149 Ok(())
150 }
151
152 pub fn rollback(&self) -> Result<()> {
153 self.conn.execute_batch("ROLLBACK")?;
154 Ok(())
155 }
156
157 pub fn get_meta(&self, key: &str) -> Result<Option<String>> {
162 Ok(self
163 .conn
164 .query_row("SELECT value FROM meta WHERE key=?", [key], |r| r.get(0))
165 .optional()?)
166 }
167
168 pub fn set_meta(&self, key: &str, value: &str) -> Result<()> {
169 self.conn.execute(
170 "INSERT OR REPLACE INTO meta(key, value) VALUES (?,?)",
171 params![key, value],
172 )?;
173 Ok(())
174 }
175
176 pub fn get_meta_or(&self, key: &str, default: &str) -> String {
177 self.get_meta(key)
178 .ok()
179 .flatten()
180 .unwrap_or_else(|| default.to_string())
181 }
182
183 pub fn insert_chunk(&self, c: &ChunkRow) -> Result<()> {
188 self.conn.execute(
189 "INSERT INTO chunks (
190 id, skill_name, seq, content, trigger_desc, anti_trigger_desc,
191 content_hash, token_count, origin, source, maturity, related_ids,
192 protected, state, state_reason, state_updated_at,
193 confidence, confidence_base, confidence_reason, version, distilled_from,
194 distill_provider, distill_model, distill_prompt_version, parent_id,
195 selected_count, used_count, used_success_count,
196 success_trace_ids_count, last_success_at, last_agg_ts,
197 embed_version, created_at, updated_at, last_used_at
198 ) VALUES (
199 ?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,
200 ?13,?14,?15,?16,?17,?18,?19,?20,?21,?22,?23,?24,?25,
201 ?26,?27,?28,?29,?30,?31,?32,?33,?34,?35
202 )",
203 params![
204 c.id,
205 c.skill_name,
206 c.seq,
207 c.content,
208 c.trigger_desc,
209 c.anti_trigger_desc,
210 c.content_hash,
211 c.token_count,
212 c.origin,
213 c.source,
214 c.maturity,
215 c.related_ids,
216 c.protected,
217 c.state,
218 c.state_reason,
219 c.state_updated_at,
220 c.confidence,
221 c.confidence,
222 c.confidence_reason,
223 c.version,
224 c.distilled_from,
225 c.distill_provider,
226 c.distill_model,
227 c.distill_prompt_version,
228 c.parent_id,
229 c.selected_count,
230 c.used_count,
231 c.used_success_count,
232 c.success_trace_ids_count,
233 c.last_success_at,
234 c.last_agg_ts,
235 c.embed_version,
236 c.created_at,
237 c.updated_at,
238 c.last_used_at
239 ],
240 )?;
241 Ok(())
242 }
243
244 pub fn insert_vec_content(&self, chunk_id: &str, emb: &[u8]) -> Result<()> {
245 self.conn.execute(
246 "INSERT OR REPLACE INTO vec_content(chunk_id, embedding) VALUES (?,?)",
247 params![chunk_id, emb],
248 )?;
249 *self.vec_content_cache.borrow_mut() = None;
250 Ok(())
251 }
252
253 pub fn insert_vec_trigger(&self, chunk_id: &str, emb: &[u8]) -> Result<()> {
254 self.conn.execute(
255 "INSERT OR REPLACE INTO vec_trigger(chunk_id, embedding) VALUES (?,?)",
256 params![chunk_id, emb],
257 )?;
258 self.conn.execute(
259 "INSERT INTO meta(key, value) VALUES ('vector_revision', '1')
260 ON CONFLICT(key) DO UPDATE SET value=CAST(value AS INTEGER)+1",
261 [],
262 )?;
263 *self.vec_trigger_cache.borrow_mut() = None;
264 Ok(())
265 }
266
267 pub fn get_chunk(&self, id: &str) -> Result<Option<Value>> {
268 let row = self
269 .conn
270 .query_row("SELECT * FROM chunks WHERE id=?", [id], row_to_json);
271 match row {
272 Ok(v) => Ok(Some(v)),
273 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
274 Err(e) => Err(e.into()),
275 }
276 }
277
278 pub fn update_chunk_state(
279 &self,
280 id: &str,
281 state: &str,
282 reason: Option<&str>,
283 now: &str,
284 ) -> Result<()> {
285 self.conn.execute(
286 "UPDATE chunks SET state=?, state_reason=?, state_updated_at=?, updated_at=? WHERE id=?",
287 params![state, reason, now, now, id],
288 )?;
289 Ok(())
290 }
291
292 pub fn update_chunk_confidence(
293 &self,
294 id: &str,
295 conf: f64,
296 reason: Option<&str>,
297 now: &str,
298 ) -> Result<()> {
299 self.conn.execute(
300 "UPDATE chunks
301 SET confidence=?, confidence_base=?, confidence_reason=?, updated_at=?
302 WHERE id=?",
303 params![conf, conf, reason, now, id],
304 )?;
305 Ok(())
306 }
307
308 pub fn update_chunk_last_used(&self, id: &str, now: &str) -> Result<()> {
309 self.conn.execute(
310 "UPDATE chunks SET last_used_at=?, updated_at=? WHERE id=?",
311 params![now, now, id],
312 )?;
313 Ok(())
314 }
315
316 pub fn get_chunk_by_hash(&self, hash: &str) -> Result<Option<Value>> {
317 let row = self.conn.query_row(
318 "SELECT * FROM chunks WHERE content_hash=? LIMIT 1",
319 [hash],
320 row_to_json,
321 );
322 match row {
323 Ok(v) => Ok(Some(v)),
324 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
325 Err(e) => Err(e.into()),
326 }
327 }
328
329 pub fn search_vec_content(&self, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
334 self.search_vec(&self.vec_content_cache, "vec_content", query, limit)
335 }
336
337 pub fn search_vec_trigger(&self, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
338 self.search_vec(&self.vec_trigger_cache, "vec_trigger", query, limit)
339 }
340
341 fn search_vec(
342 &self,
343 cache_cell: &VectorCache,
344 table: &str,
345 query: &[f32],
346 limit: usize,
347 ) -> Result<Vec<(String, f32)>> {
348 if limit == 0 {
349 return Ok(Vec::new());
350 }
351 self.refresh_vector_caches_if_changed()?;
352
353 if cache_cell.borrow().is_none() {
355 let sql = format!("SELECT chunk_id, embedding FROM {table}");
356 let mut stmt = self.conn.prepare(&sql)?;
357 let entries: Vec<(String, Vec<f32>)> = stmt
358 .query_map([], |r| {
359 let id: String = r.get(0)?;
360 let blob: Vec<u8> = r.get(1)?;
361 Ok((id, blob))
362 })?
363 .filter_map(|r| r.ok())
364 .map(|(id, blob)| (id, unpack_embedding(&blob)))
365 .collect();
366 *cache_cell.borrow_mut() = Some(entries);
367 }
368
369 let cache = cache_cell.borrow();
370 let entries = cache.as_ref().unwrap();
371
372 let mut results: Vec<(String, f32)> = entries
374 .iter()
375 .map(|(id, v)| (id.clone(), cosine_similarity(query, v)))
376 .collect();
377 if results.len() > limit {
378 results.select_nth_unstable_by(limit - 1, |a, b| {
379 b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)
380 });
381 results.truncate(limit);
382 }
383 results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
384 Ok(results)
385 }
386
387 fn refresh_vector_caches_if_changed(&self) -> Result<()> {
388 let current = self
389 .get_meta("vector_revision")?
390 .and_then(|value| value.parse::<i64>().ok())
391 .unwrap_or(0);
392 let previous = self.vector_cache_revision.replace(Some(current));
393 if previous.is_some_and(|revision| revision != current) {
394 *self.vec_content_cache.borrow_mut() = None;
395 *self.vec_trigger_cache.borrow_mut() = None;
396 }
397 Ok(())
398 }
399
400 pub fn get_chunks_by_ids(&self, ids: &[&str]) -> Result<HashMap<String, Value>> {
402 if ids.is_empty() {
403 return Ok(HashMap::new());
404 }
405 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
406 let sql = format!("SELECT * FROM chunks WHERE id IN ({placeholders})");
407 let mut stmt = self.conn.prepare(&sql)?;
408 let names: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();
409 let rows = stmt.query_map(rusqlite::params_from_iter(ids.iter()), |r| {
410 row_to_json_with_names(r, &names)
411 })?;
412 let mut map = HashMap::with_capacity(ids.len());
413 for row in rows.filter_map(|r| r.ok()) {
414 if let Some(id) = row.get("id").and_then(Value::as_str) {
415 map.insert(id.to_string(), row);
416 }
417 }
418 Ok(map)
419 }
420
421 pub fn is_hash_invalidated(&self, hash: &str) -> Result<bool> {
426 let count: i64 = self.conn.query_row(
427 "SELECT count(*) FROM invalidated_hashes WHERE content_hash=?",
428 [hash],
429 |r| r.get(0),
430 )?;
431 Ok(count > 0)
432 }
433
434 pub fn insert_invalidated_hash(
435 &self,
436 hash: &str,
437 reason: Option<&str>,
438 ts: &str,
439 ) -> Result<()> {
440 self.conn.execute(
441 "INSERT OR IGNORE INTO invalidated_hashes(content_hash, reason, ts) VALUES (?,?,?)",
442 params![hash, reason, ts],
443 )?;
444 Ok(())
445 }
446
447 #[allow(clippy::too_many_arguments)]
452 pub fn insert_usage_trace(
453 &self,
454 trace_id: &str,
455 chunk_id: Option<&str>,
456 event: &str,
457 strength: f64,
458 similarity: Option<f64>,
459 refine_mode: Option<&str>,
460 tokens: Option<i64>,
461 rank: Option<i64>,
462 attribution: Option<&str>,
463 source: &str,
464 ts: &str,
465 ) -> Result<usize> {
466 Ok(self.conn.execute(
467 "INSERT OR IGNORE INTO usage_trace
468 (trace_id, chunk_id, event, strength, similarity, refine_mode, tokens, rank, attribution, source, ts)
469 VALUES (?,?,?,?,?,?,?,?,?,?,?)",
470 params![trace_id, chunk_id, event, strength, similarity, refine_mode, tokens, rank, attribution, source, ts],
471 )?)
472 }
473
474 pub fn replace_used_trace(
475 &self,
476 trace_id: &str,
477 used_ids: &[String],
478 strength: f64,
479 attribution: &str,
480 source: &str,
481 ts: &str,
482 ) -> Result<()> {
483 self.conn.execute(
484 "DELETE FROM usage_trace WHERE trace_id=? AND event='used'",
485 [trace_id],
486 )?;
487 for chunk_id in used_ids {
488 self.insert_usage_trace(
489 trace_id,
490 Some(chunk_id),
491 "used",
492 strength,
493 None,
494 None,
495 None,
496 None,
497 Some(attribution),
498 source,
499 ts,
500 )?;
501 }
502 Ok(())
503 }
504
505 pub fn merge_used_trace(
506 &self,
507 trace_id: &str,
508 used_ids: &[String],
509 strength: f64,
510 attribution: &str,
511 source: &str,
512 ts: &str,
513 ) -> Result<()> {
514 let attribution_rank = |value: &str| match value {
515 "explicit" => 3,
516 "cited" => 2,
517 "inferred" => 1,
518 _ => 0,
519 };
520 for chunk_id in used_ids {
521 let existing = self
522 .query_chunks_params(
523 "SELECT attribution, strength FROM usage_trace
524 WHERE trace_id=? AND chunk_id=? AND event='used'",
525 params![trace_id, chunk_id],
526 )?
527 .into_iter()
528 .next();
529 if let Some(row) = existing {
530 let existing_attribution = row
531 .get("attribution")
532 .and_then(Value::as_str)
533 .unwrap_or("inferred");
534 if attribution_rank(attribution) > attribution_rank(existing_attribution) {
535 self.conn.execute(
536 "UPDATE usage_trace
537 SET strength=?, attribution=?, source=?, ts=?
538 WHERE trace_id=? AND chunk_id=? AND event='used'",
539 params![
540 strength,
541 attribution,
542 source,
543 ts,
544 trace_id,
545 chunk_id
546 ],
547 )?;
548 }
549 } else {
550 self.insert_usage_trace(
551 trace_id,
552 Some(chunk_id),
553 "used",
554 strength,
555 None,
556 None,
557 None,
558 None,
559 Some(attribution),
560 source,
561 ts,
562 )?;
563 }
564 }
565 Ok(())
566 }
567
568 pub fn refresh_chunk_last_used(&self, chunk_id: &str, now: &str) -> Result<()> {
569 self.conn.execute(
570 "UPDATE chunks
571 SET last_used_at=COALESCE(
572 (SELECT MAX(ts) FROM usage_trace
573 WHERE chunk_id=? AND event='used'
574 AND ts > COALESCE(chunks.evidence_cutoff_at, '')),
575 last_used_base
576 ),
577 updated_at=?
578 WHERE id=?",
579 params![chunk_id, now, chunk_id],
580 )?;
581 Ok(())
582 }
583
584 pub fn get_outcome_for_trace(&self, trace_id: &str) -> Result<Option<String>> {
585 let row = self.conn.query_row(
586 "SELECT event FROM usage_trace
587 WHERE trace_id=? AND event IN ('task_ok','task_fail') AND chunk_id IS NULL
588 LIMIT 1",
589 [trace_id],
590 |r| r.get::<_, String>(0),
591 );
592 match row {
593 Ok(v) => Ok(Some(v)),
594 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
595 Err(e) => Err(e.into()),
596 }
597 }
598
599 pub fn purge_usage_trace(&self, before_ts: &str) -> Result<usize> {
600 let n = self.conn.execute(
602 "DELETE FROM usage_trace
603 WHERE ts < ?
604 AND event IN ('retrieved','refined')
605 AND NOT (event = 'retrieved'
606 AND chunk_id IN (SELECT id FROM chunks WHERE origin='spark'))",
607 [before_ts],
608 )?;
609 Ok(n)
610 }
611
612 pub fn upsert_episodic_log(&self, log: &EpisodicLogRow) -> Result<()> {
617 self.conn.execute(
618 "INSERT OR REPLACE INTO episodic_log
619 (id, trace_id, lib_id, ts, query, recall_snapshot, output,
620 output_summary, outcome, event_source, task_state, completed_at,
621 usage_state, used_ids, used_attribution, used_complete, context_key, nomination, priority,
622 distill_state, distill_note, distill_attempts, distill_last_failed_at)
623 VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,?18,?19,?20,?21,0,NULL)",
624 params![
625 log.id,
626 log.trace_id,
627 log.lib_id,
628 log.ts,
629 log.query,
630 log.recall_snapshot,
631 log.output,
632 log.output_summary,
633 log.outcome,
634 log.event_source,
635 log.task_state,
636 log.completed_at,
637 log.usage_state,
638 log.used_ids,
639 log.used_attribution,
640 i64::from(log.used_complete),
641 log.context_key,
642 log.nomination,
643 log.priority,
644 log.distill_state,
645 log.distill_note
646 ],
647 )?;
648 Ok(())
649 }
650
651 pub fn get_episodic_log(&self, trace_id: &str) -> Result<Option<Value>> {
652 let row = self.conn.query_row(
653 "SELECT * FROM episodic_log WHERE trace_id=?",
654 [trace_id],
655 row_to_json,
656 );
657 match row {
658 Ok(v) => Ok(Some(v)),
659 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
660 Err(e) => Err(e.into()),
661 }
662 }
663
664 pub fn update_episodic_log_state(
665 &self,
666 trace_id: &str,
667 state: &str,
668 note: Option<&str>,
669 outcome: Option<&str>,
670 ) -> Result<()> {
671 self.conn.execute(
672 "UPDATE episodic_log
673 SET distill_state=?, distill_note=COALESCE(?,distill_note),
674 outcome=COALESCE(?,outcome),
675 distill_run_id=NULL, distill_locked_at=NULL
676 WHERE trace_id=?",
677 params![state, note, outcome, trace_id],
678 )?;
679 Ok(())
680 }
681
682 pub fn patch_episodic_log_content(
684 &self,
685 trace_id: &str,
686 query: Option<&str>,
687 output: Option<&str>,
688 output_summary: Option<&str>,
689 nomination: Option<&str>,
690 priority: i64,
691 ) -> Result<()> {
692 self.conn.execute(
693 "UPDATE episodic_log
694 SET output_summary = COALESCE(?, output_summary),
695 nomination = COALESCE(?, nomination),
696 output = COALESCE(?, output),
697 query = COALESCE(?, query),
698 priority = MAX(priority, ?)
699 WHERE trace_id = ?",
700 params![
701 output_summary,
702 nomination,
703 output,
704 query,
705 priority,
706 trace_id
707 ],
708 )?;
709 Ok(())
710 }
711
712 #[allow(clippy::too_many_arguments)]
713 pub fn update_trace_lifecycle(
714 &self,
715 trace_id: &str,
716 task_state: &str,
717 completed_at: Option<&str>,
718 usage_state: Option<&str>,
719 used_ids: Option<&str>,
720 used_attribution: Option<&str>,
721 used_complete: Option<bool>,
722 ) -> Result<()> {
723 self.conn.execute(
724 "UPDATE episodic_log
725 SET task_state=?,
726 completed_at=COALESCE(?, completed_at),
727 usage_state=COALESCE(?, usage_state),
728 used_ids=COALESCE(?, used_ids),
729 used_attribution=COALESCE(?, used_attribution),
730 used_complete=COALESCE(?, used_complete)
731 WHERE trace_id=?",
732 params![
733 task_state,
734 completed_at,
735 usage_state,
736 used_ids,
737 used_attribution,
738 used_complete.map(i64::from),
739 trace_id
740 ],
741 )?;
742 Ok(())
743 }
744
745 #[allow(clippy::too_many_arguments)]
746 pub fn upsert_confidence_evidence(
747 &self,
748 id: &str,
749 trace_id: Option<&str>,
750 chunk_id: &str,
751 kind: &str,
752 target: f64,
753 alpha: f64,
754 reason: &str,
755 context_key: Option<&str>,
756 ts: &str,
757 ) -> Result<()> {
758 self.conn.execute(
759 "INSERT INTO confidence_evidence
760 (id, trace_id, chunk_id, kind, target, alpha, reason, context_key, ts)
761 VALUES (?,?,?,?,?,?,?,?,?)
762 ON CONFLICT(trace_id, chunk_id, kind) WHERE trace_id IS NOT NULL
763 DO UPDATE SET target=excluded.target, alpha=excluded.alpha,
764 reason=excluded.reason, context_key=excluded.context_key",
765 params![id, trace_id, chunk_id, kind, target, alpha, reason, context_key, ts],
766 )?;
767 Ok(())
768 }
769
770 pub fn delete_trace_confidence_evidence(&self, trace_id: &str, kinds: &[&str]) -> Result<()> {
771 for kind in kinds {
772 self.conn.execute(
773 "DELETE FROM confidence_evidence WHERE trace_id=? AND kind=?",
774 params![trace_id, kind],
775 )?;
776 }
777 Ok(())
778 }
779
780 pub fn delete_chunk_trace_confidence_evidence(
781 &self,
782 trace_id: &str,
783 chunk_id: &str,
784 kind: &str,
785 ) -> Result<()> {
786 self.conn.execute(
787 "DELETE FROM confidence_evidence
788 WHERE trace_id=? AND chunk_id=? AND kind=?",
789 params![trace_id, chunk_id, kind],
790 )?;
791 Ok(())
792 }
793
794 pub fn confidence_evidence_for_chunk(&self, chunk_id: &str) -> Result<Vec<Value>> {
795 self.query_json(
796 "SELECT target, alpha, reason, ts, id
797 FROM confidence_evidence WHERE chunk_id=?
798 ORDER BY ts ASC,
799 CASE kind
800 WHEN 'outcome_ok' THEN 1
801 WHEN 'outcome_fail' THEN 1
802 WHEN 'selected_unused' THEN 2
803 WHEN 'feedback_up' THEN 3
804 WHEN 'feedback_down' THEN 3
805 WHEN 'decay' THEN 4
806 ELSE 5
807 END ASC,
808 kind ASC, id ASC",
809 [chunk_id],
810 )
811 }
812
813 #[allow(clippy::too_many_arguments)]
814 pub fn insert_feedback_event(
815 &self,
816 id: &str,
817 trace_id: &str,
818 chunk_id: &str,
819 signal: &str,
820 strength: f64,
821 source: &str,
822 actor: Option<&str>,
823 reason: Option<&str>,
824 context_key: Option<&str>,
825 ts: &str,
826 ) -> Result<usize> {
827 Ok(self.conn.execute(
828 "INSERT OR IGNORE INTO feedback_events
829 (id, trace_id, chunk_id, signal, strength, source, actor, reason, context_key, ts)
830 VALUES (?,?,?,?,?,?,?,?,?,?)",
831 params![
832 id,
833 trace_id,
834 chunk_id,
835 signal,
836 strength,
837 source,
838 actor,
839 reason,
840 context_key,
841 ts
842 ],
843 )?)
844 }
845
846 pub fn delete_feedback_event(
847 &self,
848 trace_id: &str,
849 chunk_id: &str,
850 signal: &str,
851 ) -> Result<usize> {
852 Ok(self.conn.execute(
853 "DELETE FROM feedback_events
854 WHERE trace_id=? AND chunk_id=? AND signal=?",
855 params![trace_id, chunk_id, signal],
856 )?)
857 }
858
859 pub fn update_chunk_last_decayed_at(&self, id: &str, now: &str) -> Result<()> {
860 self.conn.execute(
861 "UPDATE chunks SET last_decayed_at=?, updated_at=? WHERE id=?",
862 params![now, now, id],
863 )?;
864 Ok(())
865 }
866
867 #[allow(clippy::too_many_arguments)]
868 pub fn update_context_stat(
869 &self,
870 chunk_id: &str,
871 context_key: &str,
872 success: i64,
873 failure: i64,
874 positive: i64,
875 negative: i64,
876 now: &str,
877 ) -> Result<()> {
878 self.conn.execute(
879 "INSERT INTO chunk_context_stats
880 (chunk_id, context_key, success_count, failure_count,
881 positive_feedback, negative_feedback, last_updated_at)
882 VALUES (?,?,?,?,?,?,?)
883 ON CONFLICT(chunk_id, context_key) DO UPDATE SET
884 success_count=success_count+excluded.success_count,
885 failure_count=failure_count+excluded.failure_count,
886 positive_feedback=positive_feedback+excluded.positive_feedback,
887 negative_feedback=negative_feedback+excluded.negative_feedback,
888 last_updated_at=excluded.last_updated_at",
889 params![
890 chunk_id,
891 context_key,
892 success,
893 failure,
894 positive,
895 negative,
896 now
897 ],
898 )?;
899 Ok(())
900 }
901
902 pub fn context_score(&self, chunk_id: &str, context_key: &str) -> Result<f64> {
903 let row = self
904 .conn
905 .query_row(
906 "SELECT success_count, failure_count, positive_feedback, negative_feedback
907 FROM chunk_context_stats WHERE chunk_id=? AND context_key=?",
908 params![chunk_id, context_key],
909 |row| {
910 Ok((
911 row.get::<_, i64>(0)?,
912 row.get::<_, i64>(1)?,
913 row.get::<_, i64>(2)?,
914 row.get::<_, i64>(3)?,
915 ))
916 },
917 )
918 .optional()?;
919 let Some((success, failure, positive, negative)) = row else {
920 return Ok(0.0);
921 };
922 let wins = success as f64 + positive as f64 * 2.0;
923 let losses = failure as f64 + negative as f64 * 2.0;
924 let evidence = wins + losses;
925 let posterior = (wins + 1.0) / (evidence + 2.0);
926 let evidence_weight = (evidence / 5.0).min(1.0);
927 Ok((posterior - 0.5) * 2.0 * evidence_weight)
928 }
929
930 #[allow(clippy::too_many_arguments)]
931 pub fn upsert_governance_proposal(
932 &self,
933 id: &str,
934 chunk_id: &str,
935 proposal_type: &str,
936 reason: &str,
937 evidence_count: i64,
938 evidence_score: f64,
939 actor_count: i64,
940 now: &str,
941 ) -> Result<()> {
942 self.conn.execute(
943 "INSERT INTO governance_proposals
944 (id, chunk_id, proposal_type, reason, evidence_count,
945 evidence_score, actor_count, state, created_at, updated_at)
946 VALUES (?,?,?,?,?,?,?,'pending',?,?)
947 ON CONFLICT(chunk_id, proposal_type) WHERE state='pending'
948 DO UPDATE SET reason=excluded.reason,
949 evidence_count=excluded.evidence_count,
950 evidence_score=excluded.evidence_score,
951 actor_count=excluded.actor_count,
952 updated_at=excluded.updated_at",
953 params![
954 id,
955 chunk_id,
956 proposal_type,
957 reason,
958 evidence_count,
959 evidence_score,
960 actor_count,
961 now,
962 now
963 ],
964 )?;
965 Ok(())
966 }
967
968 pub fn request_evolve(&self, id: &str, reason: &str, now: &str) -> Result<()> {
969 self.request_evolve_at(id, reason, now, None)
970 }
971
972 pub fn request_evolve_at(
973 &self,
974 id: &str,
975 reason: &str,
976 now: &str,
977 next_retry_at: Option<&str>,
978 ) -> Result<()> {
979 let priority = match reason {
980 "governance_ready" => 100,
981 "governance" => 80,
982 "threshold" => 60,
983 "distill_retry" => 50,
984 "batch_continue" => 40,
985 _ => 20,
986 };
987 self.conn.execute(
988 "INSERT INTO evolve_requests(
989 id, reason, state, requested_at, priority, next_retry_at
990 )
991 VALUES (?,?,'pending',?,?,?)
992 ON CONFLICT(reason) WHERE state='pending'
993 DO UPDATE SET
994 priority=MAX(priority, excluded.priority),
995 next_retry_at=CASE
996 WHEN excluded.next_retry_at IS NULL THEN NULL
997 WHEN evolve_requests.next_retry_at IS NULL THEN NULL
998 ELSE MIN(evolve_requests.next_retry_at, excluded.next_retry_at)
999 END",
1000 params![id, reason, now, priority, next_retry_at],
1001 )?;
1002 Ok(())
1003 }
1004
1005 pub fn claim_evolve_request(&self, now: &str, stale_before: &str) -> Result<Option<String>> {
1006 Ok(self
1007 .claim_evolve_request_with_reason(now, stale_before)?
1008 .map(|claim| claim.id))
1009 }
1010
1011 pub fn claim_evolve_request_with_reason(
1012 &self,
1013 now: &str,
1014 stale_before: &str,
1015 ) -> Result<Option<EvolveRequestClaim>> {
1016 self.conn.execute(
1017 "UPDATE evolve_requests
1018 SET state='pending', leased_at=NULL, note='lease_recovered'
1019 WHERE state='running' AND leased_at < ?",
1020 [stale_before],
1021 )?;
1022 self.conn.execute(
1023 "UPDATE evolve_requests
1024 SET state='pending', leased_at=NULL, note='retry_failed'
1025 WHERE state='failed' AND attempts < 3
1026 AND COALESCE(next_retry_at, completed_at) < ?",
1027 [now],
1028 )?;
1029 Ok(self.conn.query_row(
1030 "UPDATE evolve_requests
1031 SET state='running', leased_at=?, attempts=attempts+1
1032 WHERE id=(
1033 SELECT id FROM evolve_requests
1034 WHERE state='pending'
1035 AND (next_retry_at IS NULL OR next_retry_at <= ?)
1036 ORDER BY priority DESC, requested_at ASC LIMIT 1
1037 ) AND state='pending'
1038 RETURNING id, reason",
1039 params![now, now],
1040 |row| {
1041 Ok(EvolveRequestClaim {
1042 id: row.get(0)?,
1043 reason: row.get(1)?,
1044 })
1045 },
1046 ).optional()?)
1047 }
1048
1049 pub fn defer_evolve_request(
1050 &self,
1051 id: &str,
1052 note: &str,
1053 next_retry_at: &str,
1054 ) -> Result<()> {
1055 self.conn.execute(
1056 "DELETE FROM evolve_requests
1057 WHERE state='pending'
1058 AND id!=?
1059 AND reason=(SELECT reason FROM evolve_requests WHERE id=?)",
1060 params![id, id],
1061 )?;
1062 self.conn.execute(
1063 "UPDATE evolve_requests
1064 SET state='pending', leased_at=NULL, completed_at=NULL,
1065 note=?, next_retry_at=?
1066 WHERE id=?",
1067 params![note, next_retry_at, id],
1068 )?;
1069 Ok(())
1070 }
1071
1072 pub fn finish_evolve_request(
1073 &self,
1074 id: &str,
1075 state: &str,
1076 note: Option<&str>,
1077 now: &str,
1078 ) -> Result<()> {
1079 self.conn.execute(
1080 "UPDATE evolve_requests
1081 SET state=?, completed_at=?, note=?,
1082 last_failed_at=CASE
1083 WHEN ?='failed' THEN ?
1084 ELSE last_failed_at
1085 END,
1086 next_retry_at=CASE WHEN ?='failed'
1087 THEN strftime('%Y-%m-%dT%H:%M:%fZ', ?, '+5 minutes')
1088 ELSE NULL END
1089 WHERE id=?",
1090 params![state, now, note, state, now, state, now, id],
1091 )?;
1092 Ok(())
1093 }
1094
1095 pub fn finish_covered_evolve_requests(&self, requested_before: &str, now: &str) -> Result<()> {
1096 self.conn.execute(
1097 "UPDATE evolve_requests
1098 SET state='completed', completed_at=?, note='covered_by_evolve', next_retry_at=NULL
1099 WHERE state='pending' AND requested_at <= ?",
1100 params![now, requested_before],
1101 )?;
1102 Ok(())
1103 }
1104
1105 pub fn update_episodic_log_state_by_id(
1107 &self,
1108 id: &str,
1109 state: &str,
1110 note: Option<&str>,
1111 outcome: Option<&str>,
1112 ) -> Result<()> {
1113 self.conn.execute(
1114 "UPDATE episodic_log
1115 SET distill_state=?, distill_note=COALESCE(?,distill_note),
1116 outcome=COALESCE(?,outcome),
1117 distill_run_id=NULL, distill_locked_at=NULL
1118 WHERE id=?",
1119 params![state, note, outcome, id],
1120 )?;
1121 Ok(())
1122 }
1123
1124 #[allow(clippy::too_many_arguments)]
1125 pub fn finish_distill_log(
1126 &self,
1127 id: &str,
1128 state: &str,
1129 note: Option<&str>,
1130 prompt_tokens: i64,
1131 completion_tokens: i64,
1132 accounted_at: &str,
1133 ) -> Result<()> {
1134 self.conn.execute(
1135 "INSERT INTO distill_token_usage(
1136 log_id, prompt_tokens, completion_tokens, outcome, accounted_at
1137 ) VALUES (?,?,?,?,?)",
1138 params![
1139 id,
1140 prompt_tokens,
1141 completion_tokens,
1142 state,
1143 accounted_at
1144 ],
1145 )?;
1146 self.conn.execute(
1147 "UPDATE episodic_log
1148 SET distill_state=?, distill_note=?,
1149 distill_prompt_tokens=?, distill_completion_tokens=?,
1150 distill_accounted_at=?,
1151 distill_attempts=distill_attempts
1152 + CASE WHEN ?='failed' THEN 1 ELSE 0 END,
1153 distill_last_failed_at=CASE
1154 WHEN ?='failed' THEN ?
1155 ELSE distill_last_failed_at
1156 END,
1157 distill_run_id=NULL, distill_locked_at=NULL
1158 WHERE id=?",
1159 params![
1160 state,
1161 note,
1162 prompt_tokens,
1163 completion_tokens,
1164 accounted_at,
1165 state,
1166 state,
1167 accounted_at,
1168 id
1169 ],
1170 )?;
1171 Ok(())
1172 }
1173
1174 pub fn claim_distill_batch(
1177 &self,
1178 run_id: &str,
1179 limit: usize,
1180 locked_at: &str,
1181 ) -> Result<Vec<Value>> {
1182 self.conn.execute(
1184 "UPDATE episodic_log
1185 SET distill_state='screening', distill_run_id=?, distill_locked_at=?
1186 WHERE id IN (
1187 SELECT id FROM episodic_log
1188 WHERE distill_state='new'
1189 ORDER BY priority DESC, ts ASC
1190 LIMIT ?
1191 )",
1192 params![run_id, locked_at, limit as i64],
1193 )?;
1194 self.query_json(
1195 "SELECT * FROM episodic_log WHERE distill_run_id=? AND distill_state='screening'",
1196 params![run_id],
1197 )
1198 }
1199
1200 pub fn query_episodic_logs_open(&self, limit: usize) -> Result<Vec<Value>> {
1201 self.query_json(
1202 "SELECT * FROM episodic_log WHERE distill_state='new' ORDER BY priority DESC, ts ASC LIMIT ?",
1203 params![limit as i64],
1204 )
1205 }
1206
1207 pub fn query_chunks(&self, sql: &str) -> Result<Vec<Value>> {
1212 self.query_json(sql, params![])
1213 }
1214
1215 pub fn query_chunks_params<P: rusqlite::Params>(&self, sql: &str, p: P) -> Result<Vec<Value>> {
1216 self.query_json(sql, p)
1217 }
1218
1219 pub fn get_deps(&self, chunk_id: &str) -> Result<Vec<(String, String, Option<String>)>> {
1224 let mut stmt = self
1225 .conn
1226 .prepare("SELECT dst, kind, dst_lib FROM deps WHERE src=?")?;
1227 let rows = stmt.query_map([chunk_id], |r| {
1228 Ok((
1229 r.get::<_, String>(0)?,
1230 r.get::<_, String>(1)?,
1231 r.get::<_, Option<String>>(2)?,
1232 ))
1233 })?;
1234 Ok(rows.filter_map(|r| r.ok()).collect())
1235 }
1236
1237 pub fn get_reverse_deps(&self, chunk_id: &str) -> Result<Vec<String>> {
1238 let mut stmt = self.conn.prepare("SELECT src FROM deps WHERE dst=?")?;
1239 let rows = stmt.query_map([chunk_id], |r| r.get::<_, String>(0))?;
1240 Ok(rows.filter_map(|r| r.ok()).collect())
1241 }
1242
1243 pub fn insert_dep(
1244 &self,
1245 src: &str,
1246 dst: &str,
1247 kind: &str,
1248 dst_lib: Option<&str>,
1249 ) -> Result<()> {
1250 self.conn.execute(
1251 "INSERT OR IGNORE INTO deps(src,dst,kind,dst_lib) VALUES (?,?,?,?)",
1252 params![src, dst, kind, dst_lib],
1253 )?;
1254 Ok(())
1255 }
1256
1257 pub fn upsert_chunk_success_trace(
1262 &self,
1263 chunk_id: &str,
1264 trace_id: &str,
1265 ts: &str,
1266 ) -> Result<()> {
1267 self.conn.execute(
1268 "INSERT OR IGNORE INTO chunk_success_traces(chunk_id, trace_id, ts) VALUES (?,?,?)",
1269 params![chunk_id, trace_id, ts],
1270 )?;
1271 Ok(())
1272 }
1273
1274 pub fn attach_shared(&self, path: &str, alias: &str) -> Result<()> {
1279 self.conn.execute_batch(&format!(
1280 "ATTACH DATABASE '{}' AS '{alias}'",
1281 path.replace('\'', "''")
1282 ))?;
1283 Ok(())
1284 }
1285
1286 pub fn lib_id(&self) -> Result<String> {
1287 Ok(self
1288 .get_meta("lib_id")?
1289 .unwrap_or_else(|| "unknown".to_string()))
1290 }
1291
1292 fn query_json<P: rusqlite::Params>(&self, sql: &str, p: P) -> Result<Vec<Value>> {
1297 let mut stmt = self.conn.prepare(sql)?;
1298 let names: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();
1299 let rows = stmt.query_map(p, |r| row_to_json_with_names(r, &names))?;
1300 Ok(rows.filter_map(|r| r.ok()).collect())
1301 }
1302
1303 pub fn execute(&self, sql: &str) -> Result<()> {
1304 self.conn.execute_batch(sql)?;
1305 Ok(())
1306 }
1307
1308 pub fn conn_execute<P: rusqlite::Params>(&self, sql: &str, p: P) -> Result<()> {
1310 self.conn.execute(sql, p)?;
1311 Ok(())
1312 }
1313
1314 pub fn conn_execute_count<P: rusqlite::Params>(&self, sql: &str, p: P) -> Result<usize> {
1315 Ok(self.conn.execute(sql, p)?)
1316 }
1317}
1318
/// Owned, typed copy of one chunk record.
///
/// NOTE(review): presumably mirrors the columns of the chunks table in `schema.sql`
/// one-to-one. Confirm the mapping there before relying on field semantics.
#[derive(Debug, Default, Clone)]
pub struct ChunkRow {
    /// Chunk identifier. Dependency edges in `deps` and rows in `chunk_success_traces`
    /// reference chunks by id.
    pub id: String,
    pub skill_name: Option<String>,
    pub seq: i64,
    pub content: String,
    pub trigger_desc: Option<String>,
    pub anti_trigger_desc: Option<String>,
    pub content_hash: String,
    pub token_count: Option<i64>,
    pub origin: String,
    pub source: Option<String>,
    pub maturity: Option<String>,
    /// Stored as text. NOTE(review): the encoding (e.g. a JSON list of ids) is not
    /// visible here; confirm it.
    pub related_ids: Option<String>,
    /// Integer-valued flag. NOTE(review): presumably 0/1; confirm.
    pub protected: i64,
    pub state: String,
    pub state_reason: Option<String>,
    pub state_updated_at: Option<String>,
    pub confidence: f64,
    pub confidence_reason: Option<String>,
    pub version: i64,
    pub distilled_from: Option<String>,
    pub distill_provider: Option<String>,
    pub distill_model: Option<String>,
    pub distill_prompt_version: Option<String>,
    pub parent_id: Option<String>,
    pub selected_count: i64,
    pub used_count: i64,
    pub used_success_count: i64,
    /// NOTE(review): presumably the number of rows for this chunk in
    /// `chunk_success_traces`; confirm.
    pub success_trace_ids_count: i64,
    pub last_success_at: Option<String>,
    pub last_agg_ts: Option<String>,
    pub embed_version: i64,
    pub created_at: String,
    pub updated_at: String,
    pub last_used_at: Option<String>,
}
1360
/// Owned, typed copy of one `episodic_log` record.
///
/// NOTE(review): the `episodic_log` table also has a `distill_run_id` column (it is used
/// to select rows in the `'screening'` state), which this struct does not carry.
#[derive(Debug, Default)]
pub struct EpisodicLogRow {
    pub id: String,
    pub trace_id: String,
    pub lib_id: String,
    /// Timestamp stored as text. Ties in `priority` are broken by oldest `ts` first when
    /// listing open logs.
    pub ts: String,
    pub query: Option<String>,
    pub recall_snapshot: Option<String>,
    pub output: Option<String>,
    pub output_summary: Option<String>,
    pub outcome: Option<String>,
    pub event_source: String,
    pub task_state: String,
    pub completed_at: Option<String>,
    pub usage_state: String,
    pub used_ids: Option<String>,
    pub used_attribution: Option<String>,
    pub used_complete: bool,
    pub context_key: Option<String>,
    pub nomination: Option<String>,
    /// Open logs are listed highest priority first.
    pub priority: i64,
    /// Distillation workflow state. The queries in `Storage` use `'new'` for open rows and
    /// `'screening'` for rows tagged with a distill run id.
    pub distill_state: String,
    pub distill_note: Option<String>,
}
1385
1386fn configure_pragmas(conn: &Connection) -> Result<()> {
1391 conn.execute_batch(
1392 "PRAGMA journal_mode=WAL;
1393 PRAGMA foreign_keys=ON;
1394 PRAGMA synchronous=NORMAL;
1395 PRAGMA cache_size=-65536;
1396 PRAGMA mmap_size=268435456;
1397 PRAGMA temp_store=memory;",
1398 )?;
1399 let mode: String = conn.query_row("PRAGMA journal_mode", [], |r| r.get(0))?;
1401 if mode != "wal" {
1402 return Err(crate::errors::InnateError::Other(format!(
1403 "WAL mode required but got '{mode}'; check filesystem support"
1404 )));
1405 }
1406 Ok(())
1407}
1408
/// Parses a dotted version string such as `"4.14"` into a `(major, minor, patch)` tuple.
///
/// The tuple compares numerically, so `"4.9" < "4.14"`. Parsing is positional: the first
/// three `.`-separated components map to major/minor/patch, and extra components are
/// ignored. Each component is trimmed and then contributes its leading ASCII digits, so
/// `"14-rc1"` reads as 14. A missing or non-numeric component becomes 0 in its own slot.
/// Later components are never shifted left: `"1.x.3"` yields `(1, 0, 3)`, not `(1, 3, 0)`.
fn ver_tuple(v: &str) -> (u32, u32, u32) {
    let mut parts = v.split('.').map(|part| {
        part.trim()
            .split(|c: char| !c.is_ascii_digit())
            .next()
            .and_then(|digits| digits.parse::<u32>().ok())
            .unwrap_or(0)
    });
    let mut next = || parts.next().unwrap_or(0);
    let major = next();
    let minor = next();
    let patch = next();
    (major, minor, patch)
}
1417
1418fn row_to_json_with_names(row: &Row, names: &[String]) -> rusqlite::Result<Value> {
1420 let mut map = serde_json::Map::new();
1421 for (i, name) in names.iter().enumerate() {
1422 let v = row_value_at(row, i);
1423 map.insert(name.clone(), v);
1424 }
1425 Ok(Value::Object(map))
1426}
1427
1428fn row_to_json(row: &Row) -> rusqlite::Result<Value> {
1429 let count = row.as_ref().column_count();
1430 let mut map = serde_json::Map::new();
1431 for i in 0..count {
1432 let name = row.as_ref().column_name(i)?.to_string();
1433 let v = row_value_at(row, i);
1434 map.insert(name, v);
1435 }
1436 Ok(Value::Object(map))
1437}
1438
1439fn row_value_at(row: &Row, i: usize) -> Value {
1440 if let Ok(v) = row.get::<_, Option<String>>(i) {
1442 return v.map(Value::String).unwrap_or(Value::Null);
1443 }
1444 if let Ok(v) = row.get::<_, Option<i64>>(i) {
1445 return v.map(|n| Value::Number(n.into())).unwrap_or(Value::Null);
1446 }
1447 if let Ok(v) = row.get::<_, Option<f64>>(i) {
1448 return v
1449 .and_then(serde_json::Number::from_f64)
1450 .map(Value::Number)
1451 .unwrap_or(Value::Null);
1452 }
1453 Value::Null
1454}
1455
1456trait OptionalExt<T> {
1457 fn optional(self) -> rusqlite::Result<Option<T>>;
1458}
1459impl<T> OptionalExt<T> for rusqlite::Result<T> {
1460 fn optional(self) -> rusqlite::Result<Option<T>> {
1461 match self {
1462 Ok(v) => Ok(Some(v)),
1463 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1464 Err(e) => Err(e),
1465 }
1466 }
1467}