Skip to main content

innate_core/
storage.rs

1//! SQLite storage layer.
2//!
3//! Replaces sqlite-vec virtual tables with ordinary BLOB columns + pure-Rust
4//! cosine similarity, keeping the schema otherwise identical to v4.5.1.
5
6use std::path::{Path, PathBuf};
7
8use rusqlite::{params, Connection, Row};
9use serde_json::Value;
10
11use crate::errors::{InnateError, Result};
12use crate::utils::{cosine_similarity, unpack_embedding};
13
14const EXPECTED_SCHEMA_VERSION: &str = "4.5.1";
15
16// Embedded SQL schema — no external files needed.
17const SCHEMA_SQL: &str = include_str!("schema.sql");
18
19pub struct Storage {
20    pub db_path: PathBuf,
21    conn: Connection,
22    pub content_dim: usize,
23    pub trigger_dim: usize,
24}
25
26impl Storage {
27    pub fn open(db_path: impl AsRef<Path>, content_dim: usize, trigger_dim: usize) -> Result<Self> {
28        let db_path = db_path.as_ref().to_path_buf();
29        if let Some(parent) = db_path.parent() {
30            std::fs::create_dir_all(parent)?;
31        }
32        let conn = Connection::open(&db_path)?;
33        configure_pragmas(&conn)?;
34        let mut s = Self {
35            db_path,
36            conn,
37            content_dim,
38            trigger_dim,
39        };
40        s.init_schema()?;
41        Ok(s)
42    }
43
44    pub fn open_readonly(db_path: impl AsRef<Path>) -> Result<Self> {
45        let db_path = db_path.as_ref().to_path_buf();
46        let conn = Connection::open_with_flags(
47            &db_path,
48            rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
49        )?;
50        conn.pragma_update(None, "query_only", "ON")?;
51        conn.pragma_update(None, "foreign_keys", "ON")?;
52        let s = Self {
53            db_path,
54            conn,
55            content_dim: 1024,
56            trigger_dim: 256,
57        };
58        Ok(s)
59    }
60
61    fn init_schema(&mut self) -> Result<()> {
62        let has_meta: bool = self.conn.query_row(
63            "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='meta'",
64            [],
65            |r| r.get::<_, i64>(0),
66        )? > 0;
67
68        if !has_meta {
69            // Wrap schema creation in a transaction for atomicity.
70            self.conn.execute_batch("BEGIN IMMEDIATE")?;
71            let r = self.conn.execute_batch(SCHEMA_SQL);
72            if r.is_ok() {
73                self.conn.execute_batch("COMMIT")?;
74            } else {
75                let _ = self.conn.execute_batch("ROLLBACK");
76                r?;
77            }
78            return Ok(());
79        }
80
81        let current: Option<String> = self
82            .conn
83            .query_row(
84                "SELECT value FROM meta WHERE key='schema_version'",
85                [],
86                |r| r.get(0),
87            )
88            .optional()?;
89
90        let current = current
91            .ok_or_else(|| InnateError::Other("meta table missing schema_version".into()))?;
92
93        let cur = ver_tuple(&current);
94        let exp = ver_tuple(EXPECTED_SCHEMA_VERSION);
95
96        match cur.cmp(&exp) {
97            std::cmp::Ordering::Equal => Ok(()),
98            std::cmp::Ordering::Greater => {
99                // Forward-compat: newer schema, warn but allow.
100                eprintln!(
101                    "[innate] warning: db schema {current} > expected {EXPECTED_SCHEMA_VERSION}"
102                );
103                Ok(())
104            }
105            std::cmp::Ordering::Less => {
106                // Delegate to the proper migration chain which handles all steps atomically.
107                let applied = crate::migrate::run_migrations(&self.db_path)?;
108                if !applied.is_empty() {
109                    eprintln!("[innate] auto-migrated: {}", applied.join(", "));
110                }
111                Ok(())
112            }
113        }
114    }
115
116    // ------------------------------------------------------------------
117    // Transactions
118    // ------------------------------------------------------------------
119
120    pub fn begin_immediate(&self) -> Result<()> {
121        self.conn.execute_batch("BEGIN IMMEDIATE")?;
122        Ok(())
123    }
124
125    pub fn commit(&self) -> Result<()> {
126        self.conn.execute_batch("COMMIT")?;
127        Ok(())
128    }
129
130    pub fn rollback(&self) -> Result<()> {
131        self.conn.execute_batch("ROLLBACK")?;
132        Ok(())
133    }
134
135    // ------------------------------------------------------------------
136    // Meta
137    // ------------------------------------------------------------------
138
139    pub fn get_meta(&self, key: &str) -> Result<Option<String>> {
140        Ok(self
141            .conn
142            .query_row("SELECT value FROM meta WHERE key=?", [key], |r| r.get(0))
143            .optional()?)
144    }
145
146    pub fn set_meta(&self, key: &str, value: &str) -> Result<()> {
147        self.conn.execute(
148            "INSERT OR REPLACE INTO meta(key, value) VALUES (?,?)",
149            params![key, value],
150        )?;
151        Ok(())
152    }
153
154    pub fn get_meta_or(&self, key: &str, default: &str) -> String {
155        self.get_meta(key)
156            .ok()
157            .flatten()
158            .unwrap_or_else(|| default.to_string())
159    }
160
161    // ------------------------------------------------------------------
162    // Chunk CRUD
163    // ------------------------------------------------------------------
164
165    pub fn insert_chunk(&self, c: &ChunkRow) -> Result<()> {
166        self.conn.execute(
167            "INSERT INTO chunks (
168                id, skill_name, seq, content, trigger_desc, anti_trigger_desc,
169                content_hash, token_count, origin, source, maturity, related_ids,
170                protected, state, state_reason, state_updated_at,
171                confidence, confidence_reason, version, distilled_from, parent_id,
172                selected_count, used_count, used_success_count,
173                success_trace_ids_count, last_success_at, last_agg_ts,
174                embed_version, created_at, updated_at, last_used_at
175            ) VALUES (
176                ?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,
177                ?13,?14,?15,?16,?17,?18,?19,?20,?21,
178                ?22,?23,?24,?25,?26,?27,?28,?29,?30,?31
179            )",
180            params![
181                c.id,
182                c.skill_name,
183                c.seq,
184                c.content,
185                c.trigger_desc,
186                c.anti_trigger_desc,
187                c.content_hash,
188                c.token_count,
189                c.origin,
190                c.source,
191                c.maturity,
192                c.related_ids,
193                c.protected,
194                c.state,
195                c.state_reason,
196                c.state_updated_at,
197                c.confidence,
198                c.confidence_reason,
199                c.version,
200                c.distilled_from,
201                c.parent_id,
202                c.selected_count,
203                c.used_count,
204                c.used_success_count,
205                c.success_trace_ids_count,
206                c.last_success_at,
207                c.last_agg_ts,
208                c.embed_version,
209                c.created_at,
210                c.updated_at,
211                c.last_used_at
212            ],
213        )?;
214        Ok(())
215    }
216
217    pub fn insert_vec_content(&self, chunk_id: &str, emb: &[u8]) -> Result<()> {
218        self.conn.execute(
219            "INSERT OR REPLACE INTO vec_content(chunk_id, embedding) VALUES (?,?)",
220            params![chunk_id, emb],
221        )?;
222        Ok(())
223    }
224
225    pub fn insert_vec_trigger(&self, chunk_id: &str, emb: &[u8]) -> Result<()> {
226        self.conn.execute(
227            "INSERT OR REPLACE INTO vec_trigger(chunk_id, embedding) VALUES (?,?)",
228            params![chunk_id, emb],
229        )?;
230        Ok(())
231    }
232
233    pub fn get_chunk(&self, id: &str) -> Result<Option<Value>> {
234        let row = self
235            .conn
236            .query_row("SELECT * FROM chunks WHERE id=?", [id], row_to_json);
237        match row {
238            Ok(v) => Ok(Some(v)),
239            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
240            Err(e) => Err(e.into()),
241        }
242    }
243
244    pub fn update_chunk_state(
245        &self,
246        id: &str,
247        state: &str,
248        reason: Option<&str>,
249        now: &str,
250    ) -> Result<()> {
251        self.conn.execute(
252            "UPDATE chunks SET state=?, state_reason=?, state_updated_at=?, updated_at=? WHERE id=?",
253            params![state, reason, now, now, id],
254        )?;
255        Ok(())
256    }
257
258    pub fn update_chunk_confidence(
259        &self,
260        id: &str,
261        conf: f64,
262        reason: Option<&str>,
263        now: &str,
264    ) -> Result<()> {
265        self.conn.execute(
266            "UPDATE chunks SET confidence=?, confidence_reason=?, updated_at=? WHERE id=?",
267            params![conf, reason, now, id],
268        )?;
269        Ok(())
270    }
271
272    pub fn update_chunk_last_used(&self, id: &str, now: &str) -> Result<()> {
273        self.conn.execute(
274            "UPDATE chunks SET last_used_at=?, updated_at=? WHERE id=?",
275            params![now, now, id],
276        )?;
277        Ok(())
278    }
279
280    pub fn get_chunk_by_hash(&self, hash: &str) -> Result<Option<Value>> {
281        let row = self.conn.query_row(
282            "SELECT * FROM chunks WHERE content_hash=? LIMIT 1",
283            [hash],
284            row_to_json,
285        );
286        match row {
287            Ok(v) => Ok(Some(v)),
288            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
289            Err(e) => Err(e.into()),
290        }
291    }
292
293    // ------------------------------------------------------------------
294    // Vector search (pure-Rust cosine similarity, replaces sqlite-vec)
295    // ------------------------------------------------------------------
296
297    pub fn search_vec_content(&self, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
298        self.search_vec("vec_content", query, limit)
299    }
300
301    pub fn search_vec_trigger(&self, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
302        self.search_vec("vec_trigger", query, limit)
303    }
304
305    fn search_vec(&self, table: &str, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
306        let sql = format!("SELECT chunk_id, embedding FROM {table}");
307        let mut stmt = self.conn.prepare(&sql)?;
308        let mut results: Vec<(String, f32)> = stmt
309            .query_map([], |r| {
310                let id: String = r.get(0)?;
311                let blob: Vec<u8> = r.get(1)?;
312                Ok((id, blob))
313            })?
314            .filter_map(|r| r.ok())
315            .map(|(id, blob)| {
316                let v = unpack_embedding(&blob);
317                let sim = cosine_similarity(query, &v);
318                (id, sim)
319            })
320            .collect();
321        results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
322        results.truncate(limit);
323        Ok(results)
324    }
325
326    // ------------------------------------------------------------------
327    // Invalidated hashes
328    // ------------------------------------------------------------------
329
330    pub fn is_hash_invalidated(&self, hash: &str) -> Result<bool> {
331        let count: i64 = self.conn.query_row(
332            "SELECT count(*) FROM invalidated_hashes WHERE content_hash=?",
333            [hash],
334            |r| r.get(0),
335        )?;
336        Ok(count > 0)
337    }
338
339    pub fn insert_invalidated_hash(
340        &self,
341        hash: &str,
342        reason: Option<&str>,
343        ts: &str,
344    ) -> Result<()> {
345        self.conn.execute(
346            "INSERT OR IGNORE INTO invalidated_hashes(content_hash, reason, ts) VALUES (?,?,?)",
347            params![hash, reason, ts],
348        )?;
349        Ok(())
350    }
351
352    // ------------------------------------------------------------------
353    // Usage trace
354    // ------------------------------------------------------------------
355
356    #[allow(clippy::too_many_arguments)]
357    pub fn insert_usage_trace(
358        &self,
359        trace_id: &str,
360        chunk_id: Option<&str>,
361        event: &str,
362        strength: f64,
363        similarity: Option<f64>,
364        refine_mode: Option<&str>,
365        tokens: Option<i64>,
366        rank: Option<i64>,
367        source: &str,
368        ts: &str,
369    ) -> Result<()> {
370        self.conn.execute(
371            "INSERT OR IGNORE INTO usage_trace
372             (trace_id, chunk_id, event, strength, similarity, refine_mode, tokens, rank, source, ts)
373             VALUES (?,?,?,?,?,?,?,?,?,?)",
374            params![trace_id, chunk_id, event, strength, similarity, refine_mode, tokens, rank, source, ts],
375        )?;
376        Ok(())
377    }
378
379    pub fn get_outcome_for_trace(&self, trace_id: &str) -> Result<Option<String>> {
380        let row = self.conn.query_row(
381            "SELECT event FROM usage_trace
382             WHERE trace_id=? AND event IN ('task_ok','task_fail') AND chunk_id IS NULL
383             LIMIT 1",
384            [trace_id],
385            |r| r.get::<_, String>(0),
386        );
387        match row {
388            Ok(v) => Ok(Some(v)),
389            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
390            Err(e) => Err(e.into()),
391        }
392    }
393
394    pub fn purge_usage_trace(&self, before_ts: &str) -> Result<usize> {
395        // Preserve spark 'retrieved' traces — they power soft-incubation counts (§二·七).
396        let n = self.conn.execute(
397            "DELETE FROM usage_trace
398             WHERE ts < ?
399             AND NOT (event = 'retrieved'
400                      AND chunk_id IN (SELECT id FROM chunks WHERE origin='spark'))",
401            [before_ts],
402        )?;
403        Ok(n)
404    }
405
406    // ------------------------------------------------------------------
407    // Episodic log
408    // ------------------------------------------------------------------
409
410    pub fn upsert_episodic_log(&self, log: &EpisodicLogRow) -> Result<()> {
411        self.conn.execute(
412            "INSERT OR REPLACE INTO episodic_log
413             (id, trace_id, lib_id, ts, query, recall_snapshot, output,
414              output_summary, outcome, event_source, nomination, priority,
415              distill_state, distill_note)
416             VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14)",
417            params![
418                log.id,
419                log.trace_id,
420                log.lib_id,
421                log.ts,
422                log.query,
423                log.recall_snapshot,
424                log.output,
425                log.output_summary,
426                log.outcome,
427                log.event_source,
428                log.nomination,
429                log.priority,
430                log.distill_state,
431                log.distill_note
432            ],
433        )?;
434        Ok(())
435    }
436
437    pub fn get_episodic_log(&self, trace_id: &str) -> Result<Option<Value>> {
438        let row = self.conn.query_row(
439            "SELECT * FROM episodic_log WHERE trace_id=?",
440            [trace_id],
441            row_to_json,
442        );
443        match row {
444            Ok(v) => Ok(Some(v)),
445            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
446            Err(e) => Err(e.into()),
447        }
448    }
449
450    pub fn update_episodic_log_state(
451        &self,
452        trace_id: &str,
453        state: &str,
454        note: Option<&str>,
455        outcome: Option<&str>,
456    ) -> Result<()> {
457        self.conn.execute(
458            "UPDATE episodic_log
459             SET distill_state=?, distill_note=COALESCE(?,distill_note),
460                 outcome=COALESCE(?,outcome),
461                 distill_run_id=NULL, distill_locked_at=NULL
462             WHERE trace_id=?",
463            params![state, note, outcome, trace_id],
464        )?;
465        Ok(())
466    }
467
468    /// Patch content fields on an existing episodic_log row (補写: output_summary, nomination, etc.)
469    pub fn patch_episodic_log_content(
470        &self,
471        trace_id: &str,
472        query: Option<&str>,
473        output: Option<&str>,
474        output_summary: Option<&str>,
475        nomination: Option<&str>,
476        priority: i64,
477    ) -> Result<()> {
478        self.conn.execute(
479            "UPDATE episodic_log
480             SET output_summary = COALESCE(?, output_summary),
481                 nomination     = COALESCE(?, nomination),
482                 output         = COALESCE(?, output),
483                 query          = COALESCE(?, query),
484                 priority       = MAX(priority, ?)
485             WHERE trace_id = ?",
486            params![
487                output_summary,
488                nomination,
489                output,
490                query,
491                priority,
492                trace_id
493            ],
494        )?;
495        Ok(())
496    }
497
498    /// Update by primary-key id (used after distill where we have the row id, not trace_id).
499    pub fn update_episodic_log_state_by_id(
500        &self,
501        id: &str,
502        state: &str,
503        note: Option<&str>,
504        outcome: Option<&str>,
505    ) -> Result<()> {
506        self.conn.execute(
507            "UPDATE episodic_log
508             SET distill_state=?, distill_note=COALESCE(?,distill_note),
509                 outcome=COALESCE(?,outcome),
510                 distill_run_id=NULL, distill_locked_at=NULL
511             WHERE id=?",
512            params![state, note, outcome, id],
513        )?;
514        Ok(())
515    }
516
517    pub fn update_episodic_log_tokens(
518        &self,
519        id: &str,
520        prompt_tokens: i64,
521        completion_tokens: i64,
522    ) -> Result<()> {
523        self.conn.execute(
524            "UPDATE episodic_log
525             SET distill_prompt_tokens=?, distill_completion_tokens=?
526             WHERE id=?",
527            params![prompt_tokens, completion_tokens, id],
528        )?;
529        Ok(())
530    }
531
532    /// Claim a batch of 'new' logs for distillation: mark them 'screening' atomically.
533    /// Returns the claimed rows (with distill_run_id set to run_id).
534    pub fn claim_distill_batch(
535        &self,
536        run_id: &str,
537        limit: usize,
538        locked_at: &str,
539    ) -> Result<Vec<Value>> {
540        // BEGIN IMMEDIATE must be held by caller; this is called inside a transaction.
541        self.conn.execute(
542            "UPDATE episodic_log
543             SET distill_state='screening', distill_run_id=?, distill_locked_at=?
544             WHERE id IN (
545               SELECT id FROM episodic_log
546               WHERE distill_state='new'
547               ORDER BY priority DESC, ts ASC
548               LIMIT ?
549             )",
550            params![run_id, locked_at, limit as i64],
551        )?;
552        self.query_json(
553            "SELECT * FROM episodic_log WHERE distill_run_id=? AND distill_state='screening'",
554            params![run_id],
555        )
556    }
557
558    pub fn query_episodic_logs_open(&self, limit: usize) -> Result<Vec<Value>> {
559        self.query_json(
560            "SELECT * FROM episodic_log WHERE distill_state='new' ORDER BY priority DESC, ts ASC LIMIT ?",
561            params![limit as i64],
562        )
563    }
564
565    // ------------------------------------------------------------------
566    // Chunk queries (aggregate / curate helpers)
567    // ------------------------------------------------------------------
568
569    pub fn query_chunks(&self, sql: &str) -> Result<Vec<Value>> {
570        self.query_json(sql, params![])
571    }
572
573    pub fn query_chunks_params<P: rusqlite::Params>(&self, sql: &str, p: P) -> Result<Vec<Value>> {
574        self.query_json(sql, p)
575    }
576
577    // ------------------------------------------------------------------
578    // Deps
579    // ------------------------------------------------------------------
580
581    pub fn get_deps(&self, chunk_id: &str) -> Result<Vec<(String, String, Option<String>)>> {
582        let mut stmt = self
583            .conn
584            .prepare("SELECT dst, kind, dst_lib FROM deps WHERE src=?")?;
585        let rows = stmt.query_map([chunk_id], |r| {
586            Ok((
587                r.get::<_, String>(0)?,
588                r.get::<_, String>(1)?,
589                r.get::<_, Option<String>>(2)?,
590            ))
591        })?;
592        Ok(rows.filter_map(|r| r.ok()).collect())
593    }
594
595    pub fn get_reverse_deps(&self, chunk_id: &str) -> Result<Vec<String>> {
596        let mut stmt = self.conn.prepare("SELECT src FROM deps WHERE dst=?")?;
597        let rows = stmt.query_map([chunk_id], |r| r.get::<_, String>(0))?;
598        Ok(rows.filter_map(|r| r.ok()).collect())
599    }
600
601    pub fn insert_dep(
602        &self,
603        src: &str,
604        dst: &str,
605        kind: &str,
606        dst_lib: Option<&str>,
607    ) -> Result<()> {
608        self.conn.execute(
609            "INSERT OR IGNORE INTO deps(src,dst,kind,dst_lib) VALUES (?,?,?,?)",
610            params![src, dst, kind, dst_lib],
611        )?;
612        Ok(())
613    }
614
615    // ------------------------------------------------------------------
616    // Chunk success traces (aggregate fact table)
617    // ------------------------------------------------------------------
618
619    pub fn upsert_chunk_success_trace(
620        &self,
621        chunk_id: &str,
622        trace_id: &str,
623        ts: &str,
624    ) -> Result<()> {
625        self.conn.execute(
626            "INSERT OR IGNORE INTO chunk_success_traces(chunk_id, trace_id, ts) VALUES (?,?,?)",
627            params![chunk_id, trace_id, ts],
628        )?;
629        Ok(())
630    }
631
632    // ------------------------------------------------------------------
633    // Shared library support
634    // ------------------------------------------------------------------
635
636    pub fn attach_shared(&self, path: &str, alias: &str) -> Result<()> {
637        self.conn.execute_batch(&format!(
638            "ATTACH DATABASE '{}' AS '{alias}'",
639            path.replace('\'', "''")
640        ))?;
641        Ok(())
642    }
643
644    pub fn lib_id(&self) -> Result<String> {
645        Ok(self
646            .get_meta("lib_id")?
647            .unwrap_or_else(|| "unknown".to_string()))
648    }
649
650    // ------------------------------------------------------------------
651    // Generic helpers
652    // ------------------------------------------------------------------
653
654    fn query_json<P: rusqlite::Params>(&self, sql: &str, p: P) -> Result<Vec<Value>> {
655        let mut stmt = self.conn.prepare(sql)?;
656        let names: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();
657        let rows = stmt.query_map(p, |r| row_to_json_with_names(r, &names))?;
658        Ok(rows.filter_map(|r| r.ok()).collect())
659    }
660
661    pub fn execute(&self, sql: &str) -> Result<()> {
662        self.conn.execute_batch(sql)?;
663        Ok(())
664    }
665
666    /// Execute a parameterised statement (not batch); returns rows-affected count.
667    pub fn conn_execute<P: rusqlite::Params>(&self, sql: &str, p: P) -> Result<()> {
668        self.conn.execute(sql, p)?;
669        Ok(())
670    }
671}
672
673// ------------------------------------------------------------------
674// Row types
675// ------------------------------------------------------------------
676
677#[derive(Debug, Default, Clone)]
678pub struct ChunkRow {
679    pub id: String,
680    pub skill_name: Option<String>,
681    pub seq: i64,
682    pub content: String,
683    pub trigger_desc: Option<String>,
684    pub anti_trigger_desc: Option<String>,
685    pub content_hash: String,
686    pub token_count: Option<i64>,
687    pub origin: String,
688    pub source: Option<String>,
689    pub maturity: Option<String>,
690    pub related_ids: Option<String>,
691    pub protected: i64,
692    pub state: String,
693    pub state_reason: Option<String>,
694    pub state_updated_at: Option<String>,
695    pub confidence: f64,
696    pub confidence_reason: Option<String>,
697    pub version: i64,
698    pub distilled_from: Option<String>,
699    pub parent_id: Option<String>,
700    pub selected_count: i64,
701    pub used_count: i64,
702    pub used_success_count: i64,
703    pub success_trace_ids_count: i64,
704    pub last_success_at: Option<String>,
705    pub last_agg_ts: Option<String>,
706    pub embed_version: i64,
707    pub created_at: String,
708    pub updated_at: String,
709    pub last_used_at: Option<String>,
710}
711
712#[derive(Debug, Default)]
713pub struct EpisodicLogRow {
714    pub id: String,
715    pub trace_id: String,
716    pub lib_id: String,
717    pub ts: String,
718    pub query: Option<String>,
719    pub recall_snapshot: Option<String>,
720    pub output: Option<String>,
721    pub output_summary: Option<String>,
722    pub outcome: Option<String>,
723    pub event_source: String,
724    pub nomination: Option<String>,
725    pub priority: i64,
726    pub distill_state: String,
727    pub distill_note: Option<String>,
728}
729
730// ------------------------------------------------------------------
731// Helpers
732// ------------------------------------------------------------------
733
734fn configure_pragmas(conn: &Connection) -> Result<()> {
735    conn.execute_batch(
736        "PRAGMA journal_mode=WAL;
737         PRAGMA foreign_keys=ON;
738         PRAGMA synchronous=NORMAL;",
739    )?;
740    // Validate WAL mode was accepted (some VFS/filesystems silently downgrade).
741    let mode: String = conn.query_row("PRAGMA journal_mode", [], |r| r.get(0))?;
742    if mode != "wal" {
743        return Err(crate::errors::InnateError::Other(format!(
744            "WAL mode required but got '{mode}'; check filesystem support"
745        )));
746    }
747    Ok(())
748}
749
750fn ver_tuple(v: &str) -> (u32, u32, u32) {
751    let parts: Vec<u32> = v.split('.').filter_map(|s| s.parse().ok()).collect();
752    (
753        parts.first().copied().unwrap_or(0),
754        parts.get(1).copied().unwrap_or(0),
755        parts.get(2).copied().unwrap_or(0),
756    )
757}
758
759/// Convert a rusqlite Row to serde_json::Value using column names from statement.
760fn row_to_json_with_names(row: &Row, names: &[String]) -> rusqlite::Result<Value> {
761    let mut map = serde_json::Map::new();
762    for (i, name) in names.iter().enumerate() {
763        let v = row_value_at(row, i);
764        map.insert(name.clone(), v);
765    }
766    Ok(Value::Object(map))
767}
768
769fn row_to_json(row: &Row) -> rusqlite::Result<Value> {
770    let count = row.as_ref().column_count();
771    let mut map = serde_json::Map::new();
772    for i in 0..count {
773        let name = row.as_ref().column_name(i)?.to_string();
774        let v = row_value_at(row, i);
775        map.insert(name, v);
776    }
777    Ok(Value::Object(map))
778}
779
780fn row_value_at(row: &Row, i: usize) -> Value {
781    // Try types in preference order
782    if let Ok(v) = row.get::<_, Option<String>>(i) {
783        return v.map(Value::String).unwrap_or(Value::Null);
784    }
785    if let Ok(v) = row.get::<_, Option<i64>>(i) {
786        return v.map(|n| Value::Number(n.into())).unwrap_or(Value::Null);
787    }
788    if let Ok(v) = row.get::<_, Option<f64>>(i) {
789        return v
790            .and_then(serde_json::Number::from_f64)
791            .map(Value::Number)
792            .unwrap_or(Value::Null);
793    }
794    Value::Null
795}
796
797trait OptionalExt<T> {
798    fn optional(self) -> rusqlite::Result<Option<T>>;
799}
800impl<T> OptionalExt<T> for rusqlite::Result<T> {
801    fn optional(self) -> rusqlite::Result<Option<T>> {
802        match self {
803            Ok(v) => Ok(Some(v)),
804            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
805            Err(e) => Err(e),
806        }
807    }
808}