Skip to main content

innate_core/
storage.rs

1//! SQLite storage layer.
2//!
3//! Replaces sqlite-vec virtual tables with ordinary BLOB columns + pure-Rust
4//! cosine similarity, keeping the schema otherwise aligned with v4.5.x.
5
use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::path::{Path, PathBuf};

use rusqlite::{params, Connection, OptionalExtension as _, Row};
use serde_json::Value;

use crate::errors::{InnateError, Result};
use crate::utils::{cosine_similarity, unpack_embedding};
15
16const EXPECTED_SCHEMA_VERSION: &str = "4.13";
17
18// Embedded SQL schema — no external files needed.
19const SCHEMA_SQL: &str = include_str!("schema.sql");
20
21type VectorEntries = Vec<(String, Vec<f32>)>;
22type VectorCache = RefCell<Option<VectorEntries>>;
23
24pub struct Storage {
25    pub db_path: PathBuf,
26    conn: Connection,
27    pub content_dim: usize,
28    pub trigger_dim: usize,
29    /// Pre-parsed in-memory caches for vector search; None = cold (not loaded or invalidated).
30    vec_content_cache: VectorCache,
31    vec_trigger_cache: VectorCache,
32    /// Last observed vector revision. Only vector writes advance this value.
33    vector_cache_revision: Cell<Option<i64>>,
34}
35
36impl Storage {
37    pub fn open(db_path: impl AsRef<Path>, content_dim: usize, trigger_dim: usize) -> Result<Self> {
38        let db_path = db_path.as_ref().to_path_buf();
39        if let Some(parent) = db_path.parent() {
40            std::fs::create_dir_all(parent)?;
41        }
42        let conn = Connection::open(&db_path)?;
43        configure_pragmas(&conn)?;
44        let mut s = Self {
45            db_path,
46            conn,
47            content_dim,
48            trigger_dim,
49            vec_content_cache: RefCell::new(None),
50            vec_trigger_cache: RefCell::new(None),
51            vector_cache_revision: Cell::new(None),
52        };
53        s.init_schema()?;
54        Ok(s)
55    }
56
57    pub fn open_readonly(db_path: impl AsRef<Path>) -> Result<Self> {
58        let db_path = db_path.as_ref().to_path_buf();
59        let conn = Connection::open_with_flags(
60            &db_path,
61            rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
62        )?;
63        conn.pragma_update(None, "query_only", "ON")?;
64        conn.pragma_update(None, "foreign_keys", "ON")?;
65        let s = Self {
66            db_path,
67            conn,
68            content_dim: 1024,
69            trigger_dim: 256,
70            vec_content_cache: RefCell::new(None),
71            vec_trigger_cache: RefCell::new(None),
72            vector_cache_revision: Cell::new(None),
73        };
74        Ok(s)
75    }
76
77    fn init_schema(&mut self) -> Result<()> {
78        let has_meta: bool = self.conn.query_row(
79            "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='meta'",
80            [],
81            |r| r.get::<_, i64>(0),
82        )? > 0;
83
84        if !has_meta {
85            // Wrap schema creation in a transaction for atomicity.
86            self.conn.execute_batch("BEGIN IMMEDIATE")?;
87            let r = self.conn.execute_batch(SCHEMA_SQL);
88            if r.is_ok() {
89                self.conn.execute_batch("COMMIT")?;
90            } else {
91                let _ = self.conn.execute_batch("ROLLBACK");
92                r?;
93            }
94            return Ok(());
95        }
96
97        let current: Option<String> = self
98            .conn
99            .query_row(
100                "SELECT value FROM meta WHERE key='schema_version'",
101                [],
102                |r| r.get(0),
103            )
104            .optional()?;
105
106        let current = current
107            .ok_or_else(|| InnateError::Other("meta table missing schema_version".into()))?;
108
109        let cur = ver_tuple(&current);
110        let exp = ver_tuple(EXPECTED_SCHEMA_VERSION);
111
112        match cur.cmp(&exp) {
113            std::cmp::Ordering::Equal => Ok(()),
114            std::cmp::Ordering::Greater => {
115                // Forward-compat: newer schema, warn but allow.
116                eprintln!(
117                    "[innate] warning: db schema {current} > expected {EXPECTED_SCHEMA_VERSION}"
118                );
119                Ok(())
120            }
121            std::cmp::Ordering::Less => {
122                // Delegate to the proper migration chain which handles all steps atomically.
123                let applied = crate::migrate::run_migrations(&self.db_path)?;
124                if !applied.is_empty() {
125                    eprintln!("[innate] auto-migrated: {}", applied.join(", "));
126                }
127                Ok(())
128            }
129        }
130    }
131
132    // ------------------------------------------------------------------
133    // Transactions
134    // ------------------------------------------------------------------
135
136    pub fn begin_immediate(&self) -> Result<()> {
137        self.conn.execute_batch("BEGIN IMMEDIATE")?;
138        Ok(())
139    }
140
141    pub fn commit(&self) -> Result<()> {
142        self.conn.execute_batch("COMMIT")?;
143        Ok(())
144    }
145
146    pub fn rollback(&self) -> Result<()> {
147        self.conn.execute_batch("ROLLBACK")?;
148        Ok(())
149    }
150
151    // ------------------------------------------------------------------
152    // Meta
153    // ------------------------------------------------------------------
154
155    pub fn get_meta(&self, key: &str) -> Result<Option<String>> {
156        Ok(self
157            .conn
158            .query_row("SELECT value FROM meta WHERE key=?", [key], |r| r.get(0))
159            .optional()?)
160    }
161
162    pub fn set_meta(&self, key: &str, value: &str) -> Result<()> {
163        self.conn.execute(
164            "INSERT OR REPLACE INTO meta(key, value) VALUES (?,?)",
165            params![key, value],
166        )?;
167        Ok(())
168    }
169
170    pub fn get_meta_or(&self, key: &str, default: &str) -> String {
171        self.get_meta(key)
172            .ok()
173            .flatten()
174            .unwrap_or_else(|| default.to_string())
175    }
176
177    // ------------------------------------------------------------------
178    // Chunk CRUD
179    // ------------------------------------------------------------------
180
181    pub fn insert_chunk(&self, c: &ChunkRow) -> Result<()> {
182        self.conn.execute(
183            "INSERT INTO chunks (
184                id, skill_name, seq, content, trigger_desc, anti_trigger_desc,
185                content_hash, token_count, origin, source, maturity, related_ids,
186                protected, state, state_reason, state_updated_at,
187                confidence, confidence_base, confidence_reason, version, distilled_from,
188                distill_provider, distill_model, distill_prompt_version, parent_id,
189                selected_count, used_count, used_success_count,
190                success_trace_ids_count, last_success_at, last_agg_ts,
191                embed_version, created_at, updated_at, last_used_at
192            ) VALUES (
193                ?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,
194                ?13,?14,?15,?16,?17,?18,?19,?20,?21,?22,?23,?24,?25,
195                ?26,?27,?28,?29,?30,?31,?32,?33,?34,?35
196            )",
197            params![
198                c.id,
199                c.skill_name,
200                c.seq,
201                c.content,
202                c.trigger_desc,
203                c.anti_trigger_desc,
204                c.content_hash,
205                c.token_count,
206                c.origin,
207                c.source,
208                c.maturity,
209                c.related_ids,
210                c.protected,
211                c.state,
212                c.state_reason,
213                c.state_updated_at,
214                c.confidence,
215                c.confidence,
216                c.confidence_reason,
217                c.version,
218                c.distilled_from,
219                c.distill_provider,
220                c.distill_model,
221                c.distill_prompt_version,
222                c.parent_id,
223                c.selected_count,
224                c.used_count,
225                c.used_success_count,
226                c.success_trace_ids_count,
227                c.last_success_at,
228                c.last_agg_ts,
229                c.embed_version,
230                c.created_at,
231                c.updated_at,
232                c.last_used_at
233            ],
234        )?;
235        Ok(())
236    }
237
238    pub fn insert_vec_content(&self, chunk_id: &str, emb: &[u8]) -> Result<()> {
239        self.conn.execute(
240            "INSERT OR REPLACE INTO vec_content(chunk_id, embedding) VALUES (?,?)",
241            params![chunk_id, emb],
242        )?;
243        *self.vec_content_cache.borrow_mut() = None;
244        Ok(())
245    }
246
247    pub fn insert_vec_trigger(&self, chunk_id: &str, emb: &[u8]) -> Result<()> {
248        self.conn.execute(
249            "INSERT OR REPLACE INTO vec_trigger(chunk_id, embedding) VALUES (?,?)",
250            params![chunk_id, emb],
251        )?;
252        self.conn.execute(
253            "INSERT INTO meta(key, value) VALUES ('vector_revision', '1')
254             ON CONFLICT(key) DO UPDATE SET value=CAST(value AS INTEGER)+1",
255            [],
256        )?;
257        *self.vec_trigger_cache.borrow_mut() = None;
258        Ok(())
259    }
260
261    pub fn get_chunk(&self, id: &str) -> Result<Option<Value>> {
262        let row = self
263            .conn
264            .query_row("SELECT * FROM chunks WHERE id=?", [id], row_to_json);
265        match row {
266            Ok(v) => Ok(Some(v)),
267            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
268            Err(e) => Err(e.into()),
269        }
270    }
271
272    pub fn update_chunk_state(
273        &self,
274        id: &str,
275        state: &str,
276        reason: Option<&str>,
277        now: &str,
278    ) -> Result<()> {
279        self.conn.execute(
280            "UPDATE chunks SET state=?, state_reason=?, state_updated_at=?, updated_at=? WHERE id=?",
281            params![state, reason, now, now, id],
282        )?;
283        Ok(())
284    }
285
286    pub fn update_chunk_confidence(
287        &self,
288        id: &str,
289        conf: f64,
290        reason: Option<&str>,
291        now: &str,
292    ) -> Result<()> {
293        self.conn.execute(
294            "UPDATE chunks
295             SET confidence=?, confidence_base=?, confidence_reason=?, updated_at=?
296             WHERE id=?",
297            params![conf, conf, reason, now, id],
298        )?;
299        Ok(())
300    }
301
302    pub fn update_chunk_last_used(&self, id: &str, now: &str) -> Result<()> {
303        self.conn.execute(
304            "UPDATE chunks SET last_used_at=?, updated_at=? WHERE id=?",
305            params![now, now, id],
306        )?;
307        Ok(())
308    }
309
310    pub fn get_chunk_by_hash(&self, hash: &str) -> Result<Option<Value>> {
311        let row = self.conn.query_row(
312            "SELECT * FROM chunks WHERE content_hash=? LIMIT 1",
313            [hash],
314            row_to_json,
315        );
316        match row {
317            Ok(v) => Ok(Some(v)),
318            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
319            Err(e) => Err(e.into()),
320        }
321    }
322
323    // ------------------------------------------------------------------
324    // Vector search (pure-Rust cosine similarity, replaces sqlite-vec)
325    // ------------------------------------------------------------------
326
327    pub fn search_vec_content(&self, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
328        self.search_vec(&self.vec_content_cache, "vec_content", query, limit)
329    }
330
331    pub fn search_vec_trigger(&self, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
332        self.search_vec(&self.vec_trigger_cache, "vec_trigger", query, limit)
333    }
334
335    fn search_vec(
336        &self,
337        cache_cell: &VectorCache,
338        table: &str,
339        query: &[f32],
340        limit: usize,
341    ) -> Result<Vec<(String, f32)>> {
342        if limit == 0 {
343            return Ok(Vec::new());
344        }
345        self.refresh_vector_caches_if_changed()?;
346
347        // Populate cache on first access after open or invalidation.
348        if cache_cell.borrow().is_none() {
349            let sql = format!("SELECT chunk_id, embedding FROM {table}");
350            let mut stmt = self.conn.prepare(&sql)?;
351            let entries: Vec<(String, Vec<f32>)> = stmt
352                .query_map([], |r| {
353                    let id: String = r.get(0)?;
354                    let blob: Vec<u8> = r.get(1)?;
355                    Ok((id, blob))
356                })?
357                .filter_map(|r| r.ok())
358                .map(|(id, blob)| (id, unpack_embedding(&blob)))
359                .collect();
360            *cache_cell.borrow_mut() = Some(entries);
361        }
362
363        let cache = cache_cell.borrow();
364        let entries = cache.as_ref().unwrap();
365
366        // Compute similarities, then partial-sort to bring top-limit to the front (O(N log K)).
367        let mut results: Vec<(String, f32)> = entries
368            .iter()
369            .map(|(id, v)| (id.clone(), cosine_similarity(query, v)))
370            .collect();
371        if results.len() > limit {
372            results.select_nth_unstable_by(limit - 1, |a, b| {
373                b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)
374            });
375            results.truncate(limit);
376        }
377        results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
378        Ok(results)
379    }
380
381    fn refresh_vector_caches_if_changed(&self) -> Result<()> {
382        let current = self
383            .get_meta("vector_revision")?
384            .and_then(|value| value.parse::<i64>().ok())
385            .unwrap_or(0);
386        let previous = self.vector_cache_revision.replace(Some(current));
387        if previous.is_some_and(|revision| revision != current) {
388            *self.vec_content_cache.borrow_mut() = None;
389            *self.vec_trigger_cache.borrow_mut() = None;
390        }
391        Ok(())
392    }
393
394    /// Fetch multiple chunks by id in one query; returns a map of id → chunk JSON.
395    pub fn get_chunks_by_ids(&self, ids: &[&str]) -> Result<HashMap<String, Value>> {
396        if ids.is_empty() {
397            return Ok(HashMap::new());
398        }
399        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
400        let sql = format!("SELECT * FROM chunks WHERE id IN ({placeholders})");
401        let mut stmt = self.conn.prepare(&sql)?;
402        let names: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();
403        let rows = stmt.query_map(rusqlite::params_from_iter(ids.iter()), |r| {
404            row_to_json_with_names(r, &names)
405        })?;
406        let mut map = HashMap::with_capacity(ids.len());
407        for row in rows.filter_map(|r| r.ok()) {
408            if let Some(id) = row.get("id").and_then(Value::as_str) {
409                map.insert(id.to_string(), row);
410            }
411        }
412        Ok(map)
413    }
414
415    // ------------------------------------------------------------------
416    // Invalidated hashes
417    // ------------------------------------------------------------------
418
419    pub fn is_hash_invalidated(&self, hash: &str) -> Result<bool> {
420        let count: i64 = self.conn.query_row(
421            "SELECT count(*) FROM invalidated_hashes WHERE content_hash=?",
422            [hash],
423            |r| r.get(0),
424        )?;
425        Ok(count > 0)
426    }
427
428    pub fn insert_invalidated_hash(
429        &self,
430        hash: &str,
431        reason: Option<&str>,
432        ts: &str,
433    ) -> Result<()> {
434        self.conn.execute(
435            "INSERT OR IGNORE INTO invalidated_hashes(content_hash, reason, ts) VALUES (?,?,?)",
436            params![hash, reason, ts],
437        )?;
438        Ok(())
439    }
440
441    // ------------------------------------------------------------------
442    // Usage trace
443    // ------------------------------------------------------------------
444
445    #[allow(clippy::too_many_arguments)]
446    pub fn insert_usage_trace(
447        &self,
448        trace_id: &str,
449        chunk_id: Option<&str>,
450        event: &str,
451        strength: f64,
452        similarity: Option<f64>,
453        refine_mode: Option<&str>,
454        tokens: Option<i64>,
455        rank: Option<i64>,
456        attribution: Option<&str>,
457        source: &str,
458        ts: &str,
459    ) -> Result<usize> {
460        Ok(self.conn.execute(
461            "INSERT OR IGNORE INTO usage_trace
462             (trace_id, chunk_id, event, strength, similarity, refine_mode, tokens, rank, attribution, source, ts)
463             VALUES (?,?,?,?,?,?,?,?,?,?,?)",
464            params![trace_id, chunk_id, event, strength, similarity, refine_mode, tokens, rank, attribution, source, ts],
465        )?)
466    }
467
468    pub fn replace_used_trace(
469        &self,
470        trace_id: &str,
471        used_ids: &[String],
472        strength: f64,
473        attribution: &str,
474        source: &str,
475        ts: &str,
476    ) -> Result<()> {
477        self.conn.execute(
478            "DELETE FROM usage_trace WHERE trace_id=? AND event='used'",
479            [trace_id],
480        )?;
481        for chunk_id in used_ids {
482            self.insert_usage_trace(
483                trace_id,
484                Some(chunk_id),
485                "used",
486                strength,
487                None,
488                None,
489                None,
490                None,
491                Some(attribution),
492                source,
493                ts,
494            )?;
495        }
496        Ok(())
497    }
498
499    pub fn merge_used_trace(
500        &self,
501        trace_id: &str,
502        used_ids: &[String],
503        strength: f64,
504        attribution: &str,
505        source: &str,
506        ts: &str,
507    ) -> Result<()> {
508        let attribution_rank = |value: &str| match value {
509            "explicit" => 3,
510            "cited" => 2,
511            "inferred" => 1,
512            _ => 0,
513        };
514        for chunk_id in used_ids {
515            let existing = self
516                .query_chunks_params(
517                    "SELECT attribution, strength FROM usage_trace
518                     WHERE trace_id=? AND chunk_id=? AND event='used'",
519                    params![trace_id, chunk_id],
520                )?
521                .into_iter()
522                .next();
523            if let Some(row) = existing {
524                let existing_attribution = row
525                    .get("attribution")
526                    .and_then(Value::as_str)
527                    .unwrap_or("inferred");
528                if attribution_rank(attribution) > attribution_rank(existing_attribution) {
529                    self.conn.execute(
530                        "UPDATE usage_trace
531                         SET strength=?, attribution=?, source=?, ts=?
532                         WHERE trace_id=? AND chunk_id=? AND event='used'",
533                        params![
534                            strength,
535                            attribution,
536                            source,
537                            ts,
538                            trace_id,
539                            chunk_id
540                        ],
541                    )?;
542                }
543            } else {
544                self.insert_usage_trace(
545                    trace_id,
546                    Some(chunk_id),
547                    "used",
548                    strength,
549                    None,
550                    None,
551                    None,
552                    None,
553                    Some(attribution),
554                    source,
555                    ts,
556                )?;
557            }
558        }
559        Ok(())
560    }
561
562    pub fn refresh_chunk_last_used(&self, chunk_id: &str, now: &str) -> Result<()> {
563        self.conn.execute(
564            "UPDATE chunks
565             SET last_used_at=COALESCE(
566                   (SELECT MAX(ts) FROM usage_trace
567                    WHERE chunk_id=? AND event='used'),
568                   last_used_base
569                 ),
570                 updated_at=?
571             WHERE id=?",
572            params![chunk_id, now, chunk_id],
573        )?;
574        Ok(())
575    }
576
577    pub fn get_outcome_for_trace(&self, trace_id: &str) -> Result<Option<String>> {
578        let row = self.conn.query_row(
579            "SELECT event FROM usage_trace
580             WHERE trace_id=? AND event IN ('task_ok','task_fail') AND chunk_id IS NULL
581             LIMIT 1",
582            [trace_id],
583            |r| r.get::<_, String>(0),
584        );
585        match row {
586            Ok(v) => Ok(Some(v)),
587            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
588            Err(e) => Err(e.into()),
589        }
590    }
591
592    pub fn purge_usage_trace(&self, before_ts: &str) -> Result<usize> {
593        // Preserve compact attribution facts. They are required to replay corrections.
594        let n = self.conn.execute(
595            "DELETE FROM usage_trace
596             WHERE ts < ?
597             AND event IN ('retrieved','refined')
598             AND NOT (event = 'retrieved'
599                      AND chunk_id IN (SELECT id FROM chunks WHERE origin='spark'))",
600            [before_ts],
601        )?;
602        Ok(n)
603    }
604
605    // ------------------------------------------------------------------
606    // Episodic log
607    // ------------------------------------------------------------------
608
609    pub fn upsert_episodic_log(&self, log: &EpisodicLogRow) -> Result<()> {
610        self.conn.execute(
611            "INSERT OR REPLACE INTO episodic_log
612             (id, trace_id, lib_id, ts, query, recall_snapshot, output,
613              output_summary, outcome, event_source, task_state, completed_at,
614              usage_state, used_ids, used_attribution, used_complete, context_key, nomination, priority,
615              distill_state, distill_note, distill_attempts, distill_last_failed_at)
616             VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,?18,?19,?20,?21,0,NULL)",
617            params![
618                log.id,
619                log.trace_id,
620                log.lib_id,
621                log.ts,
622                log.query,
623                log.recall_snapshot,
624                log.output,
625                log.output_summary,
626                log.outcome,
627                log.event_source,
628                log.task_state,
629                log.completed_at,
630                log.usage_state,
631                log.used_ids,
632                log.used_attribution,
633                i64::from(log.used_complete),
634                log.context_key,
635                log.nomination,
636                log.priority,
637                log.distill_state,
638                log.distill_note
639            ],
640        )?;
641        Ok(())
642    }
643
644    pub fn get_episodic_log(&self, trace_id: &str) -> Result<Option<Value>> {
645        let row = self.conn.query_row(
646            "SELECT * FROM episodic_log WHERE trace_id=?",
647            [trace_id],
648            row_to_json,
649        );
650        match row {
651            Ok(v) => Ok(Some(v)),
652            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
653            Err(e) => Err(e.into()),
654        }
655    }
656
657    pub fn update_episodic_log_state(
658        &self,
659        trace_id: &str,
660        state: &str,
661        note: Option<&str>,
662        outcome: Option<&str>,
663    ) -> Result<()> {
664        self.conn.execute(
665            "UPDATE episodic_log
666             SET distill_state=?, distill_note=COALESCE(?,distill_note),
667                 outcome=COALESCE(?,outcome),
668                 distill_run_id=NULL, distill_locked_at=NULL
669             WHERE trace_id=?",
670            params![state, note, outcome, trace_id],
671        )?;
672        Ok(())
673    }
674
675    /// Patch content fields on an existing episodic_log row (補写: output_summary, nomination, etc.)
676    pub fn patch_episodic_log_content(
677        &self,
678        trace_id: &str,
679        query: Option<&str>,
680        output: Option<&str>,
681        output_summary: Option<&str>,
682        nomination: Option<&str>,
683        priority: i64,
684    ) -> Result<()> {
685        self.conn.execute(
686            "UPDATE episodic_log
687             SET output_summary = COALESCE(?, output_summary),
688                 nomination     = COALESCE(?, nomination),
689                 output         = COALESCE(?, output),
690                 query          = COALESCE(?, query),
691                 priority       = MAX(priority, ?)
692             WHERE trace_id = ?",
693            params![
694                output_summary,
695                nomination,
696                output,
697                query,
698                priority,
699                trace_id
700            ],
701        )?;
702        Ok(())
703    }
704
705    #[allow(clippy::too_many_arguments)]
706    pub fn update_trace_lifecycle(
707        &self,
708        trace_id: &str,
709        task_state: &str,
710        completed_at: Option<&str>,
711        usage_state: Option<&str>,
712        used_ids: Option<&str>,
713        used_attribution: Option<&str>,
714        used_complete: Option<bool>,
715    ) -> Result<()> {
716        self.conn.execute(
717            "UPDATE episodic_log
718             SET task_state=?,
719                 completed_at=COALESCE(?, completed_at),
720                 usage_state=COALESCE(?, usage_state),
721                 used_ids=COALESCE(?, used_ids),
722                 used_attribution=COALESCE(?, used_attribution),
723                 used_complete=COALESCE(?, used_complete)
724             WHERE trace_id=?",
725            params![
726                task_state,
727                completed_at,
728                usage_state,
729                used_ids,
730                used_attribution,
731                used_complete.map(i64::from),
732                trace_id
733            ],
734        )?;
735        Ok(())
736    }
737
738    #[allow(clippy::too_many_arguments)]
739    pub fn upsert_confidence_evidence(
740        &self,
741        id: &str,
742        trace_id: Option<&str>,
743        chunk_id: &str,
744        kind: &str,
745        target: f64,
746        alpha: f64,
747        reason: &str,
748        context_key: Option<&str>,
749        ts: &str,
750    ) -> Result<()> {
751        self.conn.execute(
752            "INSERT INTO confidence_evidence
753             (id, trace_id, chunk_id, kind, target, alpha, reason, context_key, ts)
754             VALUES (?,?,?,?,?,?,?,?,?)
755             ON CONFLICT(trace_id, chunk_id, kind) WHERE trace_id IS NOT NULL
756             DO UPDATE SET target=excluded.target, alpha=excluded.alpha,
757                           reason=excluded.reason, context_key=excluded.context_key",
758            params![id, trace_id, chunk_id, kind, target, alpha, reason, context_key, ts],
759        )?;
760        Ok(())
761    }
762
763    pub fn delete_trace_confidence_evidence(&self, trace_id: &str, kinds: &[&str]) -> Result<()> {
764        for kind in kinds {
765            self.conn.execute(
766                "DELETE FROM confidence_evidence WHERE trace_id=? AND kind=?",
767                params![trace_id, kind],
768            )?;
769        }
770        Ok(())
771    }
772
773    pub fn delete_chunk_trace_confidence_evidence(
774        &self,
775        trace_id: &str,
776        chunk_id: &str,
777        kind: &str,
778    ) -> Result<()> {
779        self.conn.execute(
780            "DELETE FROM confidence_evidence
781             WHERE trace_id=? AND chunk_id=? AND kind=?",
782            params![trace_id, chunk_id, kind],
783        )?;
784        Ok(())
785    }
786
787    pub fn confidence_evidence_for_chunk(&self, chunk_id: &str) -> Result<Vec<Value>> {
788        self.query_json(
789            "SELECT target, alpha, reason, ts, id
790             FROM confidence_evidence WHERE chunk_id=?
791             ORDER BY ts ASC,
792                      CASE kind
793                        WHEN 'outcome_ok' THEN 1
794                        WHEN 'outcome_fail' THEN 1
795                        WHEN 'selected_unused' THEN 2
796                        WHEN 'feedback_up' THEN 3
797                        WHEN 'feedback_down' THEN 3
798                        WHEN 'decay' THEN 4
799                        ELSE 5
800                      END ASC,
801                      kind ASC, id ASC",
802            [chunk_id],
803        )
804    }
805
806    #[allow(clippy::too_many_arguments)]
807    pub fn insert_feedback_event(
808        &self,
809        id: &str,
810        trace_id: &str,
811        chunk_id: &str,
812        signal: &str,
813        strength: f64,
814        source: &str,
815        actor: Option<&str>,
816        reason: Option<&str>,
817        context_key: Option<&str>,
818        ts: &str,
819    ) -> Result<usize> {
820        Ok(self.conn.execute(
821            "INSERT OR IGNORE INTO feedback_events
822             (id, trace_id, chunk_id, signal, strength, source, actor, reason, context_key, ts)
823             VALUES (?,?,?,?,?,?,?,?,?,?)",
824            params![
825                id,
826                trace_id,
827                chunk_id,
828                signal,
829                strength,
830                source,
831                actor,
832                reason,
833                context_key,
834                ts
835            ],
836        )?)
837    }
838
839    pub fn delete_feedback_event(
840        &self,
841        trace_id: &str,
842        chunk_id: &str,
843        signal: &str,
844    ) -> Result<usize> {
845        Ok(self.conn.execute(
846            "DELETE FROM feedback_events
847             WHERE trace_id=? AND chunk_id=? AND signal=?",
848            params![trace_id, chunk_id, signal],
849        )?)
850    }
851
852    pub fn update_chunk_last_decayed_at(&self, id: &str, now: &str) -> Result<()> {
853        self.conn.execute(
854            "UPDATE chunks SET last_decayed_at=?, updated_at=? WHERE id=?",
855            params![now, now, id],
856        )?;
857        Ok(())
858    }
859
860    #[allow(clippy::too_many_arguments)]
861    pub fn update_context_stat(
862        &self,
863        chunk_id: &str,
864        context_key: &str,
865        success: i64,
866        failure: i64,
867        positive: i64,
868        negative: i64,
869        now: &str,
870    ) -> Result<()> {
871        self.conn.execute(
872            "INSERT INTO chunk_context_stats
873             (chunk_id, context_key, success_count, failure_count,
874              positive_feedback, negative_feedback, last_updated_at)
875             VALUES (?,?,?,?,?,?,?)
876             ON CONFLICT(chunk_id, context_key) DO UPDATE SET
877               success_count=success_count+excluded.success_count,
878               failure_count=failure_count+excluded.failure_count,
879               positive_feedback=positive_feedback+excluded.positive_feedback,
880               negative_feedback=negative_feedback+excluded.negative_feedback,
881               last_updated_at=excluded.last_updated_at",
882            params![
883                chunk_id,
884                context_key,
885                success,
886                failure,
887                positive,
888                negative,
889                now
890            ],
891        )?;
892        Ok(())
893    }
894
895    pub fn context_score(&self, chunk_id: &str, context_key: &str) -> Result<f64> {
896        let row = self
897            .conn
898            .query_row(
899                "SELECT success_count, failure_count, positive_feedback, negative_feedback
900                 FROM chunk_context_stats WHERE chunk_id=? AND context_key=?",
901                params![chunk_id, context_key],
902                |row| {
903                    Ok((
904                        row.get::<_, i64>(0)?,
905                        row.get::<_, i64>(1)?,
906                        row.get::<_, i64>(2)?,
907                        row.get::<_, i64>(3)?,
908                    ))
909                },
910            )
911            .optional()?;
912        let Some((success, failure, positive, negative)) = row else {
913            return Ok(0.0);
914        };
915        let wins = success as f64 + positive as f64 * 2.0;
916        let losses = failure as f64 + negative as f64 * 2.0;
917        let evidence = wins + losses;
918        let posterior = (wins + 1.0) / (evidence + 2.0);
919        let evidence_weight = (evidence / 5.0).min(1.0);
920        Ok((posterior - 0.5) * 2.0 * evidence_weight)
921    }
922
923    #[allow(clippy::too_many_arguments)]
924    pub fn upsert_governance_proposal(
925        &self,
926        id: &str,
927        chunk_id: &str,
928        proposal_type: &str,
929        reason: &str,
930        evidence_count: i64,
931        evidence_score: f64,
932        actor_count: i64,
933        now: &str,
934    ) -> Result<()> {
935        self.conn.execute(
936            "INSERT INTO governance_proposals
937             (id, chunk_id, proposal_type, reason, evidence_count,
938              evidence_score, actor_count, state, created_at, updated_at)
939             VALUES (?,?,?,?,?,?,?,'pending',?,?)
940             ON CONFLICT(chunk_id, proposal_type) WHERE state='pending'
941             DO UPDATE SET reason=excluded.reason,
942                           evidence_count=excluded.evidence_count,
943                           evidence_score=excluded.evidence_score,
944                           actor_count=excluded.actor_count,
945                           updated_at=excluded.updated_at",
946            params![
947                id,
948                chunk_id,
949                proposal_type,
950                reason,
951                evidence_count,
952                evidence_score,
953                actor_count,
954                now,
955                now
956            ],
957        )?;
958        Ok(())
959    }
960
961    pub fn request_evolve(&self, id: &str, reason: &str, now: &str) -> Result<()> {
962        let priority = match reason {
963            "governance_ready" => 100,
964            "governance" => 80,
965            "threshold" => 60,
966            "batch_continue" => 40,
967            _ => 20,
968        };
969        self.conn.execute(
970            "INSERT INTO evolve_requests(id, reason, state, requested_at, priority)
971             VALUES (?,?,'pending',?,?)
972             ON CONFLICT(reason) WHERE state='pending'
973             DO UPDATE SET priority=MAX(priority, excluded.priority)",
974            params![id, reason, now, priority],
975        )?;
976        Ok(())
977    }
978
979    pub fn claim_evolve_request(&self, now: &str, stale_before: &str) -> Result<Option<String>> {
980        self.conn.execute(
981            "UPDATE evolve_requests
982             SET state='pending', leased_at=NULL, note='lease_recovered'
983             WHERE state='running' AND leased_at < ?",
984            [stale_before],
985        )?;
986        self.conn.execute(
987            "UPDATE evolve_requests
988             SET state='pending', leased_at=NULL, note='retry_failed'
989             WHERE state='failed' AND attempts < 3
990               AND COALESCE(next_retry_at, completed_at) < ?",
991            [now],
992        )?;
993        Ok(self
994            .conn
995            .query_row(
996                "UPDATE evolve_requests
997                 SET state='running', leased_at=?, attempts=attempts+1
998                 WHERE id=(
999                   SELECT id FROM evolve_requests
1000                   WHERE state='pending' ORDER BY priority DESC, requested_at ASC LIMIT 1
1001                 ) AND state='pending'
1002                 RETURNING id",
1003                [now],
1004                |row| row.get(0),
1005            )
1006            .optional()?)
1007    }
1008
1009    pub fn finish_evolve_request(
1010        &self,
1011        id: &str,
1012        state: &str,
1013        note: Option<&str>,
1014        now: &str,
1015    ) -> Result<()> {
1016        self.conn.execute(
1017            "UPDATE evolve_requests
1018             SET state=?, completed_at=?, note=?,
1019                 last_failed_at=CASE
1020                   WHEN ?='failed' THEN ?
1021                   ELSE last_failed_at
1022                 END,
1023                 next_retry_at=CASE WHEN ?='failed'
1024                   THEN strftime('%Y-%m-%dT%H:%M:%fZ', ?, '+5 minutes')
1025                   ELSE NULL END
1026             WHERE id=?",
1027            params![state, now, note, state, now, state, now, id],
1028        )?;
1029        Ok(())
1030    }
1031
1032    pub fn finish_covered_evolve_requests(&self, requested_before: &str, now: &str) -> Result<()> {
1033        self.conn.execute(
1034            "UPDATE evolve_requests
1035             SET state='completed', completed_at=?, note='covered_by_evolve', next_retry_at=NULL
1036             WHERE state='pending' AND requested_at <= ?",
1037            params![now, requested_before],
1038        )?;
1039        Ok(())
1040    }
1041
1042    /// Update by primary-key id (used after distill where we have the row id, not trace_id).
1043    pub fn update_episodic_log_state_by_id(
1044        &self,
1045        id: &str,
1046        state: &str,
1047        note: Option<&str>,
1048        outcome: Option<&str>,
1049    ) -> Result<()> {
1050        self.conn.execute(
1051            "UPDATE episodic_log
1052             SET distill_state=?, distill_note=COALESCE(?,distill_note),
1053                 outcome=COALESCE(?,outcome),
1054                 distill_run_id=NULL, distill_locked_at=NULL
1055             WHERE id=?",
1056            params![state, note, outcome, id],
1057        )?;
1058        Ok(())
1059    }
1060
1061    #[allow(clippy::too_many_arguments)]
1062    pub fn finish_distill_log(
1063        &self,
1064        id: &str,
1065        state: &str,
1066        note: Option<&str>,
1067        prompt_tokens: i64,
1068        completion_tokens: i64,
1069        accounted_at: &str,
1070    ) -> Result<()> {
1071        self.conn.execute(
1072            "UPDATE episodic_log
1073             SET distill_state=?, distill_note=?,
1074                 distill_prompt_tokens=?, distill_completion_tokens=?,
1075                 distill_accounted_at=?,
1076                 distill_attempts=distill_attempts
1077                   + CASE WHEN ?='failed' THEN 1 ELSE 0 END,
1078                 distill_last_failed_at=CASE
1079                   WHEN ?='failed' THEN ?
1080                   ELSE distill_last_failed_at
1081                 END,
1082                 distill_run_id=NULL, distill_locked_at=NULL
1083             WHERE id=?",
1084            params![
1085                state,
1086                note,
1087                prompt_tokens,
1088                completion_tokens,
1089                accounted_at,
1090                state,
1091                state,
1092                accounted_at,
1093                id
1094            ],
1095        )?;
1096        Ok(())
1097    }
1098
1099    /// Claim a batch of 'new' logs for distillation: mark them 'screening' atomically.
1100    /// Returns the claimed rows (with distill_run_id set to run_id).
1101    pub fn claim_distill_batch(
1102        &self,
1103        run_id: &str,
1104        limit: usize,
1105        locked_at: &str,
1106    ) -> Result<Vec<Value>> {
1107        // BEGIN IMMEDIATE must be held by caller; this is called inside a transaction.
1108        self.conn.execute(
1109            "UPDATE episodic_log
1110             SET distill_state='screening', distill_run_id=?, distill_locked_at=?
1111             WHERE id IN (
1112               SELECT id FROM episodic_log
1113               WHERE distill_state='new'
1114               ORDER BY priority DESC, ts ASC
1115               LIMIT ?
1116             )",
1117            params![run_id, locked_at, limit as i64],
1118        )?;
1119        self.query_json(
1120            "SELECT * FROM episodic_log WHERE distill_run_id=? AND distill_state='screening'",
1121            params![run_id],
1122        )
1123    }
1124
1125    pub fn query_episodic_logs_open(&self, limit: usize) -> Result<Vec<Value>> {
1126        self.query_json(
1127            "SELECT * FROM episodic_log WHERE distill_state='new' ORDER BY priority DESC, ts ASC LIMIT ?",
1128            params![limit as i64],
1129        )
1130    }
1131
1132    // ------------------------------------------------------------------
1133    // Chunk queries (aggregate / curate helpers)
1134    // ------------------------------------------------------------------
1135
1136    pub fn query_chunks(&self, sql: &str) -> Result<Vec<Value>> {
1137        self.query_json(sql, params![])
1138    }
1139
1140    pub fn query_chunks_params<P: rusqlite::Params>(&self, sql: &str, p: P) -> Result<Vec<Value>> {
1141        self.query_json(sql, p)
1142    }
1143
1144    // ------------------------------------------------------------------
1145    // Deps
1146    // ------------------------------------------------------------------
1147
1148    pub fn get_deps(&self, chunk_id: &str) -> Result<Vec<(String, String, Option<String>)>> {
1149        let mut stmt = self
1150            .conn
1151            .prepare("SELECT dst, kind, dst_lib FROM deps WHERE src=?")?;
1152        let rows = stmt.query_map([chunk_id], |r| {
1153            Ok((
1154                r.get::<_, String>(0)?,
1155                r.get::<_, String>(1)?,
1156                r.get::<_, Option<String>>(2)?,
1157            ))
1158        })?;
1159        Ok(rows.filter_map(|r| r.ok()).collect())
1160    }
1161
1162    pub fn get_reverse_deps(&self, chunk_id: &str) -> Result<Vec<String>> {
1163        let mut stmt = self.conn.prepare("SELECT src FROM deps WHERE dst=?")?;
1164        let rows = stmt.query_map([chunk_id], |r| r.get::<_, String>(0))?;
1165        Ok(rows.filter_map(|r| r.ok()).collect())
1166    }
1167
1168    pub fn insert_dep(
1169        &self,
1170        src: &str,
1171        dst: &str,
1172        kind: &str,
1173        dst_lib: Option<&str>,
1174    ) -> Result<()> {
1175        self.conn.execute(
1176            "INSERT OR IGNORE INTO deps(src,dst,kind,dst_lib) VALUES (?,?,?,?)",
1177            params![src, dst, kind, dst_lib],
1178        )?;
1179        Ok(())
1180    }
1181
1182    // ------------------------------------------------------------------
1183    // Chunk success traces (aggregate fact table)
1184    // ------------------------------------------------------------------
1185
1186    pub fn upsert_chunk_success_trace(
1187        &self,
1188        chunk_id: &str,
1189        trace_id: &str,
1190        ts: &str,
1191    ) -> Result<()> {
1192        self.conn.execute(
1193            "INSERT OR IGNORE INTO chunk_success_traces(chunk_id, trace_id, ts) VALUES (?,?,?)",
1194            params![chunk_id, trace_id, ts],
1195        )?;
1196        Ok(())
1197    }
1198
1199    // ------------------------------------------------------------------
1200    // Shared library support
1201    // ------------------------------------------------------------------
1202
1203    pub fn attach_shared(&self, path: &str, alias: &str) -> Result<()> {
1204        self.conn.execute_batch(&format!(
1205            "ATTACH DATABASE '{}' AS '{alias}'",
1206            path.replace('\'', "''")
1207        ))?;
1208        Ok(())
1209    }
1210
1211    pub fn lib_id(&self) -> Result<String> {
1212        Ok(self
1213            .get_meta("lib_id")?
1214            .unwrap_or_else(|| "unknown".to_string()))
1215    }
1216
1217    // ------------------------------------------------------------------
1218    // Generic helpers
1219    // ------------------------------------------------------------------
1220
1221    fn query_json<P: rusqlite::Params>(&self, sql: &str, p: P) -> Result<Vec<Value>> {
1222        let mut stmt = self.conn.prepare(sql)?;
1223        let names: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();
1224        let rows = stmt.query_map(p, |r| row_to_json_with_names(r, &names))?;
1225        Ok(rows.filter_map(|r| r.ok()).collect())
1226    }
1227
1228    pub fn execute(&self, sql: &str) -> Result<()> {
1229        self.conn.execute_batch(sql)?;
1230        Ok(())
1231    }
1232
1233    /// Execute a parameterised statement (not batch); returns rows-affected count.
1234    pub fn conn_execute<P: rusqlite::Params>(&self, sql: &str, p: P) -> Result<()> {
1235        self.conn.execute(sql, p)?;
1236        Ok(())
1237    }
1238}
1239
1240// ------------------------------------------------------------------
1241// Row types
1242// ------------------------------------------------------------------
1243
/// Owned, typed copy of a chunk record.
///
/// NOTE(review): field names appear to mirror columns of the `chunks` table, but
/// not every column is represented. For example, `last_decayed_at` (written by
/// `Storage::update_chunk_last_decayed_at`) has no field here. Confirm against
/// `schema.sql`.
#[derive(Debug, Default, Clone)]
pub struct ChunkRow {
    pub id: String,
    pub skill_name: Option<String>,
    pub seq: i64,
    pub content: String,
    pub trigger_desc: Option<String>,
    pub anti_trigger_desc: Option<String>,
    pub content_hash: String,
    pub token_count: Option<i64>,
    pub origin: String,
    pub source: Option<String>,
    pub maturity: Option<String>,
    /// NOTE(review): presumably a serialized list of chunk ids — confirm format.
    pub related_ids: Option<String>,
    /// Stored as an integer; presumably a 0/1 flag — TODO confirm.
    pub protected: i64,
    pub state: String,
    pub state_reason: Option<String>,
    pub state_updated_at: Option<String>,
    pub confidence: f64,
    pub confidence_reason: Option<String>,
    pub version: i64,
    pub distilled_from: Option<String>,
    pub distill_provider: Option<String>,
    pub distill_model: Option<String>,
    pub distill_prompt_version: Option<String>,
    pub parent_id: Option<String>,
    pub selected_count: i64,
    pub used_count: i64,
    pub used_success_count: i64,
    pub success_trace_ids_count: i64,
    pub last_success_at: Option<String>,
    pub last_agg_ts: Option<String>,
    pub embed_version: i64,
    pub created_at: String,
    pub updated_at: String,
    pub last_used_at: Option<String>,
}
1281
/// Owned, typed copy of an `episodic_log` record.
///
/// NOTE(review): covers only a subset of the table's columns. The claim columns
/// `distill_run_id`/`distill_locked_at` and the distill token-accounting columns
/// written elsewhere in this module have no fields here. Confirm against
/// `schema.sql`.
#[derive(Debug, Default)]
pub struct EpisodicLogRow {
    pub id: String,
    pub trace_id: String,
    pub lib_id: String,
    pub ts: String,
    pub query: Option<String>,
    pub recall_snapshot: Option<String>,
    pub output: Option<String>,
    pub output_summary: Option<String>,
    pub outcome: Option<String>,
    pub event_source: String,
    pub task_state: String,
    pub completed_at: Option<String>,
    pub usage_state: String,
    pub used_ids: Option<String>,
    pub used_attribution: Option<String>,
    pub used_complete: bool,
    pub context_key: Option<String>,
    pub nomination: Option<String>,
    /// Higher values are claimed first when selecting logs for distillation.
    pub priority: i64,
    /// Distillation lifecycle state; values used in this module include
    /// `new`, `screening` and `failed`.
    pub distill_state: String,
    pub distill_note: Option<String>,
}
1306
1307// ------------------------------------------------------------------
1308// Helpers
1309// ------------------------------------------------------------------
1310
1311fn configure_pragmas(conn: &Connection) -> Result<()> {
1312    conn.execute_batch(
1313        "PRAGMA journal_mode=WAL;
1314         PRAGMA foreign_keys=ON;
1315         PRAGMA synchronous=NORMAL;
1316         PRAGMA cache_size=-65536;
1317         PRAGMA mmap_size=268435456;
1318         PRAGMA temp_store=memory;",
1319    )?;
1320    // Validate WAL mode was accepted (some VFS/filesystems silently downgrade).
1321    let mode: String = conn.query_row("PRAGMA journal_mode", [], |r| r.get(0))?;
1322    if mode != "wal" {
1323        return Err(crate::errors::InnateError::Other(format!(
1324            "WAL mode required but got '{mode}'; check filesystem support"
1325        )));
1326    }
1327    Ok(())
1328}
1329
/// Parse a dotted version string into a `(major, minor, patch)` tuple.
///
/// Components are positional. Each of the first three `.`-separated parts
/// contributes its leading ASCII digits, so `"13-beta"` gives `13`. A part with
/// no leading digits, or one that overflows `u32`, counts as `0`. Missing parts
/// are `0`, and anything after the third part is ignored. An unparsable part
/// never shifts later parts into its slot: `"4.x.13"` is `(4, 0, 13)`.
/// Surrounding whitespace is ignored.
fn ver_tuple(v: &str) -> (u32, u32, u32) {
    let mut components = v.trim().split('.').map(|part| {
        let digits_end = part
            .find(|c: char| !c.is_ascii_digit())
            .unwrap_or(part.len());
        part[..digits_end].parse::<u32>().unwrap_or(0)
    });
    let mut next = || components.next().unwrap_or(0);
    let major = next();
    let minor = next();
    let patch = next();
    (major, minor, patch)
}
1338
1339/// Convert a rusqlite Row to serde_json::Value using column names from statement.
1340fn row_to_json_with_names(row: &Row, names: &[String]) -> rusqlite::Result<Value> {
1341    let mut map = serde_json::Map::new();
1342    for (i, name) in names.iter().enumerate() {
1343        let v = row_value_at(row, i);
1344        map.insert(name.clone(), v);
1345    }
1346    Ok(Value::Object(map))
1347}
1348
1349fn row_to_json(row: &Row) -> rusqlite::Result<Value> {
1350    let count = row.as_ref().column_count();
1351    let mut map = serde_json::Map::new();
1352    for i in 0..count {
1353        let name = row.as_ref().column_name(i)?.to_string();
1354        let v = row_value_at(row, i);
1355        map.insert(name, v);
1356    }
1357    Ok(Value::Object(map))
1358}
1359
1360fn row_value_at(row: &Row, i: usize) -> Value {
1361    // Try types in preference order
1362    if let Ok(v) = row.get::<_, Option<String>>(i) {
1363        return v.map(Value::String).unwrap_or(Value::Null);
1364    }
1365    if let Ok(v) = row.get::<_, Option<i64>>(i) {
1366        return v.map(|n| Value::Number(n.into())).unwrap_or(Value::Null);
1367    }
1368    if let Ok(v) = row.get::<_, Option<f64>>(i) {
1369        return v
1370            .and_then(serde_json::Number::from_f64)
1371            .map(Value::Number)
1372            .unwrap_or(Value::Null);
1373    }
1374    Value::Null
1375}
1376
1377trait OptionalExt<T> {
1378    fn optional(self) -> rusqlite::Result<Option<T>>;
1379}
1380impl<T> OptionalExt<T> for rusqlite::Result<T> {
1381    fn optional(self) -> rusqlite::Result<Option<T>> {
1382        match self {
1383            Ok(v) => Ok(Some(v)),
1384            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1385            Err(e) => Err(e),
1386        }
1387    }
1388}