Skip to main content

innate_core/
storage.rs

1//! SQLite storage layer.
2//!
3//! Replaces sqlite-vec virtual tables with ordinary BLOB columns + pure-Rust
4//! cosine similarity, keeping the schema otherwise aligned with v4.5.x.
5
6use std::cell::{Cell, RefCell};
7use std::collections::HashMap;
8use std::path::{Path, PathBuf};
9
10use rusqlite::{params, Connection, Row};
11use serde_json::Value;
12
13use crate::errors::{InnateError, Result};
14use crate::utils::{cosine_similarity, unpack_embedding};
15
/// Schema version this build expects to find under the `schema_version` key of the
/// `meta` table; `Storage::init_schema` compares the stored value against it.
const EXPECTED_SCHEMA_VERSION: &str = "4.14";

// Embedded SQL schema — no external files needed.
const SCHEMA_SQL: &str = include_str!("schema.sql");

/// Decoded contents of one vector table: `(chunk_id, embedding)` pairs.
type VectorEntries = Vec<(String, Vec<f32>)>;
/// Interior-mutable slot for the decoded vectors; `None` means the cache is cold.
type VectorCache = RefCell<Option<VectorEntries>>;
23
/// Handle to one SQLite database file plus the in-memory caches used by vector search.
pub struct Storage {
    /// Path of the database file this handle was opened on.
    pub db_path: PathBuf,
    /// Underlying SQLite connection; every query issued by this type goes through it.
    conn: Connection,
    /// Content-embedding dimension supplied to `open` (`open_readonly` fixes it to 1024).
    pub content_dim: usize,
    /// Trigger-embedding dimension supplied to `open` (`open_readonly` fixes it to 256).
    pub trigger_dim: usize,
    /// Pre-parsed in-memory caches for vector search; None = cold (not loaded or invalidated).
    vec_content_cache: VectorCache,
    vec_trigger_cache: VectorCache,
    /// Last observed vector revision. Only vector writes advance this value.
    vector_cache_revision: Cell<Option<i64>>,
}
35
/// Identifier and reason of a claimed evolve request.
///
/// NOTE(review): the code that produces and consumes this type is outside the visible
/// part of the file — confirm the exact meaning of both fields there.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EvolveRequestClaim {
    /// Identifier of the claimed request.
    pub id: String,
    /// Reason recorded for the request.
    pub reason: String,
}
41
42impl Storage {
43    pub fn open(db_path: impl AsRef<Path>, content_dim: usize, trigger_dim: usize) -> Result<Self> {
44        let db_path = db_path.as_ref().to_path_buf();
45        if let Some(parent) = db_path.parent() {
46            std::fs::create_dir_all(parent)?;
47        }
48        let conn = Connection::open(&db_path)?;
49        configure_pragmas(&conn)?;
50        let mut s = Self {
51            db_path,
52            conn,
53            content_dim,
54            trigger_dim,
55            vec_content_cache: RefCell::new(None),
56            vec_trigger_cache: RefCell::new(None),
57            vector_cache_revision: Cell::new(None),
58        };
59        s.init_schema()?;
60        Ok(s)
61    }
62
63    pub fn open_readonly(db_path: impl AsRef<Path>) -> Result<Self> {
64        let db_path = db_path.as_ref().to_path_buf();
65        let conn = Connection::open_with_flags(
66            &db_path,
67            rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
68        )?;
69        conn.pragma_update(None, "query_only", "ON")?;
70        conn.pragma_update(None, "foreign_keys", "ON")?;
71        let s = Self {
72            db_path,
73            conn,
74            content_dim: 1024,
75            trigger_dim: 256,
76            vec_content_cache: RefCell::new(None),
77            vec_trigger_cache: RefCell::new(None),
78            vector_cache_revision: Cell::new(None),
79        };
80        Ok(s)
81    }
82
83    fn init_schema(&mut self) -> Result<()> {
84        let has_meta: bool = self.conn.query_row(
85            "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='meta'",
86            [],
87            |r| r.get::<_, i64>(0),
88        )? > 0;
89
90        if !has_meta {
91            // Wrap schema creation in a transaction for atomicity.
92            self.conn.execute_batch("BEGIN IMMEDIATE")?;
93            let r = self.conn.execute_batch(SCHEMA_SQL);
94            if r.is_ok() {
95                self.conn.execute_batch("COMMIT")?;
96            } else {
97                let _ = self.conn.execute_batch("ROLLBACK");
98                r?;
99            }
100            return Ok(());
101        }
102
103        let current: Option<String> = self
104            .conn
105            .query_row(
106                "SELECT value FROM meta WHERE key='schema_version'",
107                [],
108                |r| r.get(0),
109            )
110            .optional()?;
111
112        let current = current
113            .ok_or_else(|| InnateError::Other("meta table missing schema_version".into()))?;
114
115        let cur = ver_tuple(&current);
116        let exp = ver_tuple(EXPECTED_SCHEMA_VERSION);
117
118        match cur.cmp(&exp) {
119            std::cmp::Ordering::Equal => Ok(()),
120            std::cmp::Ordering::Greater => {
121                // Forward-compat: newer schema, warn but allow.
122                eprintln!(
123                    "[innate] warning: db schema {current} > expected {EXPECTED_SCHEMA_VERSION}"
124                );
125                Ok(())
126            }
127            std::cmp::Ordering::Less => {
128                // Delegate to the proper migration chain which handles all steps atomically.
129                let applied = crate::migrate::run_migrations(&self.db_path)?;
130                if !applied.is_empty() {
131                    eprintln!("[innate] auto-migrated: {}", applied.join(", "));
132                }
133                Ok(())
134            }
135        }
136    }
137
138    // ------------------------------------------------------------------
139    // Transactions
140    // ------------------------------------------------------------------
141
142    pub fn begin_immediate(&self) -> Result<()> {
143        self.conn.execute_batch("BEGIN IMMEDIATE")?;
144        Ok(())
145    }
146
147    pub fn commit(&self) -> Result<()> {
148        self.conn.execute_batch("COMMIT")?;
149        Ok(())
150    }
151
152    pub fn rollback(&self) -> Result<()> {
153        self.conn.execute_batch("ROLLBACK")?;
154        Ok(())
155    }
156
157    // ------------------------------------------------------------------
158    // Meta
159    // ------------------------------------------------------------------
160
161    pub fn get_meta(&self, key: &str) -> Result<Option<String>> {
162        Ok(self
163            .conn
164            .query_row("SELECT value FROM meta WHERE key=?", [key], |r| r.get(0))
165            .optional()?)
166    }
167
168    pub fn set_meta(&self, key: &str, value: &str) -> Result<()> {
169        self.conn.execute(
170            "INSERT OR REPLACE INTO meta(key, value) VALUES (?,?)",
171            params![key, value],
172        )?;
173        Ok(())
174    }
175
176    pub fn get_meta_or(&self, key: &str, default: &str) -> String {
177        self.get_meta(key)
178            .ok()
179            .flatten()
180            .unwrap_or_else(|| default.to_string())
181    }
182
183    // ------------------------------------------------------------------
184    // Chunk CRUD
185    // ------------------------------------------------------------------
186
187    pub fn insert_chunk(&self, c: &ChunkRow) -> Result<()> {
188        self.conn.execute(
189            "INSERT INTO chunks (
190                id, skill_name, seq, content, trigger_desc, anti_trigger_desc,
191                content_hash, token_count, origin, source, maturity, related_ids,
192                protected, state, state_reason, state_updated_at,
193                confidence, confidence_base, confidence_reason, version, distilled_from,
194                distill_provider, distill_model, distill_prompt_version, parent_id,
195                selected_count, used_count, used_success_count,
196                success_trace_ids_count, last_success_at, last_agg_ts,
197                embed_version, created_at, updated_at, last_used_at
198            ) VALUES (
199                ?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,
200                ?13,?14,?15,?16,?17,?18,?19,?20,?21,?22,?23,?24,?25,
201                ?26,?27,?28,?29,?30,?31,?32,?33,?34,?35
202            )",
203            params![
204                c.id,
205                c.skill_name,
206                c.seq,
207                c.content,
208                c.trigger_desc,
209                c.anti_trigger_desc,
210                c.content_hash,
211                c.token_count,
212                c.origin,
213                c.source,
214                c.maturity,
215                c.related_ids,
216                c.protected,
217                c.state,
218                c.state_reason,
219                c.state_updated_at,
220                c.confidence,
221                c.confidence,
222                c.confidence_reason,
223                c.version,
224                c.distilled_from,
225                c.distill_provider,
226                c.distill_model,
227                c.distill_prompt_version,
228                c.parent_id,
229                c.selected_count,
230                c.used_count,
231                c.used_success_count,
232                c.success_trace_ids_count,
233                c.last_success_at,
234                c.last_agg_ts,
235                c.embed_version,
236                c.created_at,
237                c.updated_at,
238                c.last_used_at
239            ],
240        )?;
241        Ok(())
242    }
243
244    pub fn insert_vec_content(&self, chunk_id: &str, emb: &[u8]) -> Result<()> {
245        self.conn.execute(
246            "INSERT OR REPLACE INTO vec_content(chunk_id, embedding) VALUES (?,?)",
247            params![chunk_id, emb],
248        )?;
249        *self.vec_content_cache.borrow_mut() = None;
250        Ok(())
251    }
252
253    pub fn insert_vec_trigger(&self, chunk_id: &str, emb: &[u8]) -> Result<()> {
254        self.conn.execute(
255            "INSERT OR REPLACE INTO vec_trigger(chunk_id, embedding) VALUES (?,?)",
256            params![chunk_id, emb],
257        )?;
258        self.conn.execute(
259            "INSERT INTO meta(key, value) VALUES ('vector_revision', '1')
260             ON CONFLICT(key) DO UPDATE SET value=CAST(value AS INTEGER)+1",
261            [],
262        )?;
263        *self.vec_trigger_cache.borrow_mut() = None;
264        Ok(())
265    }
266
267    pub fn get_chunk(&self, id: &str) -> Result<Option<Value>> {
268        let row = self
269            .conn
270            .query_row("SELECT * FROM chunks WHERE id=?", [id], row_to_json);
271        match row {
272            Ok(v) => Ok(Some(v)),
273            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
274            Err(e) => Err(e.into()),
275        }
276    }
277
278    pub fn update_chunk_state(
279        &self,
280        id: &str,
281        state: &str,
282        reason: Option<&str>,
283        now: &str,
284    ) -> Result<()> {
285        self.conn.execute(
286            "UPDATE chunks SET state=?, state_reason=?, state_updated_at=?, updated_at=? WHERE id=?",
287            params![state, reason, now, now, id],
288        )?;
289        Ok(())
290    }
291
292    pub fn update_chunk_confidence(
293        &self,
294        id: &str,
295        conf: f64,
296        reason: Option<&str>,
297        now: &str,
298    ) -> Result<()> {
299        self.conn.execute(
300            "UPDATE chunks
301             SET confidence=?, confidence_base=?, confidence_reason=?, updated_at=?
302             WHERE id=?",
303            params![conf, conf, reason, now, id],
304        )?;
305        Ok(())
306    }
307
308    pub fn update_chunk_last_used(&self, id: &str, now: &str) -> Result<()> {
309        self.conn.execute(
310            "UPDATE chunks SET last_used_at=?, updated_at=? WHERE id=?",
311            params![now, now, id],
312        )?;
313        Ok(())
314    }
315
316    pub fn get_chunk_by_hash(&self, hash: &str) -> Result<Option<Value>> {
317        let row = self.conn.query_row(
318            "SELECT * FROM chunks WHERE content_hash=? LIMIT 1",
319            [hash],
320            row_to_json,
321        );
322        match row {
323            Ok(v) => Ok(Some(v)),
324            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
325            Err(e) => Err(e.into()),
326        }
327    }
328
329    // ------------------------------------------------------------------
330    // Vector search (pure-Rust cosine similarity, replaces sqlite-vec)
331    // ------------------------------------------------------------------
332
333    pub fn search_vec_content(&self, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
334        self.search_vec(&self.vec_content_cache, "vec_content", query, limit)
335    }
336
337    pub fn search_vec_trigger(&self, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
338        self.search_vec(&self.vec_trigger_cache, "vec_trigger", query, limit)
339    }
340
341    fn search_vec(
342        &self,
343        cache_cell: &VectorCache,
344        table: &str,
345        query: &[f32],
346        limit: usize,
347    ) -> Result<Vec<(String, f32)>> {
348        if limit == 0 {
349            return Ok(Vec::new());
350        }
351        self.refresh_vector_caches_if_changed()?;
352
353        // Populate cache on first access after open or invalidation.
354        if cache_cell.borrow().is_none() {
355            let sql = format!("SELECT chunk_id, embedding FROM {table}");
356            let mut stmt = self.conn.prepare(&sql)?;
357            let entries: Vec<(String, Vec<f32>)> = stmt
358                .query_map([], |r| {
359                    let id: String = r.get(0)?;
360                    let blob: Vec<u8> = r.get(1)?;
361                    Ok((id, blob))
362                })?
363                .filter_map(|r| r.ok())
364                .map(|(id, blob)| (id, unpack_embedding(&blob)))
365                .collect();
366            *cache_cell.borrow_mut() = Some(entries);
367        }
368
369        let cache = cache_cell.borrow();
370        let entries = cache.as_ref().unwrap();
371
372        // Compute similarities, then partial-sort to bring top-limit to the front (O(N log K)).
373        let mut results: Vec<(String, f32)> = entries
374            .iter()
375            .map(|(id, v)| (id.clone(), cosine_similarity(query, v)))
376            .collect();
377        if results.len() > limit {
378            results.select_nth_unstable_by(limit - 1, |a, b| {
379                b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)
380            });
381            results.truncate(limit);
382        }
383        results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
384        Ok(results)
385    }
386
387    fn refresh_vector_caches_if_changed(&self) -> Result<()> {
388        let current = self
389            .get_meta("vector_revision")?
390            .and_then(|value| value.parse::<i64>().ok())
391            .unwrap_or(0);
392        let previous = self.vector_cache_revision.replace(Some(current));
393        if previous.is_some_and(|revision| revision != current) {
394            *self.vec_content_cache.borrow_mut() = None;
395            *self.vec_trigger_cache.borrow_mut() = None;
396        }
397        Ok(())
398    }
399
400    /// Fetch multiple chunks by id in one query; returns a map of id → chunk JSON.
401    pub fn get_chunks_by_ids(&self, ids: &[&str]) -> Result<HashMap<String, Value>> {
402        if ids.is_empty() {
403            return Ok(HashMap::new());
404        }
405        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
406        let sql = format!("SELECT * FROM chunks WHERE id IN ({placeholders})");
407        let mut stmt = self.conn.prepare(&sql)?;
408        let names: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();
409        let rows = stmt.query_map(rusqlite::params_from_iter(ids.iter()), |r| {
410            row_to_json_with_names(r, &names)
411        })?;
412        let mut map = HashMap::with_capacity(ids.len());
413        for row in rows.filter_map(|r| r.ok()) {
414            if let Some(id) = row.get("id").and_then(Value::as_str) {
415                map.insert(id.to_string(), row);
416            }
417        }
418        Ok(map)
419    }
420
421    // ------------------------------------------------------------------
422    // Invalidated hashes
423    // ------------------------------------------------------------------
424
425    pub fn is_hash_invalidated(&self, hash: &str) -> Result<bool> {
426        let count: i64 = self.conn.query_row(
427            "SELECT count(*) FROM invalidated_hashes WHERE content_hash=?",
428            [hash],
429            |r| r.get(0),
430        )?;
431        Ok(count > 0)
432    }
433
434    pub fn insert_invalidated_hash(
435        &self,
436        hash: &str,
437        reason: Option<&str>,
438        ts: &str,
439    ) -> Result<()> {
440        self.conn.execute(
441            "INSERT OR IGNORE INTO invalidated_hashes(content_hash, reason, ts) VALUES (?,?,?)",
442            params![hash, reason, ts],
443        )?;
444        Ok(())
445    }
446
447    // ------------------------------------------------------------------
448    // Usage trace
449    // ------------------------------------------------------------------
450
451    #[allow(clippy::too_many_arguments)]
452    pub fn insert_usage_trace(
453        &self,
454        trace_id: &str,
455        chunk_id: Option<&str>,
456        event: &str,
457        strength: f64,
458        similarity: Option<f64>,
459        refine_mode: Option<&str>,
460        tokens: Option<i64>,
461        rank: Option<i64>,
462        attribution: Option<&str>,
463        source: &str,
464        ts: &str,
465    ) -> Result<usize> {
466        Ok(self.conn.execute(
467            "INSERT OR IGNORE INTO usage_trace
468             (trace_id, chunk_id, event, strength, similarity, refine_mode, tokens, rank, attribution, source, ts)
469             VALUES (?,?,?,?,?,?,?,?,?,?,?)",
470            params![trace_id, chunk_id, event, strength, similarity, refine_mode, tokens, rank, attribution, source, ts],
471        )?)
472    }
473
474    pub fn replace_used_trace(
475        &self,
476        trace_id: &str,
477        used_ids: &[String],
478        strength: f64,
479        attribution: &str,
480        source: &str,
481        ts: &str,
482    ) -> Result<()> {
483        self.conn.execute(
484            "DELETE FROM usage_trace WHERE trace_id=? AND event='used'",
485            [trace_id],
486        )?;
487        for chunk_id in used_ids {
488            self.insert_usage_trace(
489                trace_id,
490                Some(chunk_id),
491                "used",
492                strength,
493                None,
494                None,
495                None,
496                None,
497                Some(attribution),
498                source,
499                ts,
500            )?;
501        }
502        Ok(())
503    }
504
505    pub fn merge_used_trace(
506        &self,
507        trace_id: &str,
508        used_ids: &[String],
509        strength: f64,
510        attribution: &str,
511        source: &str,
512        ts: &str,
513    ) -> Result<()> {
514        let attribution_rank = |value: &str| match value {
515            "explicit" => 3,
516            "cited" => 2,
517            "inferred" => 1,
518            _ => 0,
519        };
520        for chunk_id in used_ids {
521            let existing = self
522                .query_chunks_params(
523                    "SELECT attribution, strength FROM usage_trace
524                     WHERE trace_id=? AND chunk_id=? AND event='used'",
525                    params![trace_id, chunk_id],
526                )?
527                .into_iter()
528                .next();
529            if let Some(row) = existing {
530                let existing_attribution = row
531                    .get("attribution")
532                    .and_then(Value::as_str)
533                    .unwrap_or("inferred");
534                if attribution_rank(attribution) > attribution_rank(existing_attribution) {
535                    self.conn.execute(
536                        "UPDATE usage_trace
537                         SET strength=?, attribution=?, source=?, ts=?
538                         WHERE trace_id=? AND chunk_id=? AND event='used'",
539                        params![
540                            strength,
541                            attribution,
542                            source,
543                            ts,
544                            trace_id,
545                            chunk_id
546                        ],
547                    )?;
548                }
549            } else {
550                self.insert_usage_trace(
551                    trace_id,
552                    Some(chunk_id),
553                    "used",
554                    strength,
555                    None,
556                    None,
557                    None,
558                    None,
559                    Some(attribution),
560                    source,
561                    ts,
562                )?;
563            }
564        }
565        Ok(())
566    }
567
568    pub fn refresh_chunk_last_used(&self, chunk_id: &str, now: &str) -> Result<()> {
569        self.conn.execute(
570            "UPDATE chunks
571             SET last_used_at=COALESCE(
572                   (SELECT MAX(ts) FROM usage_trace
573                    WHERE chunk_id=? AND event='used'
574                      AND ts > COALESCE(chunks.evidence_cutoff_at, '')),
575                   last_used_base
576                 ),
577                 updated_at=?
578             WHERE id=?",
579            params![chunk_id, now, chunk_id],
580        )?;
581        Ok(())
582    }
583
584    pub fn get_outcome_for_trace(&self, trace_id: &str) -> Result<Option<String>> {
585        let row = self.conn.query_row(
586            "SELECT event FROM usage_trace
587             WHERE trace_id=? AND event IN ('task_ok','task_fail') AND chunk_id IS NULL
588             LIMIT 1",
589            [trace_id],
590            |r| r.get::<_, String>(0),
591        );
592        match row {
593            Ok(v) => Ok(Some(v)),
594            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
595            Err(e) => Err(e.into()),
596        }
597    }
598
599    pub fn purge_usage_trace(&self, before_ts: &str) -> Result<usize> {
600        // Preserve compact attribution facts. They are required to replay corrections.
601        let n = self.conn.execute(
602            "DELETE FROM usage_trace
603             WHERE ts < ?
604             AND event IN ('retrieved','refined')
605             AND NOT (event = 'retrieved'
606                      AND chunk_id IN (SELECT id FROM chunks WHERE origin='spark'))",
607            [before_ts],
608        )?;
609        Ok(n)
610    }
611
612    // ------------------------------------------------------------------
613    // Episodic log
614    // ------------------------------------------------------------------
615
616    pub fn upsert_episodic_log(&self, log: &EpisodicLogRow) -> Result<()> {
617        self.conn.execute(
618            "INSERT OR REPLACE INTO episodic_log
619             (id, trace_id, lib_id, ts, query, recall_snapshot, output,
620              output_summary, outcome, event_source, task_state, completed_at,
621              usage_state, used_ids, used_attribution, used_complete, context_key, nomination, priority,
622              distill_state, distill_note, distill_attempts, distill_last_failed_at)
623             VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,?18,?19,?20,?21,0,NULL)",
624            params![
625                log.id,
626                log.trace_id,
627                log.lib_id,
628                log.ts,
629                log.query,
630                log.recall_snapshot,
631                log.output,
632                log.output_summary,
633                log.outcome,
634                log.event_source,
635                log.task_state,
636                log.completed_at,
637                log.usage_state,
638                log.used_ids,
639                log.used_attribution,
640                i64::from(log.used_complete),
641                log.context_key,
642                log.nomination,
643                log.priority,
644                log.distill_state,
645                log.distill_note
646            ],
647        )?;
648        Ok(())
649    }
650
651    pub fn get_episodic_log(&self, trace_id: &str) -> Result<Option<Value>> {
652        let row = self.conn.query_row(
653            "SELECT * FROM episodic_log WHERE trace_id=?",
654            [trace_id],
655            row_to_json,
656        );
657        match row {
658            Ok(v) => Ok(Some(v)),
659            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
660            Err(e) => Err(e.into()),
661        }
662    }
663
664    pub fn update_episodic_log_state(
665        &self,
666        trace_id: &str,
667        state: &str,
668        note: Option<&str>,
669        outcome: Option<&str>,
670    ) -> Result<()> {
671        self.conn.execute(
672            "UPDATE episodic_log
673             SET distill_state=?, distill_note=COALESCE(?,distill_note),
674                 outcome=COALESCE(?,outcome),
675                 distill_run_id=NULL, distill_locked_at=NULL
676             WHERE trace_id=?",
677            params![state, note, outcome, trace_id],
678        )?;
679        Ok(())
680    }
681
682    /// Patch content fields on an existing episodic_log row (補写: output_summary, nomination, etc.)
683    pub fn patch_episodic_log_content(
684        &self,
685        trace_id: &str,
686        query: Option<&str>,
687        output: Option<&str>,
688        output_summary: Option<&str>,
689        nomination: Option<&str>,
690        priority: i64,
691    ) -> Result<()> {
692        self.conn.execute(
693            "UPDATE episodic_log
694             SET output_summary = COALESCE(?, output_summary),
695                 nomination     = COALESCE(?, nomination),
696                 output         = COALESCE(?, output),
697                 query          = COALESCE(?, query),
698                 priority       = MAX(priority, ?)
699             WHERE trace_id = ?",
700            params![
701                output_summary,
702                nomination,
703                output,
704                query,
705                priority,
706                trace_id
707            ],
708        )?;
709        Ok(())
710    }
711
712    #[allow(clippy::too_many_arguments)]
713    pub fn update_trace_lifecycle(
714        &self,
715        trace_id: &str,
716        task_state: &str,
717        completed_at: Option<&str>,
718        usage_state: Option<&str>,
719        used_ids: Option<&str>,
720        used_attribution: Option<&str>,
721        used_complete: Option<bool>,
722    ) -> Result<()> {
723        self.conn.execute(
724            "UPDATE episodic_log
725             SET task_state=?,
726                 completed_at=COALESCE(?, completed_at),
727                 usage_state=COALESCE(?, usage_state),
728                 used_ids=COALESCE(?, used_ids),
729                 used_attribution=COALESCE(?, used_attribution),
730                 used_complete=COALESCE(?, used_complete)
731             WHERE trace_id=?",
732            params![
733                task_state,
734                completed_at,
735                usage_state,
736                used_ids,
737                used_attribution,
738                used_complete.map(i64::from),
739                trace_id
740            ],
741        )?;
742        Ok(())
743    }
744
745    #[allow(clippy::too_many_arguments)]
746    pub fn upsert_confidence_evidence(
747        &self,
748        id: &str,
749        trace_id: Option<&str>,
750        chunk_id: &str,
751        kind: &str,
752        target: f64,
753        alpha: f64,
754        reason: &str,
755        context_key: Option<&str>,
756        ts: &str,
757    ) -> Result<()> {
758        self.conn.execute(
759            "INSERT INTO confidence_evidence
760             (id, trace_id, chunk_id, kind, target, alpha, reason, context_key, ts)
761             VALUES (?,?,?,?,?,?,?,?,?)
762             ON CONFLICT(trace_id, chunk_id, kind) WHERE trace_id IS NOT NULL
763             DO UPDATE SET target=excluded.target, alpha=excluded.alpha,
764                           reason=excluded.reason, context_key=excluded.context_key",
765            params![id, trace_id, chunk_id, kind, target, alpha, reason, context_key, ts],
766        )?;
767        Ok(())
768    }
769
770    pub fn delete_trace_confidence_evidence(&self, trace_id: &str, kinds: &[&str]) -> Result<()> {
771        for kind in kinds {
772            self.conn.execute(
773                "DELETE FROM confidence_evidence WHERE trace_id=? AND kind=?",
774                params![trace_id, kind],
775            )?;
776        }
777        Ok(())
778    }
779
780    pub fn delete_chunk_trace_confidence_evidence(
781        &self,
782        trace_id: &str,
783        chunk_id: &str,
784        kind: &str,
785    ) -> Result<()> {
786        self.conn.execute(
787            "DELETE FROM confidence_evidence
788             WHERE trace_id=? AND chunk_id=? AND kind=?",
789            params![trace_id, chunk_id, kind],
790        )?;
791        Ok(())
792    }
793
794    pub fn confidence_evidence_for_chunk(&self, chunk_id: &str) -> Result<Vec<Value>> {
795        self.query_json(
796            "SELECT target, alpha, reason, ts, id
797             FROM confidence_evidence WHERE chunk_id=?
798             ORDER BY ts ASC,
799                      CASE kind
800                        WHEN 'outcome_ok' THEN 1
801                        WHEN 'outcome_fail' THEN 1
802                        WHEN 'selected_unused' THEN 2
803                        WHEN 'feedback_up' THEN 3
804                        WHEN 'feedback_down' THEN 3
805                        WHEN 'decay' THEN 4
806                        ELSE 5
807                      END ASC,
808                      kind ASC, id ASC",
809            [chunk_id],
810        )
811    }
812
813    #[allow(clippy::too_many_arguments)]
814    pub fn insert_feedback_event(
815        &self,
816        id: &str,
817        trace_id: &str,
818        chunk_id: &str,
819        signal: &str,
820        strength: f64,
821        source: &str,
822        actor: Option<&str>,
823        reason: Option<&str>,
824        context_key: Option<&str>,
825        ts: &str,
826    ) -> Result<usize> {
827        Ok(self.conn.execute(
828            "INSERT OR IGNORE INTO feedback_events
829             (id, trace_id, chunk_id, signal, strength, source, actor, reason, context_key, ts)
830             VALUES (?,?,?,?,?,?,?,?,?,?)",
831            params![
832                id,
833                trace_id,
834                chunk_id,
835                signal,
836                strength,
837                source,
838                actor,
839                reason,
840                context_key,
841                ts
842            ],
843        )?)
844    }
845
846    pub fn delete_feedback_event(
847        &self,
848        trace_id: &str,
849        chunk_id: &str,
850        signal: &str,
851    ) -> Result<usize> {
852        Ok(self.conn.execute(
853            "DELETE FROM feedback_events
854             WHERE trace_id=? AND chunk_id=? AND signal=?",
855            params![trace_id, chunk_id, signal],
856        )?)
857    }
858
859    pub fn update_chunk_last_decayed_at(&self, id: &str, now: &str) -> Result<()> {
860        self.conn.execute(
861            "UPDATE chunks SET last_decayed_at=?, updated_at=? WHERE id=?",
862            params![now, now, id],
863        )?;
864        Ok(())
865    }
866
867    #[allow(clippy::too_many_arguments)]
868    pub fn update_context_stat(
869        &self,
870        chunk_id: &str,
871        context_key: &str,
872        success: i64,
873        failure: i64,
874        positive: i64,
875        negative: i64,
876        now: &str,
877    ) -> Result<()> {
878        self.conn.execute(
879            "INSERT INTO chunk_context_stats
880             (chunk_id, context_key, success_count, failure_count,
881              positive_feedback, negative_feedback, last_updated_at)
882             VALUES (?,?,?,?,?,?,?)
883             ON CONFLICT(chunk_id, context_key) DO UPDATE SET
884               success_count=success_count+excluded.success_count,
885               failure_count=failure_count+excluded.failure_count,
886               positive_feedback=positive_feedback+excluded.positive_feedback,
887               negative_feedback=negative_feedback+excluded.negative_feedback,
888               last_updated_at=excluded.last_updated_at",
889            params![
890                chunk_id,
891                context_key,
892                success,
893                failure,
894                positive,
895                negative,
896                now
897            ],
898        )?;
899        Ok(())
900    }
901
902    pub fn context_score(&self, chunk_id: &str, context_key: &str) -> Result<f64> {
903        let row = self
904            .conn
905            .query_row(
906                "SELECT success_count, failure_count, positive_feedback, negative_feedback
907                 FROM chunk_context_stats WHERE chunk_id=? AND context_key=?",
908                params![chunk_id, context_key],
909                |row| {
910                    Ok((
911                        row.get::<_, i64>(0)?,
912                        row.get::<_, i64>(1)?,
913                        row.get::<_, i64>(2)?,
914                        row.get::<_, i64>(3)?,
915                    ))
916                },
917            )
918            .optional()?;
919        let Some((success, failure, positive, negative)) = row else {
920            return Ok(0.0);
921        };
922        let wins = success as f64 + positive as f64 * 2.0;
923        let losses = failure as f64 + negative as f64 * 2.0;
924        let evidence = wins + losses;
925        let posterior = (wins + 1.0) / (evidence + 2.0);
926        let evidence_weight = (evidence / 5.0).min(1.0);
927        Ok((posterior - 0.5) * 2.0 * evidence_weight)
928    }
929
930    #[allow(clippy::too_many_arguments)]
931    pub fn upsert_governance_proposal(
932        &self,
933        id: &str,
934        chunk_id: &str,
935        proposal_type: &str,
936        reason: &str,
937        evidence_count: i64,
938        evidence_score: f64,
939        actor_count: i64,
940        now: &str,
941    ) -> Result<()> {
942        self.conn.execute(
943            "INSERT INTO governance_proposals
944             (id, chunk_id, proposal_type, reason, evidence_count,
945              evidence_score, actor_count, state, created_at, updated_at)
946             VALUES (?,?,?,?,?,?,?,'pending',?,?)
947             ON CONFLICT(chunk_id, proposal_type) WHERE state='pending'
948             DO UPDATE SET reason=excluded.reason,
949                           evidence_count=excluded.evidence_count,
950                           evidence_score=excluded.evidence_score,
951                           actor_count=excluded.actor_count,
952                           updated_at=excluded.updated_at",
953            params![
954                id,
955                chunk_id,
956                proposal_type,
957                reason,
958                evidence_count,
959                evidence_score,
960                actor_count,
961                now,
962                now
963            ],
964        )?;
965        Ok(())
966    }
967
968    pub fn request_evolve(&self, id: &str, reason: &str, now: &str) -> Result<()> {
969        self.request_evolve_at(id, reason, now, None)
970    }
971
972    pub fn request_evolve_at(
973        &self,
974        id: &str,
975        reason: &str,
976        now: &str,
977        next_retry_at: Option<&str>,
978    ) -> Result<()> {
979        let priority = match reason {
980            "governance_ready" => 100,
981            "governance" => 80,
982            "threshold" => 60,
983            "distill_retry" => 50,
984            "batch_continue" => 40,
985            _ => 20,
986        };
987        self.conn.execute(
988            "INSERT INTO evolve_requests(
989               id, reason, state, requested_at, priority, next_retry_at
990             )
991             VALUES (?,?,'pending',?,?,?)
992             ON CONFLICT(reason) WHERE state='pending'
993             DO UPDATE SET
994               priority=MAX(priority, excluded.priority),
995               next_retry_at=CASE
996                 WHEN excluded.next_retry_at IS NULL THEN NULL
997                 WHEN evolve_requests.next_retry_at IS NULL THEN NULL
998                 ELSE MIN(evolve_requests.next_retry_at, excluded.next_retry_at)
999               END",
1000            params![id, reason, now, priority, next_retry_at],
1001        )?;
1002        Ok(())
1003    }
1004
1005    pub fn claim_evolve_request(&self, now: &str, stale_before: &str) -> Result<Option<String>> {
1006        Ok(self
1007            .claim_evolve_request_with_reason(now, stale_before)?
1008            .map(|claim| claim.id))
1009    }
1010
1011    pub fn claim_evolve_request_with_reason(
1012        &self,
1013        now: &str,
1014        stale_before: &str,
1015    ) -> Result<Option<EvolveRequestClaim>> {
1016        self.conn.execute(
1017            "UPDATE evolve_requests
1018             SET state='pending', leased_at=NULL, note='lease_recovered'
1019             WHERE state='running' AND leased_at < ?",
1020            [stale_before],
1021        )?;
1022        self.conn.execute(
1023            "UPDATE evolve_requests
1024             SET state='pending', leased_at=NULL, note='retry_failed'
1025             WHERE state='failed' AND attempts < 3
1026               AND COALESCE(next_retry_at, completed_at) < ?",
1027            [now],
1028        )?;
1029        Ok(self.conn.query_row(
1030            "UPDATE evolve_requests
1031             SET state='running', leased_at=?, attempts=attempts+1
1032             WHERE id=(
1033               SELECT id FROM evolve_requests
1034               WHERE state='pending'
1035                 AND (next_retry_at IS NULL OR next_retry_at <= ?)
1036               ORDER BY priority DESC, requested_at ASC LIMIT 1
1037             ) AND state='pending'
1038             RETURNING id, reason",
1039            params![now, now],
1040            |row| {
1041                Ok(EvolveRequestClaim {
1042                    id: row.get(0)?,
1043                    reason: row.get(1)?,
1044                })
1045            },
1046        ).optional()?)
1047    }
1048
1049    pub fn defer_evolve_request(
1050        &self,
1051        id: &str,
1052        note: &str,
1053        next_retry_at: &str,
1054    ) -> Result<()> {
1055        self.conn.execute(
1056            "DELETE FROM evolve_requests
1057             WHERE state='pending'
1058               AND id!=?
1059               AND reason=(SELECT reason FROM evolve_requests WHERE id=?)",
1060            params![id, id],
1061        )?;
1062        self.conn.execute(
1063            "UPDATE evolve_requests
1064             SET state='pending', leased_at=NULL, completed_at=NULL,
1065                 note=?, next_retry_at=?
1066             WHERE id=?",
1067            params![note, next_retry_at, id],
1068        )?;
1069        Ok(())
1070    }
1071
1072    pub fn finish_evolve_request(
1073        &self,
1074        id: &str,
1075        state: &str,
1076        note: Option<&str>,
1077        now: &str,
1078    ) -> Result<()> {
1079        self.conn.execute(
1080            "UPDATE evolve_requests
1081             SET state=?, completed_at=?, note=?,
1082                 last_failed_at=CASE
1083                   WHEN ?='failed' THEN ?
1084                   ELSE last_failed_at
1085                 END,
1086                 next_retry_at=CASE WHEN ?='failed'
1087                   THEN strftime('%Y-%m-%dT%H:%M:%fZ', ?, '+5 minutes')
1088                   ELSE NULL END
1089             WHERE id=?",
1090            params![state, now, note, state, now, state, now, id],
1091        )?;
1092        Ok(())
1093    }
1094
1095    pub fn finish_covered_evolve_requests(&self, requested_before: &str, now: &str) -> Result<()> {
1096        self.conn.execute(
1097            "UPDATE evolve_requests
1098             SET state='completed', completed_at=?, note='covered_by_evolve', next_retry_at=NULL
1099             WHERE state='pending' AND requested_at <= ?",
1100            params![now, requested_before],
1101        )?;
1102        Ok(())
1103    }
1104
1105    /// Update by primary-key id (used after distill where we have the row id, not trace_id).
1106    pub fn update_episodic_log_state_by_id(
1107        &self,
1108        id: &str,
1109        state: &str,
1110        note: Option<&str>,
1111        outcome: Option<&str>,
1112    ) -> Result<()> {
1113        self.conn.execute(
1114            "UPDATE episodic_log
1115             SET distill_state=?, distill_note=COALESCE(?,distill_note),
1116                 outcome=COALESCE(?,outcome),
1117                 distill_run_id=NULL, distill_locked_at=NULL
1118             WHERE id=?",
1119            params![state, note, outcome, id],
1120        )?;
1121        Ok(())
1122    }
1123
1124    #[allow(clippy::too_many_arguments)]
1125    pub fn finish_distill_log(
1126        &self,
1127        id: &str,
1128        state: &str,
1129        note: Option<&str>,
1130        prompt_tokens: i64,
1131        completion_tokens: i64,
1132        accounted_at: &str,
1133    ) -> Result<()> {
1134        self.conn.execute(
1135            "INSERT INTO distill_token_usage(
1136               log_id, prompt_tokens, completion_tokens, outcome, accounted_at
1137             ) VALUES (?,?,?,?,?)",
1138            params![
1139                id,
1140                prompt_tokens,
1141                completion_tokens,
1142                state,
1143                accounted_at
1144            ],
1145        )?;
1146        self.conn.execute(
1147            "UPDATE episodic_log
1148             SET distill_state=?, distill_note=?,
1149                 distill_prompt_tokens=?, distill_completion_tokens=?,
1150                 distill_accounted_at=?,
1151                 distill_attempts=distill_attempts
1152                   + CASE WHEN ?='failed' THEN 1 ELSE 0 END,
1153                 distill_last_failed_at=CASE
1154                   WHEN ?='failed' THEN ?
1155                   ELSE distill_last_failed_at
1156                 END,
1157                 distill_run_id=NULL, distill_locked_at=NULL
1158             WHERE id=?",
1159            params![
1160                state,
1161                note,
1162                prompt_tokens,
1163                completion_tokens,
1164                accounted_at,
1165                state,
1166                state,
1167                accounted_at,
1168                id
1169            ],
1170        )?;
1171        Ok(())
1172    }
1173
1174    /// Claim a batch of 'new' logs for distillation: mark them 'screening' atomically.
1175    /// Returns the claimed rows (with distill_run_id set to run_id).
1176    pub fn claim_distill_batch(
1177        &self,
1178        run_id: &str,
1179        limit: usize,
1180        locked_at: &str,
1181    ) -> Result<Vec<Value>> {
1182        // BEGIN IMMEDIATE must be held by caller; this is called inside a transaction.
1183        self.conn.execute(
1184            "UPDATE episodic_log
1185             SET distill_state='screening', distill_run_id=?, distill_locked_at=?
1186             WHERE id IN (
1187               SELECT id FROM episodic_log
1188               WHERE distill_state='new'
1189               ORDER BY priority DESC, ts ASC
1190               LIMIT ?
1191             )",
1192            params![run_id, locked_at, limit as i64],
1193        )?;
1194        self.query_json(
1195            "SELECT * FROM episodic_log WHERE distill_run_id=? AND distill_state='screening'",
1196            params![run_id],
1197        )
1198    }
1199
1200    pub fn query_episodic_logs_open(&self, limit: usize) -> Result<Vec<Value>> {
1201        self.query_json(
1202            "SELECT * FROM episodic_log WHERE distill_state='new' ORDER BY priority DESC, ts ASC LIMIT ?",
1203            params![limit as i64],
1204        )
1205    }
1206
1207    // ------------------------------------------------------------------
1208    // Chunk queries (aggregate / curate helpers)
1209    // ------------------------------------------------------------------
1210
1211    pub fn query_chunks(&self, sql: &str) -> Result<Vec<Value>> {
1212        self.query_json(sql, params![])
1213    }
1214
1215    pub fn query_chunks_params<P: rusqlite::Params>(&self, sql: &str, p: P) -> Result<Vec<Value>> {
1216        self.query_json(sql, p)
1217    }
1218
1219    // ------------------------------------------------------------------
1220    // Deps
1221    // ------------------------------------------------------------------
1222
1223    pub fn get_deps(&self, chunk_id: &str) -> Result<Vec<(String, String, Option<String>)>> {
1224        let mut stmt = self
1225            .conn
1226            .prepare("SELECT dst, kind, dst_lib FROM deps WHERE src=?")?;
1227        let rows = stmt.query_map([chunk_id], |r| {
1228            Ok((
1229                r.get::<_, String>(0)?,
1230                r.get::<_, String>(1)?,
1231                r.get::<_, Option<String>>(2)?,
1232            ))
1233        })?;
1234        Ok(rows.filter_map(|r| r.ok()).collect())
1235    }
1236
1237    pub fn get_reverse_deps(&self, chunk_id: &str) -> Result<Vec<String>> {
1238        let mut stmt = self.conn.prepare("SELECT src FROM deps WHERE dst=?")?;
1239        let rows = stmt.query_map([chunk_id], |r| r.get::<_, String>(0))?;
1240        Ok(rows.filter_map(|r| r.ok()).collect())
1241    }
1242
1243    pub fn insert_dep(
1244        &self,
1245        src: &str,
1246        dst: &str,
1247        kind: &str,
1248        dst_lib: Option<&str>,
1249    ) -> Result<()> {
1250        self.conn.execute(
1251            "INSERT OR IGNORE INTO deps(src,dst,kind,dst_lib) VALUES (?,?,?,?)",
1252            params![src, dst, kind, dst_lib],
1253        )?;
1254        Ok(())
1255    }
1256
1257    // ------------------------------------------------------------------
1258    // Chunk success traces (aggregate fact table)
1259    // ------------------------------------------------------------------
1260
1261    pub fn upsert_chunk_success_trace(
1262        &self,
1263        chunk_id: &str,
1264        trace_id: &str,
1265        ts: &str,
1266    ) -> Result<()> {
1267        self.conn.execute(
1268            "INSERT OR IGNORE INTO chunk_success_traces(chunk_id, trace_id, ts) VALUES (?,?,?)",
1269            params![chunk_id, trace_id, ts],
1270        )?;
1271        Ok(())
1272    }
1273
1274    // ------------------------------------------------------------------
1275    // Shared library support
1276    // ------------------------------------------------------------------
1277
1278    pub fn attach_shared(&self, path: &str, alias: &str) -> Result<()> {
1279        self.conn.execute_batch(&format!(
1280            "ATTACH DATABASE '{}' AS '{alias}'",
1281            path.replace('\'', "''")
1282        ))?;
1283        Ok(())
1284    }
1285
1286    pub fn lib_id(&self) -> Result<String> {
1287        Ok(self
1288            .get_meta("lib_id")?
1289            .unwrap_or_else(|| "unknown".to_string()))
1290    }
1291
1292    // ------------------------------------------------------------------
1293    // Generic helpers
1294    // ------------------------------------------------------------------
1295
1296    fn query_json<P: rusqlite::Params>(&self, sql: &str, p: P) -> Result<Vec<Value>> {
1297        let mut stmt = self.conn.prepare(sql)?;
1298        let names: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();
1299        let rows = stmt.query_map(p, |r| row_to_json_with_names(r, &names))?;
1300        Ok(rows.filter_map(|r| r.ok()).collect())
1301    }
1302
1303    pub fn execute(&self, sql: &str) -> Result<()> {
1304        self.conn.execute_batch(sql)?;
1305        Ok(())
1306    }
1307
1308    /// Execute a parameterised statement (not batch); returns rows-affected count.
1309    pub fn conn_execute<P: rusqlite::Params>(&self, sql: &str, p: P) -> Result<()> {
1310        self.conn.execute(sql, p)?;
1311        Ok(())
1312    }
1313
1314    pub fn conn_execute_count<P: rusqlite::Params>(&self, sql: &str, p: P) -> Result<usize> {
1315        Ok(self.conn.execute(sql, p)?)
1316    }
1317}
1318
1319// ------------------------------------------------------------------
1320// Row types
1321// ------------------------------------------------------------------
1322
/// Plain-data record describing one chunk.
///
/// NOTE(review): the field names suggest this mirrors the columns of the
/// `chunks` table; confirm against `schema.sql`.
///
/// Derives `PartialEq` (not `Eq`, because `confidence` is an `f64`) so rows
/// can be compared directly, e.g. in assertions.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct ChunkRow {
    pub id: String,
    pub skill_name: Option<String>,
    pub seq: i64,
    pub content: String,
    pub trigger_desc: Option<String>,
    pub anti_trigger_desc: Option<String>,
    pub content_hash: String,
    pub token_count: Option<i64>,
    pub origin: String,
    pub source: Option<String>,
    pub maturity: Option<String>,
    pub related_ids: Option<String>,
    pub protected: i64,
    pub state: String,
    pub state_reason: Option<String>,
    pub state_updated_at: Option<String>,
    pub confidence: f64,
    pub confidence_reason: Option<String>,
    pub version: i64,
    pub distilled_from: Option<String>,
    pub distill_provider: Option<String>,
    pub distill_model: Option<String>,
    pub distill_prompt_version: Option<String>,
    pub parent_id: Option<String>,
    pub selected_count: i64,
    pub used_count: i64,
    pub used_success_count: i64,
    pub success_trace_ids_count: i64,
    pub last_success_at: Option<String>,
    pub last_agg_ts: Option<String>,
    pub embed_version: i64,
    pub created_at: String,
    pub updated_at: String,
    pub last_used_at: Option<String>,
}
1360
/// Plain-data record describing one episodic log entry.
///
/// NOTE(review): the field names suggest this mirrors columns of the
/// `episodic_log` table; confirm against `schema.sql`.
///
/// Derives `Clone`, `PartialEq` and `Eq` in addition to `Debug`/`Default`,
/// matching `ChunkRow`, so rows can be copied and compared directly.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct EpisodicLogRow {
    pub id: String,
    pub trace_id: String,
    pub lib_id: String,
    pub ts: String,
    pub query: Option<String>,
    pub recall_snapshot: Option<String>,
    pub output: Option<String>,
    pub output_summary: Option<String>,
    pub outcome: Option<String>,
    pub event_source: String,
    pub task_state: String,
    pub completed_at: Option<String>,
    pub usage_state: String,
    pub used_ids: Option<String>,
    pub used_attribution: Option<String>,
    pub used_complete: bool,
    pub context_key: Option<String>,
    pub nomination: Option<String>,
    pub priority: i64,
    pub distill_state: String,
    pub distill_note: Option<String>,
}
1385
1386// ------------------------------------------------------------------
1387// Helpers
1388// ------------------------------------------------------------------
1389
1390fn configure_pragmas(conn: &Connection) -> Result<()> {
1391    conn.execute_batch(
1392        "PRAGMA journal_mode=WAL;
1393         PRAGMA foreign_keys=ON;
1394         PRAGMA synchronous=NORMAL;
1395         PRAGMA cache_size=-65536;
1396         PRAGMA mmap_size=268435456;
1397         PRAGMA temp_store=memory;",
1398    )?;
1399    // Validate WAL mode was accepted (some VFS/filesystems silently downgrade).
1400    let mode: String = conn.query_row("PRAGMA journal_mode", [], |r| r.get(0))?;
1401    if mode != "wal" {
1402        return Err(crate::errors::InnateError::Other(format!(
1403            "WAL mode required but got '{mode}'; check filesystem support"
1404        )));
1405    }
1406    Ok(())
1407}
1408
/// Parses a dotted version string such as `"4.14"` or `"4.5.2"` into a
/// `(major, minor, patch)` tuple.
///
/// Components are read positionally: a missing or non-numeric component
/// becomes `0` in its own slot and never shifts the components after it
/// (`"4.x.3"` yields `(4, 0, 3)`). Surrounding whitespace is ignored and
/// anything after the third component is discarded.
fn ver_tuple(v: &str) -> (u32, u32, u32) {
    let mut components = v
        .trim()
        .split('.')
        .map(|part| part.trim().parse::<u32>().unwrap_or(0));
    // Tuple fields are evaluated left to right, so each `next()` takes the
    // following component.
    (
        components.next().unwrap_or(0),
        components.next().unwrap_or(0),
        components.next().unwrap_or(0),
    )
}
1417
1418/// Convert a rusqlite Row to serde_json::Value using column names from statement.
1419fn row_to_json_with_names(row: &Row, names: &[String]) -> rusqlite::Result<Value> {
1420    let mut map = serde_json::Map::new();
1421    for (i, name) in names.iter().enumerate() {
1422        let v = row_value_at(row, i);
1423        map.insert(name.clone(), v);
1424    }
1425    Ok(Value::Object(map))
1426}
1427
1428fn row_to_json(row: &Row) -> rusqlite::Result<Value> {
1429    let count = row.as_ref().column_count();
1430    let mut map = serde_json::Map::new();
1431    for i in 0..count {
1432        let name = row.as_ref().column_name(i)?.to_string();
1433        let v = row_value_at(row, i);
1434        map.insert(name, v);
1435    }
1436    Ok(Value::Object(map))
1437}
1438
1439fn row_value_at(row: &Row, i: usize) -> Value {
1440    // Try types in preference order
1441    if let Ok(v) = row.get::<_, Option<String>>(i) {
1442        return v.map(Value::String).unwrap_or(Value::Null);
1443    }
1444    if let Ok(v) = row.get::<_, Option<i64>>(i) {
1445        return v.map(|n| Value::Number(n.into())).unwrap_or(Value::Null);
1446    }
1447    if let Ok(v) = row.get::<_, Option<f64>>(i) {
1448        return v
1449            .and_then(serde_json::Number::from_f64)
1450            .map(Value::Number)
1451            .unwrap_or(Value::Null);
1452    }
1453    Value::Null
1454}
1455
1456trait OptionalExt<T> {
1457    fn optional(self) -> rusqlite::Result<Option<T>>;
1458}
1459impl<T> OptionalExt<T> for rusqlite::Result<T> {
1460    fn optional(self) -> rusqlite::Result<Option<T>> {
1461        match self {
1462            Ok(v) => Ok(Some(v)),
1463            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1464            Err(e) => Err(e),
1465        }
1466    }
1467}