Skip to main content

semantic_memory/
db.rs

1//! Database initialization, migrations, integrity checks, and durable sidecar state.
2
3use crate::config::{EmbeddingConfig, MemoryLimits, PoolConfig};
4use crate::error::MemoryError;
5use crate::quantize::unpack_quantized;
6#[cfg(feature = "turbo-quant-codec")]
7use crate::types::{DerivedVectorArtifactGenerationV1, VectorArtifactBuildReceiptV1};
8use crate::types::{EpisodeOutcome, Role, VectorSearchReceiptV1, VerificationStatus};
9use chrono::{DateTime, Utc};
10use rusqlite::{params, Connection, OpenFlags, OptionalExtension};
11use serde::{Deserialize, Serialize};
12use stack_ids::ContentDigest;
13#[cfg(feature = "turbo-quant-codec")]
14use stack_ids::DigestBuilder;
15use std::path::Path;
16
/// V1 migration: full schema.
///
/// Creates the initial tables:
/// - conversation storage (`sessions`, `messages`);
/// - knowledge facts (`facts`, `facts_rowid_map`, contentless FTS5 `facts_fts`);
/// - chunked documents (`documents`, `chunks`, `chunks_rowid_map`, `chunks_fts`);
/// - `embedding_metadata`, which `CHECK (id = 1)` limits to a single row.
///
/// Each `*_rowid_map` table assigns an integer rowid to a TEXT id. Later
/// migrations (e.g. V7) insert FTS rows keyed by these rowids, because the FTS5
/// tables are contentless (`content=''`) and are keyed by `content_rowid='rowid'`.
const MIGRATION_V1: &str = r#"
-- CONVERSATIONS
CREATE TABLE sessions (
    id          TEXT PRIMARY KEY,
    channel     TEXT NOT NULL DEFAULT 'repl',
    created_at  TEXT NOT NULL DEFAULT (datetime('now')),
    updated_at  TEXT NOT NULL DEFAULT (datetime('now')),
    metadata    TEXT
);

CREATE INDEX idx_sessions_updated ON sessions(updated_at DESC);

CREATE TABLE messages (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    session_id  TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
    role        TEXT NOT NULL CHECK (role IN ('system', 'user', 'assistant', 'tool')),
    content     TEXT NOT NULL,
    token_count INTEGER,
    created_at  TEXT NOT NULL DEFAULT (datetime('now')),
    metadata    TEXT
);

CREATE INDEX idx_messages_session ON messages(session_id, created_at ASC);
CREATE INDEX idx_messages_created ON messages(created_at DESC);

-- KNOWLEDGE (Facts)
CREATE TABLE facts (
    id          TEXT PRIMARY KEY,
    namespace   TEXT NOT NULL DEFAULT 'general',
    content     TEXT NOT NULL,
    source      TEXT,
    embedding   BLOB,
    created_at  TEXT NOT NULL DEFAULT (datetime('now')),
    updated_at  TEXT NOT NULL DEFAULT (datetime('now')),
    metadata    TEXT
);

CREATE INDEX idx_facts_namespace ON facts(namespace);
CREATE INDEX idx_facts_updated ON facts(updated_at DESC);

CREATE TABLE facts_rowid_map (
    rowid       INTEGER PRIMARY KEY AUTOINCREMENT,
    fact_id     TEXT NOT NULL UNIQUE REFERENCES facts(id) ON DELETE CASCADE
);

CREATE VIRTUAL TABLE facts_fts USING fts5(
    content,
    content='',
    content_rowid='rowid',
    tokenize='porter unicode61'
);

-- DOCUMENTS (Chunked content)
CREATE TABLE documents (
    id          TEXT PRIMARY KEY,
    title       TEXT NOT NULL,
    source_path TEXT,
    namespace   TEXT NOT NULL DEFAULT 'general',
    created_at  TEXT NOT NULL DEFAULT (datetime('now')),
    metadata    TEXT
);

CREATE TABLE chunks (
    id          TEXT PRIMARY KEY,
    document_id TEXT NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
    chunk_index INTEGER NOT NULL,
    content     TEXT NOT NULL,
    token_count INTEGER,
    embedding   BLOB,
    created_at  TEXT NOT NULL DEFAULT (datetime('now'))
);

CREATE INDEX idx_chunks_document ON chunks(document_id, chunk_index ASC);

CREATE TABLE chunks_rowid_map (
    rowid       INTEGER PRIMARY KEY AUTOINCREMENT,
    chunk_id    TEXT NOT NULL UNIQUE REFERENCES chunks(id) ON DELETE CASCADE
);

CREATE VIRTUAL TABLE chunks_fts USING fts5(
    content,
    content='',
    content_rowid='rowid',
    tokenize='porter unicode61'
);

-- EMBEDDING METADATA
CREATE TABLE embedding_metadata (
    id          INTEGER PRIMARY KEY CHECK (id = 1),
    model_name  TEXT NOT NULL,
    dimensions  INTEGER NOT NULL,
    updated_at  TEXT NOT NULL DEFAULT (datetime('now'))
);
"#;
112
/// V2 migration: message embeddings for conversation search.
///
/// Adds a nullable `embedding` BLOB to `messages`. It also creates a
/// `messages_rowid_map` and a contentless FTS5 index, `messages_fts`, with the
/// same layout V1 uses for facts and chunks.
const MIGRATION_V2: &str = r#"
ALTER TABLE messages ADD COLUMN embedding BLOB;

CREATE TABLE messages_rowid_map (
    rowid       INTEGER PRIMARY KEY AUTOINCREMENT,
    message_id  INTEGER NOT NULL UNIQUE REFERENCES messages(id) ON DELETE CASCADE
);

CREATE VIRTUAL TABLE messages_fts USING fts5(
    content,
    content='',
    content_rowid='rowid',
    tokenize='porter unicode61'
);
"#;
129
/// V3 migration: embedding staleness tracking.
///
/// Adds an integer `embeddings_dirty` column (default 0) to the single-row
/// `embedding_metadata` table. It is presumably used as a boolean flag.
const MIGRATION_V3: &str = r#"
ALTER TABLE embedding_metadata ADD COLUMN embeddings_dirty INTEGER NOT NULL DEFAULT 0;
"#;
134
/// V4 migration: HNSW metadata tracking.
///
/// Creates `hnsw_metadata`, a generic TEXT key/value table. V7, for example,
/// seeds a `sidecar_dirty` entry in it.
const MIGRATION_V4: &str = r#"
CREATE TABLE IF NOT EXISTS hnsw_metadata (
    key TEXT PRIMARY KEY,
    value TEXT NOT NULL
);
"#;
142
/// V5 migration: quantized embeddings + HNSW keymap persistence.
///
/// Adds `embedding_q8` BLOB columns to `facts`, `chunks`, and `messages`. It
/// also creates `hnsw_keymap`, which maps integer HNSW node ids to item keys
/// and carries a `deleted` marker (default 0).
///
/// NOTE(review): `item_key` is already declared `UNIQUE`, and SQLite backs that
/// with an automatic unique index, so `idx_hnsw_keymap_key` looks redundant.
/// It is left unchanged because already-applied migrations must stay stable.
const MIGRATION_V5: &str = r#"
ALTER TABLE facts ADD COLUMN embedding_q8 BLOB;
ALTER TABLE chunks ADD COLUMN embedding_q8 BLOB;
ALTER TABLE messages ADD COLUMN embedding_q8 BLOB;

CREATE TABLE IF NOT EXISTS hnsw_keymap (
    node_id     INTEGER PRIMARY KEY,
    item_key    TEXT NOT NULL UNIQUE,
    deleted     INTEGER NOT NULL DEFAULT 0
);

CREATE INDEX idx_hnsw_keymap_key ON hnsw_keymap(item_key);
"#;
157
/// V6 migration: episodes table for causal tracking.
///
/// At this version `document_id` is the primary key, so each document has at
/// most one episode. V9 (`run_migration_v9`) rebuilds the table around a
/// separate `episode_id` key.
const MIGRATION_V6: &str = r#"
CREATE TABLE IF NOT EXISTS episodes (
    document_id TEXT PRIMARY KEY REFERENCES documents(id) ON DELETE CASCADE,
    cause_ids TEXT NOT NULL,
    effect_type TEXT NOT NULL,
    outcome TEXT NOT NULL DEFAULT 'pending',
    confidence REAL NOT NULL DEFAULT 0.0,
    verification_status TEXT NOT NULL DEFAULT '{"status":"unverified"}',
    experiment_id TEXT,
    created_at TEXT NOT NULL DEFAULT (datetime('now'))
);

CREATE INDEX IF NOT EXISTS idx_episodes_effect_type ON episodes(effect_type);
CREATE INDEX IF NOT EXISTS idx_episodes_outcome ON episodes(outcome);
CREATE INDEX IF NOT EXISTS idx_episodes_experiment_id ON episodes(experiment_id);
"#;
175
/// V7 migration: searchable episodes + durable sidecar journal.
///
/// Changes made:
/// - adds `updated_at`, `search_text`, `embedding`, and `embedding_q8` to
///   `episodes`;
/// - creates `episodes_rowid_map` and a contentless `episodes_fts` index;
/// - creates the `pending_index_ops` queue of sidecar `upsert`/`delete`
///   operations;
/// - seeds `hnsw_metadata.sidecar_dirty = '0'` if that key is absent.
///
/// Existing episodes are backfilled. An empty `search_text` is built from
/// effect type, outcome, experiment id, and cause ids, then indexed into FTS
/// through the rowid map.
///
/// NOTE(review): SQLite's ALTER TABLE docs say an added column may not default
/// to a parenthesized expression such as `(datetime('now'))`. Recent SQLite
/// versions appear to reject this only when the table already has rows. Confirm
/// that this migration succeeds on databases that already contain episodes.
const MIGRATION_V7: &str = r#"
ALTER TABLE episodes ADD COLUMN updated_at TEXT NOT NULL DEFAULT (datetime('now'));
ALTER TABLE episodes ADD COLUMN search_text TEXT NOT NULL DEFAULT '';
ALTER TABLE episodes ADD COLUMN embedding BLOB;
ALTER TABLE episodes ADD COLUMN embedding_q8 BLOB;

CREATE TABLE IF NOT EXISTS episodes_rowid_map (
    rowid       INTEGER PRIMARY KEY AUTOINCREMENT,
    document_id TEXT NOT NULL UNIQUE REFERENCES episodes(document_id) ON DELETE CASCADE
);

CREATE VIRTUAL TABLE episodes_fts USING fts5(
    content,
    content='',
    content_rowid='rowid',
    tokenize='porter unicode61'
);

CREATE TABLE IF NOT EXISTS pending_index_ops (
    item_key      TEXT PRIMARY KEY,
    entity_type   TEXT NOT NULL,
    op_kind       TEXT NOT NULL CHECK (op_kind IN ('upsert', 'delete')),
    attempt_count INTEGER NOT NULL DEFAULT 0,
    last_error    TEXT,
    updated_at    TEXT NOT NULL DEFAULT (datetime('now'))
);

INSERT OR IGNORE INTO hnsw_metadata (key, value) VALUES ('sidecar_dirty', '0');

UPDATE episodes
SET search_text = TRIM(
    COALESCE(effect_type, '') || ' ' ||
    COALESCE(outcome, '') || ' ' ||
    COALESCE(experiment_id, '') || ' ' ||
    COALESCE(cause_ids, '')
)
WHERE search_text = '';

INSERT OR IGNORE INTO episodes_rowid_map (document_id)
SELECT document_id FROM episodes;

INSERT INTO episodes_fts (rowid, content)
SELECT rm.rowid, e.search_text
FROM episodes_rowid_map rm
JOIN episodes e ON e.document_id = rm.document_id;
"#;
223
/// V8 migration: durable episode trace IDs.
///
/// Adds a nullable `trace_id` TEXT column to `episodes`.
const MIGRATION_V8: &str = r#"
ALTER TABLE episodes ADD COLUMN trace_id TEXT;
"#;
228
/// V9 migration: first-class episode identity + normalized causal edge table.
///
/// Rebuilds the episodes table so `episode_id` is the primary key while
/// `document_id` becomes a non-unique FK allowing multiple episodes per doc.
/// Adds `episode_causes` for normalized causal backlinks.
///
/// Applied via `run_migration_v9()` because it requires table rebuild.
/// The string is intentionally empty: `run_migrations` dispatches version 9 to
/// that function and never executes this SQL.
const MIGRATION_V9: &str = "";
237
/// V18 migration: durable, replay-addressable search receipts.
///
/// Creates `search_receipts`. Each row stores summary columns for a receipt:
/// backend, candidate counts (checked `>= 0`), and 0/1 flags enforced by
/// `CHECK`. It also stores the full `receipt_json` and its `receipt_digest`.
/// The table is indexed by creation time (newest first) and by candidate
/// backend.
const MIGRATION_V18: &str = r#"
CREATE TABLE IF NOT EXISTS search_receipts (
    receipt_id             TEXT PRIMARY KEY,
    schema_version         TEXT NOT NULL,
    evaluation_time        TEXT NOT NULL,
    search_profile         TEXT NOT NULL,
    candidate_backend      TEXT NOT NULL,
    approximate            INTEGER NOT NULL CHECK (approximate IN (0, 1)),
    exact_rerank           INTEGER NOT NULL CHECK (exact_rerank IN (0, 1)),
    fallback               TEXT,
    requested_candidates   INTEGER NOT NULL CHECK (requested_candidates >= 0),
    returned_candidates    INTEGER NOT NULL CHECK (returned_candidates >= 0),
    post_filter_candidates INTEGER NOT NULL CHECK (post_filter_candidates >= 0),
    result_ids_json        TEXT NOT NULL,
    receipt_json           TEXT NOT NULL,
    receipt_digest         TEXT NOT NULL,
    created_at             TEXT NOT NULL DEFAULT (datetime('now'))
);

CREATE INDEX IF NOT EXISTS idx_search_receipts_created
ON search_receipts(created_at DESC);

CREATE INDEX IF NOT EXISTS idx_search_receipts_backend
ON search_receipts(candidate_backend);
"#;
264
/// V19 migration: rebuildable derived vector acceleration artifacts.
///
/// Creates `derived_vector_artifacts`, with one row per
/// `(item_key, codec_family, codec_profile_digest)`. Each row holds the encoded
/// vector together with digests of its source embedding, its encoding, and the
/// artifact itself. `run_migration_v20` later re-adds some of these columns
/// if they are missing.
const MIGRATION_V19: &str = r#"
CREATE TABLE IF NOT EXISTS derived_vector_artifacts (
    item_key                TEXT NOT NULL,
    codec_family            TEXT NOT NULL,
    codec_profile_digest    TEXT NOT NULL,
    source_embedding_digest TEXT NOT NULL,
    encoded_digest          TEXT NOT NULL,
    artifact_digest         TEXT NOT NULL,
    encoding                TEXT NOT NULL,
    dim                     INTEGER NOT NULL,
    encoded                 BLOB NOT NULL,
    created_at              TEXT NOT NULL DEFAULT (datetime('now')),
    status                  TEXT NOT NULL DEFAULT 'active',
    PRIMARY KEY (item_key, codec_family, codec_profile_digest)
);

CREATE INDEX IF NOT EXISTS idx_derived_vector_artifacts_profile
ON derived_vector_artifacts(codec_family, codec_profile_digest, status);

CREATE INDEX IF NOT EXISTS idx_derived_vector_artifacts_source_digest
ON derived_vector_artifacts(source_embedding_digest);
"#;
288
/// V20 migration: align derived vector artifact rows with P31 evidence fields.
///
/// This is a placeholder that contains only an SQL comment. `run_migrations`
/// dispatches version 20 to `run_migration_v20()`, which adds any missing
/// columns procedurally.
const MIGRATION_V20: &str = r#"
-- Procedural migration; see run_migration_v20.
"#;
293
/// V21 migration: generation-level manifests for derived vector artifacts.
///
/// This SQL is executed by `run_migration_v21()` rather than directly. That
/// function runs this batch and then links artifacts to generations: it adds a
/// nullable `generation_id` column and a `(generation_id, status)` index to
/// `derived_vector_artifacts`.
const MIGRATION_V21: &str = r#"
CREATE TABLE IF NOT EXISTS derived_vector_artifact_generations (
    generation_id            TEXT PRIMARY KEY,
    schema_version           TEXT NOT NULL,
    codec_family             TEXT NOT NULL,
    codec_profile_digest     TEXT NOT NULL,
    source_snapshot_digest   TEXT NOT NULL,
    source_row_count         INTEGER NOT NULL,
    artifact_count           INTEGER NOT NULL,
    source_tables_json       TEXT NOT NULL,
    dim                      INTEGER NOT NULL,
    encoding                 TEXT NOT NULL,
    created_at               TEXT NOT NULL,
    build_receipt_id         TEXT,
    artifact_manifest_digest TEXT NOT NULL,
    status                   TEXT NOT NULL CHECK (status IN ('active', 'superseded', 'invalidated', 'failed')),
    degradations_json        TEXT NOT NULL DEFAULT '[]'
);

CREATE INDEX IF NOT EXISTS idx_derived_vector_generations_profile
ON derived_vector_artifact_generations(codec_family, codec_profile_digest, status, created_at DESC);
"#;
317
/// V23 migration: codec governance columns on derived_vector_artifacts.
/// Tracks governed compression pipeline metadata for turbo-quant-codec integration.
///
/// All four added columns are nullable. This constant appears before V22 in the
/// file, but application order is decided only by `MIGRATIONS`, which lists
/// V22 first.
const MIGRATION_V23: &str = r#"
ALTER TABLE derived_vector_artifacts ADD COLUMN codec_governance_receipt_id TEXT;
ALTER TABLE derived_vector_artifacts ADD COLUMN codec_profile TEXT;
ALTER TABLE derived_vector_artifacts ADD COLUMN degradation_budget REAL;
ALTER TABLE derived_vector_artifacts ADD COLUMN raw_source_artifact_id TEXT;
"#;
326
/// V22 migration: bitemporal columns on episodes table.
/// Adds valid_time, recorded_time, superseded_by, and fact_digest for append-supersede semantics.
///
/// Also creates indexes on `recorded_time` and `valid_time`, plus a partial
/// index that covers only rows where `superseded_by` is set.
///
/// NOTE(review): `recorded_time` is added as `NOT NULL DEFAULT (datetime('now'))`.
/// SQLite's ALTER TABLE docs say an added column may not default to a
/// parenthesized expression. Recent versions appear to reject this only when
/// the table already has rows, so this migration may fail on databases with
/// existing episodes. Separately, the trailing backfill `UPDATE` only matches
/// NULL or '' values, which the NOT NULL default should prevent, so it is
/// likely a no-op. Confirm both points against the bundled SQLite.
const MIGRATION_V22: &str = r#"
ALTER TABLE episodes ADD COLUMN valid_time TEXT;
ALTER TABLE episodes ADD COLUMN recorded_time TEXT NOT NULL DEFAULT (datetime('now'));
ALTER TABLE episodes ADD COLUMN superseded_by TEXT;
ALTER TABLE episodes ADD COLUMN fact_digest TEXT;
CREATE INDEX IF NOT EXISTS idx_episodes_recorded ON episodes(recorded_time ASC);
CREATE INDEX IF NOT EXISTS idx_episodes_valid ON episodes(valid_time);
CREATE INDEX IF NOT EXISTS idx_episodes_superseded ON episodes(superseded_by) WHERE superseded_by IS NOT NULL;
UPDATE episodes SET recorded_time = updated_at WHERE recorded_time IS NULL OR recorded_time = '';
"#;
339
/// Ordered list of migrations.
///
/// Each entry is `(version, sql)`, and entries must stay in ascending version
/// order. `run_migrations` skips every entry whose version is not above the
/// highest version already recorded in `_schema_version`, so an out-of-order
/// entry would never run. For versions 9, 16, 17, 20, and 21 the SQL string is
/// ignored and the matching `run_migration_vN` function runs instead.
/// Versions 10–17 come from the projection modules.
// NOTE(review): presumably some referenced migration constants are marked
// `#[deprecated]`; this allow silences those warnings. TODO confirm.
#[allow(deprecated)]
const MIGRATIONS: &[(u32, &str)] = &[
    (1, MIGRATION_V1),
    (2, MIGRATION_V2),
    (3, MIGRATION_V3),
    (4, MIGRATION_V4),
    (5, MIGRATION_V5),
    (6, MIGRATION_V6),
    (7, MIGRATION_V7),
    (8, MIGRATION_V8),
    (9, MIGRATION_V9),
    (10, crate::projection_import::MIGRATION_V10),
    (11, crate::projection_storage::MIGRATION_V11),
    (12, crate::projection_storage::MIGRATION_V12),
    (13, crate::projection_storage::MIGRATION_V13),
    (14, crate::projection_storage::MIGRATION_V14),
    (15, crate::projection_storage::MIGRATION_V15),
    (16, crate::projection_storage::MIGRATION_V16),
    (17, crate::projection_storage::MIGRATION_V17),
    (18, MIGRATION_V18),
    (19, MIGRATION_V19),
    (20, MIGRATION_V20),
    (21, MIGRATION_V21),
    (22, MIGRATION_V22),
    (23, MIGRATION_V23),
];
367
/// Maximum schema version this build supports.
///
/// Keep this equal to the last version in `MIGRATIONS`. `run_migrations`
/// rejects databases whose `PRAGMA user_version` is higher with
/// `MemoryError::SchemaAhead`.
pub const MAX_SCHEMA_VERSION: u32 = 23;
370
371/// Procedural migration for V9: rebuild episodes table with episode_id PK.
372fn run_migration_v9(conn: &Connection) -> Result<(), MemoryError> {
373    // Check if episodes table exists (fresh DBs won't have it yet at V6)
374    let episodes_exist: bool = conn
375        .query_row(
376            "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='episodes'",
377            [],
378            |row| row.get(0),
379        )
380        .map_err(|e| MemoryError::MigrationFailed {
381            version: 9,
382            reason: format!("existence check failed: {e}"),
383        })?;
384
385    if !episodes_exist {
386        // No episodes table to migrate; create the target schema directly
387        conn.execute_batch(
388            "CREATE TABLE IF NOT EXISTS episode_causes (
389                 episode_id    TEXT NOT NULL,
390                 cause_node_id TEXT NOT NULL,
391                 ordinal       INTEGER NOT NULL DEFAULT 0,
392                 PRIMARY KEY (episode_id, cause_node_id)
393             );
394             CREATE INDEX IF NOT EXISTS idx_episode_causes_cause ON episode_causes(cause_node_id);",
395        )?;
396        return Ok(());
397    }
398
399    // Disable foreign keys for table rebuild
400    conn.execute_batch("PRAGMA foreign_keys = OFF;")?;
401
402    conn.execute_batch(
403        "CREATE TABLE episodes_new (
404             episode_id  TEXT PRIMARY KEY,
405             document_id TEXT NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
406             cause_ids   TEXT NOT NULL,
407             effect_type TEXT NOT NULL,
408             outcome     TEXT NOT NULL DEFAULT 'pending',
409             confidence  REAL NOT NULL DEFAULT 0.0,
410             verification_status TEXT NOT NULL DEFAULT '{\"status\":\"unverified\"}',
411             experiment_id TEXT,
412             created_at  TEXT NOT NULL DEFAULT (datetime('now')),
413             updated_at  TEXT NOT NULL DEFAULT (datetime('now')),
414             search_text TEXT NOT NULL DEFAULT '',
415             embedding   BLOB,
416             embedding_q8 BLOB,
417             trace_id    TEXT
418         )",
419    )?;
420
421    // Migrate existing data with deterministic episode_id
422    conn.execute_batch(
423        "INSERT INTO episodes_new
424             (episode_id, document_id, cause_ids, effect_type, outcome, confidence,
425              verification_status, experiment_id, created_at, updated_at,
426              search_text, embedding, embedding_q8, trace_id)
427         SELECT
428             document_id || '-ep0',
429             document_id, cause_ids, effect_type, outcome, confidence,
430             verification_status, experiment_id, created_at, updated_at,
431             search_text, embedding, embedding_q8, trace_id
432         FROM episodes",
433    )?;
434
435    conn.execute_batch("DROP TABLE episodes")?;
436    conn.execute_batch("ALTER TABLE episodes_new RENAME TO episodes")?;
437
438    conn.execute_batch(
439        "CREATE INDEX idx_episodes_document_id ON episodes(document_id);
440         CREATE INDEX idx_episodes_effect_type ON episodes(effect_type);
441         CREATE INDEX idx_episodes_outcome ON episodes(outcome);
442         CREATE INDEX idx_episodes_experiment_id ON episodes(experiment_id);",
443    )?;
444
445    // Rebuild episodes_rowid_map with episode_id
446    conn.execute_batch(
447        "DROP TABLE IF EXISTS episodes_rowid_map;
448         CREATE TABLE episodes_rowid_map (
449             rowid       INTEGER PRIMARY KEY AUTOINCREMENT,
450             episode_id  TEXT NOT NULL UNIQUE,
451             document_id TEXT
452         );
453         INSERT INTO episodes_rowid_map (episode_id, document_id)
454         SELECT episode_id, document_id FROM episodes;",
455    )?;
456
457    // Rebuild episodes FTS
458    conn.execute_batch(
459        "DROP TABLE IF EXISTS episodes_fts;
460         CREATE VIRTUAL TABLE episodes_fts USING fts5(
461             content,
462             content='',
463             content_rowid='rowid',
464             tokenize='porter unicode61'
465         );
466         INSERT INTO episodes_fts (rowid, content)
467         SELECT rm.rowid, e.search_text
468         FROM episodes_rowid_map rm
469         JOIN episodes e ON e.episode_id = rm.episode_id;",
470    )?;
471
472    // Normalized causal edge table
473    conn.execute_batch(
474        "CREATE TABLE IF NOT EXISTS episode_causes (
475             episode_id    TEXT NOT NULL,
476             cause_node_id TEXT NOT NULL,
477             ordinal       INTEGER NOT NULL DEFAULT 0,
478             PRIMARY KEY (episode_id, cause_node_id)
479         );
480         CREATE INDEX IF NOT EXISTS idx_episode_causes_cause ON episode_causes(cause_node_id);",
481    )?;
482
483    // Populate edge table from existing JSON cause_ids
484    conn.execute_batch(
485        "INSERT OR IGNORE INTO episode_causes (episode_id, cause_node_id, ordinal)
486         SELECT e.episode_id, je.value, CAST(je.key AS INTEGER)
487         FROM episodes e, json_each(e.cause_ids) je;",
488    )?;
489
490    conn.execute_batch("PRAGMA foreign_keys = ON;")?;
491
492    Ok(())
493}
494
/// How thorough the integrity check should be.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VerifyMode {
    /// Quick: counts and basic metadata only.
    Quick,
    /// Full: includes FTS, JSON/enum decoding, blobs, and SQLite integrity_check.
    Full,
}

/// Result of an integrity verification.
///
/// NOTE(review): the code that fills this report is outside this section. The
/// field descriptions below follow the field names; confirm them against the
/// verifier.
#[derive(Debug, Clone)]
pub struct IntegrityReport {
    /// Overall verdict; presumably `true` only when `issues` is empty.
    pub ok: bool,
    /// Schema version of the verified database.
    pub schema_version: u32,
    /// Number of stored facts.
    pub fact_count: usize,
    /// Number of stored document chunks.
    pub chunk_count: usize,
    /// Number of stored conversation messages.
    pub message_count: usize,
    /// Number of facts that have no embedding.
    pub facts_missing_embeddings: usize,
    /// Number of chunks that have no embedding.
    pub chunks_missing_embeddings: usize,
    /// Human-readable descriptions of the problems found.
    pub issues: Vec<String>,
}

/// Action to take when integrity issues are found.
///
/// NOTE(review): the variant descriptions follow their names; confirm them
/// against the reconciler.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReconcileAction {
    /// Only report issues; leave the database unchanged.
    ReportOnly,
    /// Rebuild the full-text search indexes.
    RebuildFts,
    /// Regenerate embeddings.
    ReEmbed,
}
524
525/// Desired HNSW sidecar mutation queued in SQLite.
526#[derive(Debug, Clone, Copy, PartialEq, Eq)]
527pub(crate) enum IndexOpKind {
528    Upsert,
529    Delete,
530}
531
532impl IndexOpKind {
533    pub(crate) fn as_str(self) -> &'static str {
534        match self {
535            Self::Upsert => "upsert",
536            Self::Delete => "delete",
537        }
538    }
539
540    fn parse(raw: &str, item_key: &str) -> Result<Self, MemoryError> {
541        match raw {
542            "upsert" => Ok(Self::Upsert),
543            "delete" => Ok(Self::Delete),
544            other => Err(MemoryError::CorruptData {
545                table: "pending_index_ops",
546                row_id: item_key.to_string(),
547                detail: format!("invalid op_kind '{other}'"),
548            }),
549        }
550    }
551}
552
/// Durable sidecar repair record.
///
/// Mirrors one row of the `pending_index_ops` table (created in V7).
#[derive(Debug, Clone)]
pub(crate) struct PendingIndexOp {
    /// Primary key of the queued row: the key of the item to (re)index.
    pub item_key: String,
    /// Value of the `entity_type` column; presumably the kind of item the key names.
    pub entity_type: String,
    /// Requested mutation, decoded from the `op_kind` column.
    pub op_kind: IndexOpKind,
    /// Value of `attempt_count` (the column defaults to 0).
    pub attempt_count: u32,
    /// Value of `last_error`; presumably the error from the most recent failed attempt.
    pub last_error: Option<String>,
}
562
563/// Run a closure inside an unchecked transaction, committing on success.
564pub fn with_transaction<F, T>(conn: &Connection, f: F) -> Result<T, MemoryError>
565where
566    F: FnOnce(&rusqlite::Transaction<'_>) -> Result<T, MemoryError>,
567{
568    let tx = conn.unchecked_transaction()?;
569    let result = f(&tx)?;
570    tx.commit()?;
571    Ok(result)
572}
573
574/// Open or create a SQLite database, configure pragmas, and run migrations.
575pub fn open_database(
576    path: &Path,
577    pool: &PoolConfig,
578    limits: &MemoryLimits,
579) -> Result<Connection, MemoryError> {
580    open_database_internal(path, pool, limits.max_db_size_bytes, true)
581}
582
583/// Open a SQLite connection with pragmas applied but without running migrations.
584pub fn open_database_connection(
585    path: &Path,
586    pool: &PoolConfig,
587    limits: &MemoryLimits,
588) -> Result<Connection, MemoryError> {
589    open_database_internal(path, pool, limits.max_db_size_bytes, false)
590}
591
592pub(crate) fn open_database_internal(
593    path: &Path,
594    pool: &PoolConfig,
595    max_db_size_bytes: u64,
596    run_schema_migrations: bool,
597) -> Result<Connection, MemoryError> {
598    create_parent_dirs(path)?;
599    let conn = Connection::open(path)?;
600    configure_connection(&conn, path, pool, max_db_size_bytes, false)?;
601    if run_schema_migrations {
602        run_migrations(&conn)?;
603    }
604    Ok(conn)
605}
606
607pub(crate) fn open_pool_member_connection(
608    path: &Path,
609    pool: &PoolConfig,
610    limits: &MemoryLimits,
611    query_only: bool,
612) -> Result<Connection, MemoryError> {
613    create_parent_dirs(path)?;
614    let flags = OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE;
615    let conn = Connection::open_with_flags(path, flags)?;
616    configure_connection(&conn, path, pool, limits.max_db_size_bytes, query_only)?;
617    Ok(conn)
618}
619
620fn create_parent_dirs(path: &Path) -> Result<(), MemoryError> {
621    if let Some(parent) = path.parent() {
622        if !parent.as_os_str().is_empty() {
623            std::fs::create_dir_all(parent).map_err(|e| {
624                MemoryError::StorageError(format!(
625                    "failed to create database directory {}: {}",
626                    parent.display(),
627                    e
628                ))
629            })?;
630        }
631    }
632    Ok(())
633}
634
635fn configure_connection(
636    conn: &Connection,
637    path: &Path,
638    pool: &PoolConfig,
639    max_db_size_bytes: u64,
640    query_only: bool,
641) -> Result<(), MemoryError> {
642    let journal_mode = if pool.enable_wal { "WAL" } else { "DELETE" };
643    conn.execute_batch(&format!(
644        "PRAGMA journal_mode = {};
645         PRAGMA foreign_keys = ON;
646         PRAGMA busy_timeout = {};
647         PRAGMA synchronous = NORMAL;
648         PRAGMA temp_store = MEMORY;
649         PRAGMA wal_autocheckpoint = {};",
650        journal_mode, pool.busy_timeout_ms, pool.wal_autocheckpoint,
651    ))?;
652
653    if query_only {
654        conn.execute_batch("PRAGMA query_only = ON;")?;
655    }
656
657    let actual_journal_mode: String =
658        conn.query_row("PRAGMA journal_mode", [], |row| row.get(0))?;
659    let expected_journal_mode = if pool.enable_wal { "wal" } else { "delete" };
660    if actual_journal_mode.to_lowercase() != expected_journal_mode {
661        return Err(MemoryError::StorageError(format!(
662            "SQLite journal mode mismatch for {}: requested {}, got {}",
663            path.display(),
664            expected_journal_mode,
665            actual_journal_mode
666        )));
667    }
668
669    if max_db_size_bytes > 0 {
670        let page_size: u64 = conn.query_row("PRAGMA page_size", [], |row| row.get(0))?;
671        let max_page_count = max_db_size_bytes.div_ceil(page_size);
672
673        // SM-AUD-0065: Validate max_page_count before setting pragma
674        const MAX_SQLITE_PAGE_COUNT: u64 = 1_073_741_823; // SQLite hard limit
675        const MIN_SQLITE_PAGE_COUNT: u64 = 1;
676        if !(MIN_SQLITE_PAGE_COUNT..=MAX_SQLITE_PAGE_COUNT).contains(&max_page_count) {
677            return Err(MemoryError::StorageError(format!(
678                "Invalid max_page_count {}: must be between {} and {}",
679                max_page_count, MIN_SQLITE_PAGE_COUNT, MAX_SQLITE_PAGE_COUNT
680            )));
681        }
682
683        let actual_max_page_count: u64 = conn.query_row(
684            &format!("PRAGMA max_page_count = {}", max_page_count),
685            [],
686            |row| row.get(0),
687        )?;
688        let page_count: u64 = conn.query_row("PRAGMA page_count", [], |row| row.get(0))?;
689
690        if page_count > actual_max_page_count {
691            return Err(MemoryError::DatabaseSizeLimitExceeded {
692                current: page_count.saturating_mul(page_size),
693                limit: max_db_size_bytes,
694            });
695        }
696    }
697
698    // SM-AUD-0064: Assert foreign_keys is ON after configuration
699    let foreign_keys_enabled: bool = conn.query_row("PRAGMA foreign_keys", [], |row| row.get(0))?;
700    if !foreign_keys_enabled {
701        return Err(MemoryError::StorageError(
702            "PRAGMA foreign_keys failed to enable after configuration".to_string(),
703        ));
704    }
705
706    Ok(())
707}
708
709/// Run all pending migrations.
710pub fn run_migrations(conn: &Connection) -> Result<(), MemoryError> {
711    let user_version: u32 = conn
712        .query_row("PRAGMA user_version", [], |row| row.get(0))
713        .map_err(|e| MemoryError::MigrationFailed {
714            version: 0,
715            reason: format!("failed to read PRAGMA user_version: {e}"),
716        })?;
717
718    if user_version > MAX_SCHEMA_VERSION {
719        return Err(MemoryError::SchemaAhead {
720            found: user_version,
721            supported: MAX_SCHEMA_VERSION,
722        });
723    }
724
725    conn.execute_batch(
726        "CREATE TABLE IF NOT EXISTS _schema_version (
727            version     INTEGER PRIMARY KEY,
728            applied_at  TEXT NOT NULL DEFAULT (datetime('now'))
729        );",
730    )?;
731
732    for &(version, sql) in MIGRATIONS {
733        let current_version: u32 = conn
734            .query_row(
735                "SELECT COALESCE(MAX(version), 0) FROM _schema_version",
736                [],
737                |row| row.get(0),
738            )
739            .unwrap_or(0);
740
741        if current_version >= version {
742            continue;
743        }
744
745        with_transaction(conn, |tx| {
746            match version {
747                9 => run_migration_v9(tx).map_err(|e| MemoryError::MigrationFailed {
748                    version,
749                    reason: e.to_string(),
750                })?,
751                16 => run_migration_v16(tx).map_err(|e| MemoryError::MigrationFailed {
752                    version,
753                    reason: e.to_string(),
754                })?,
755                17 => run_migration_v17(tx).map_err(|e| MemoryError::MigrationFailed {
756                    version,
757                    reason: e.to_string(),
758                })?,
759                20 => run_migration_v20(tx).map_err(|e| MemoryError::MigrationFailed {
760                    version,
761                    reason: e.to_string(),
762                })?,
763                21 => run_migration_v21(tx).map_err(|e| MemoryError::MigrationFailed {
764                    version,
765                    reason: e.to_string(),
766                })?,
767                _ => tx
768                    .execute_batch(sql)
769                    .map_err(|e| MemoryError::MigrationFailed {
770                        version,
771                        reason: e.to_string(),
772                    })?,
773            }
774            tx.execute(
775                "INSERT INTO _schema_version (version) VALUES (?1)",
776                params![version],
777            )
778            .map_err(|e| MemoryError::MigrationFailed {
779                version,
780                reason: e.to_string(),
781            })?;
782            Ok(())
783        })?;
784
785        tracing::info!("Applied migration V{}", version);
786    }
787
788    let final_version: u32 = conn
789        .query_row(
790            "SELECT COALESCE(MAX(version), 0) FROM _schema_version",
791            [],
792            |row| row.get(0),
793        )
794        .unwrap_or(0);
795    conn.execute_batch(&format!("PRAGMA user_version = {};", final_version))?;
796
797    Ok(())
798}
799
800fn run_migration_v16(conn: &Connection) -> Result<(), rusqlite::Error> {
801    add_column_if_missing(conn, "projection_import_log", "kernel_payload_json", "TEXT")?;
802    add_column_if_missing(
803        conn,
804        "projection_import_failures",
805        "kernel_payload_json",
806        "TEXT",
807    )?;
808    Ok(())
809}
810
811fn run_migration_v17(conn: &Connection) -> Result<(), rusqlite::Error> {
812    add_column_if_missing(conn, "projection_import_log", "episode_bundle_id", "TEXT")?;
813    add_column_if_missing(conn, "projection_import_log", "episode_bundle_json", "TEXT")?;
814    add_column_if_missing(
815        conn,
816        "projection_import_log",
817        "execution_context_json",
818        "TEXT",
819    )?;
820    add_column_if_missing(
821        conn,
822        "projection_import_failures",
823        "episode_bundle_id",
824        "TEXT",
825    )?;
826    add_column_if_missing(
827        conn,
828        "projection_import_failures",
829        "episode_bundle_json",
830        "TEXT",
831    )?;
832    add_column_if_missing(
833        conn,
834        "projection_import_failures",
835        "execution_context_json",
836        "TEXT",
837    )?;
838    Ok(())
839}
840
841fn run_migration_v20(conn: &Connection) -> Result<(), rusqlite::Error> {
842    add_column_if_missing(conn, "derived_vector_artifacts", "encoded_digest", "TEXT")?;
843    conn.execute(
844        "UPDATE derived_vector_artifacts
845         SET encoded_digest = artifact_digest
846         WHERE encoded_digest IS NULL OR encoded_digest = ''",
847        [],
848    )?;
849    add_column_if_missing(
850        conn,
851        "derived_vector_artifacts",
852        "encoding",
853        "TEXT NOT NULL DEFAULT 'turbo_code_wire_v1'",
854    )?;
855    add_column_if_missing(
856        conn,
857        "derived_vector_artifacts",
858        "dim",
859        "INTEGER NOT NULL DEFAULT 0",
860    )?;
861    add_column_if_missing(
862        conn,
863        "derived_vector_artifacts",
864        "status",
865        "TEXT NOT NULL DEFAULT 'active'",
866    )?;
867    conn.execute_batch(
868        "CREATE INDEX IF NOT EXISTS idx_derived_vector_artifacts_profile
869         ON derived_vector_artifacts(codec_family, codec_profile_digest, status);
870         CREATE INDEX IF NOT EXISTS idx_derived_vector_artifacts_source_digest
871         ON derived_vector_artifacts(source_embedding_digest);",
872    )?;
873    Ok(())
874}
875
876fn run_migration_v21(conn: &Connection) -> Result<(), rusqlite::Error> {
877    conn.execute_batch(MIGRATION_V21)?;
878    add_column_if_missing(conn, "derived_vector_artifacts", "generation_id", "TEXT")?;
879    conn.execute_batch(
880        "CREATE INDEX IF NOT EXISTS idx_derived_vector_artifacts_generation
881         ON derived_vector_artifacts(generation_id, status);",
882    )?;
883    Ok(())
884}
885
/// Schema tag written into every persisted search receipt. It is also the serde default when
/// the tag is missing, and `search_receipt_from_stored` rejects any other value.
const SEARCH_RECEIPT_SCHEMA_VERSION: &str = "vector_search_receipt_v1";
887
/// Serde representation used when persisting a [`VectorSearchReceiptV1`].
///
/// Counts are `u64` here but `usize` in the in-memory receipt. `stored_search_receipt` and
/// `search_receipt_from_stored` convert between the two and reject values that do not fit.
///
/// Fields marked `#[serde(default)]` may be absent from a stored payload and then deserialize
/// to their default: `None`, or the current schema version for `schema_version`. Presumably
/// this lets receipts written before those fields existed still load.
/// NOTE(review): confirm against the receipt history.
#[derive(Debug, Serialize, Deserialize)]
struct StoredVectorSearchReceiptV1 {
    // Validated against SEARCH_RECEIPT_SCHEMA_VERSION when loading.
    #[serde(default = "default_search_receipt_schema_version")]
    schema_version: String,
    receipt_id: String,
    evaluation_time: DateTime<Utc>,
    #[serde(default)]
    receipt_digest: Option<String>,
    #[serde(default)]
    trace_id: Option<String>,
    #[serde(default)]
    attempt_family_id: Option<String>,
    #[serde(default)]
    attempt_id: Option<String>,
    #[serde(default)]
    replay_of: Option<String>,
    query_embedding_digest: Option<String>,
    #[serde(default)]
    query_text_digest: Option<String>,
    #[serde(default)]
    query_input_digest: Option<String>,
    #[serde(default)]
    filter_digest: Option<String>,
    #[serde(default)]
    redaction_state: Option<String>,
    #[serde(default)]
    budget_id: Option<String>,
    #[serde(default)]
    deadline_at: Option<DateTime<Utc>>,
    search_profile: String,
    candidate_backend: String,
    codec_family: Option<String>,
    codec_profile_digest: Option<String>,
    #[serde(default)]
    artifact_profile_digest: Option<String>,
    // Count fields below are widened from `usize` on write and narrowed back on read.
    #[serde(default)]
    artifact_count: Option<u64>,
    #[serde(default)]
    artifact_corruption_count: Option<u64>,
    #[serde(default)]
    artifact_missing_count: Option<u64>,
    #[serde(default)]
    vector_artifact_manifest_digest: Option<String>,
    #[serde(default)]
    artifact_generation_id: Option<String>,
    #[serde(default)]
    approximate_scanned_count: Option<u64>,
    #[serde(default)]
    approximate_returned_count: Option<u64>,
    #[serde(default)]
    raw_rows_loaded_count: Option<u64>,
    #[serde(default)]
    filter_strategy: Option<String>,
    #[serde(default)]
    vector_artifact_count: Option<u64>,
    #[serde(default)]
    vector_artifact_missing_count: Option<u64>,
    #[serde(default)]
    vector_artifact_stale_count: Option<u64>,
    #[serde(default)]
    exact_rerank_count: Option<u64>,
    #[serde(default)]
    approximate_candidate_count: Option<u64>,
    #[serde(default)]
    fallback_reason: Option<String>,
    // Fields from here down have no `#[serde(default)]` attribute.
    approximate: bool,
    requested_candidates: u64,
    returned_candidates: u64,
    post_filter_candidates: u64,
    fallback: Option<String>,
    exact_rerank: bool,
    result_ids: Vec<String>,
    degradations: Vec<String>,
}
962
963fn default_search_receipt_schema_version() -> String {
964    SEARCH_RECEIPT_SCHEMA_VERSION.to_string()
965}
966
967fn b3_digest(bytes: &[u8]) -> String {
968    format!("blake3:{}", ContentDigest::compute(bytes).hex())
969}
970
/// Row from the derived vector artifact store.
///
/// An encoded (derived) form of an authoritative raw embedding. The artifact is tied back to
/// its source via `source_embedding_digest`.
#[cfg(feature = "turbo-quant-codec")]
#[derive(Debug, Clone)]
pub(crate) struct DerivedVectorArtifactRow {
    /// Source item key. The rebuild scan produces `fact:<id>`, `chunk:<id>`, `msg:<id>` or
    /// `episode:<episode_id>`.
    pub item_key: String,
    /// Generation that produced this artifact. May be `None` (the column is nullable).
    pub generation_id: Option<String>,
    /// Codec family name, e.g. `"turbo_quant"`.
    pub codec_family: String,
    /// Digest of the codec profile used to encode (`codec.profile().digest()` in the rebuild).
    pub codec_profile_digest: String,
    /// `source_embedding_digest` of the raw embedding BLOB this artifact was encoded from.
    pub source_embedding_digest: String,
    /// Digest reported by the codec for the encoded payload. On upsert it is written to both
    /// the `encoded_digest` and legacy `artifact_digest` columns.
    pub encoded_digest: String,
    /// Wire encoding of `encoded`, e.g. `"turbo_code_wire_v1"`.
    pub encoding: String,
    /// Embedding dimension the artifact was encoded for.
    pub dim: usize,
    /// Lifecycle status. This file writes `"active"` and `"invalidated"`.
    pub status: String,
    /// Encoded artifact payload bytes.
    pub encoded: Vec<u8>,
    // Codec governance columns (V23 migration). The rebuild path in this file leaves them
    // `None`; a governed encode path is expected to populate them.
    pub codec_governance_receipt_id: Option<String>,
    pub codec_profile: Option<String>,
    pub degradation_budget: Option<f64>,
    pub raw_source_artifact_id: Option<String>,
}
991
/// Active derived vector artifact generation row.
///
/// Summarizes one artifact rebuild for a codec family + profile: what source snapshot it was
/// built from and which artifacts it produced.
#[cfg(feature = "turbo-quant-codec")]
#[derive(Debug, Clone)]
// Presumably not every field is read by current callers — hence the lint allowance.
#[allow(dead_code)]
pub(crate) struct DerivedVectorArtifactGenerationRow {
    /// Generation identifier (a UUID v4 string in the rebuild path).
    pub generation_id: String,
    pub codec_family: String,
    pub codec_profile_digest: String,
    /// Snapshot digest over `(item_key, source_embedding_digest)` pairs at build time.
    pub source_snapshot_digest: String,
    /// Number of embedded source rows scanned, including any that were skipped.
    pub source_row_count: usize,
    /// Number of artifacts written for this generation.
    pub artifact_count: usize,
    pub dim: usize,
    pub encoding: String,
    /// Digest over `(item_key, source_embedding_digest, encoded_digest)` triples.
    pub artifact_manifest_digest: String,
    /// Lifecycle status. This file writes `"active"`, `"failed"`, `"superseded"` and
    /// `"invalidated"`.
    pub status: String,
}
1008
/// Stable digest for an authoritative raw f32 embedding BLOB.
///
/// The digest is domain-tagged and binds the declared dimension as well as the raw bytes.
///
/// # Errors
///
/// Fails if `validate_vector_blob_len` rejects `blob` for `expected_dim`.
#[cfg(feature = "turbo-quant-codec")]
pub(crate) fn source_embedding_digest(
    blob: &[u8],
    expected_dim: usize,
) -> Result<String, MemoryError> {
    validate_vector_blob_len(blob, expected_dim)?;

    let dim_le_bytes = (expected_dim as u64).to_le_bytes();
    let mut hasher = DigestBuilder::new();
    hasher
        .update_str("semantic-memory.source_embedding.v1")
        .separator()
        .update(&dim_le_bytes)
        .separator()
        .update(blob);
    let hex = hasher.finalize().hex();
    Ok(format!("blake3:{}", hex))
}
1025
/// Digest over the `(item_key, source_embedding_digest)` pairs of `rows` plus `dim`.
///
/// Pairs are sorted before hashing, so the result does not depend on the order of `rows`.
#[cfg(feature = "turbo-quant-codec")]
fn source_snapshot_digest(rows: &[DerivedVectorArtifactRow], dim: usize) -> String {
    let mut pairs: Vec<(&str, &str)> = Vec::with_capacity(rows.len());
    for artifact in rows {
        pairs.push((
            artifact.item_key.as_str(),
            artifact.source_embedding_digest.as_str(),
        ));
    }
    pairs.sort_unstable();

    let mut hasher = DigestBuilder::new();
    hasher
        .update_str("semantic-memory.vector_source_snapshot.v1")
        .separator()
        .update(&(dim as u64).to_le_bytes())
        .separator();
    for &(key, embedding_digest) in &pairs {
        hasher
            .update_str(key)
            .separator()
            .update_str(embedding_digest)
            .separator();
    }
    format!("blake3:{}", hasher.finalize().hex())
}
1049
/// Recomputes the source snapshot digest from the live embedding columns.
///
/// Returns the digest together with the number of embedded source rows. The digest uses the
/// same domain tag and `(item_key, source_embedding_digest)` layout as the snapshot recorded
/// by a rebuild.
///
/// # Errors
///
/// Fails on any SQL error, or if any stored embedding BLOB is rejected for `dim`.
#[cfg(feature = "turbo-quant-codec")]
pub(crate) fn current_source_snapshot_digest(
    conn: &Connection,
    dim: usize,
) -> Result<(String, usize), MemoryError> {
    let mut stmt = conn.prepare(
        "SELECT 'fact:' || id AS item_key, embedding FROM facts WHERE embedding IS NOT NULL
         UNION ALL
         SELECT 'chunk:' || id AS item_key, embedding FROM chunks WHERE embedding IS NOT NULL
         UNION ALL
         SELECT 'msg:' || id AS item_key, embedding FROM messages WHERE embedding IS NOT NULL
         UNION ALL
         SELECT 'episode:' || episode_id AS item_key, embedding FROM episodes WHERE embedding IS NOT NULL",
    )?;
    let mut keyed_digests = stmt
        .query_map([], |row| {
            Ok((row.get::<_, String>(0)?, row.get::<_, Vec<u8>>(1)?))
        })?
        .map(|fetched| -> Result<(String, String), MemoryError> {
            let (item_key, blob) = fetched?;
            let digest = source_embedding_digest(&blob, dim)?;
            Ok((item_key, digest))
        })
        .collect::<Result<Vec<_>, _>>()?;
    // Sort so the digest does not depend on UNION ALL row order.
    keyed_digests.sort_unstable();

    let mut hasher = DigestBuilder::new();
    hasher
        .update_str("semantic-memory.vector_source_snapshot.v1")
        .separator()
        .update(&(dim as u64).to_le_bytes())
        .separator();
    for (item_key, embedding_digest) in keyed_digests.iter() {
        hasher
            .update_str(item_key)
            .separator()
            .update_str(embedding_digest)
            .separator();
    }
    let row_count = keyed_digests.len();
    Ok((format!("blake3:{}", hasher.finalize().hex()), row_count))
}
1093
/// Order-independent digest over each artifact's
/// `(item_key, source_embedding_digest, encoded_digest)` triple.
///
/// The rebuild records this value as a generation's `artifact_manifest_digest`.
#[cfg(feature = "turbo-quant-codec")]
fn derived_artifact_manifest_digest(rows: &[DerivedVectorArtifactRow]) -> String {
    let mut manifest: Vec<(&str, &str, &str)> = Vec::with_capacity(rows.len());
    for artifact in rows {
        manifest.push((
            artifact.item_key.as_str(),
            artifact.source_embedding_digest.as_str(),
            artifact.encoded_digest.as_str(),
        ));
    }
    manifest.sort_unstable();

    let mut hasher = DigestBuilder::new();
    hasher
        .update_str("semantic-memory.vector_artifact_manifest.v1")
        .separator();
    for &(key, source_digest, encoded) in &manifest {
        hasher
            .update_str(key)
            .separator()
            .update_str(source_digest)
            .separator()
            .update_str(encoded)
            .separator();
    }
    format!("blake3:{}", hasher.finalize().hex())
}
1123
/// Inserts `row` into `derived_vector_artifacts`, replacing any conflicting row
/// (`INSERT OR REPLACE`).
///
/// `created_at` is stamped with SQLite's `datetime('now')`.
///
/// # Errors
///
/// Fails if `row.dim` does not fit in `i64`, or on any SQL error.
#[cfg(feature = "turbo-quant-codec")]
pub(crate) fn upsert_derived_vector_artifact(
    conn: &Connection,
    row: &DerivedVectorArtifactRow,
) -> Result<(), MemoryError> {
    // SQLite integers are signed 64-bit.
    let stored_dim = i64::try_from(row.dim)
        .map_err(|err| MemoryError::Other(format!("artifact dim overflow: {err}")))?;

    // ?6 is bound twice: `encoded_digest` and the legacy `artifact_digest` column carry the
    // same value.
    conn.execute(
        "INSERT OR REPLACE INTO derived_vector_artifacts
             (item_key, generation_id, codec_family, codec_profile_digest, source_embedding_digest,
              encoded_digest, artifact_digest, encoding, dim, encoded, created_at, status,
              codec_governance_receipt_id, codec_profile, degradation_budget, raw_source_artifact_id)
         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?6, ?7, ?8, ?9, datetime('now'), ?10, ?11, ?12, ?13, ?14)",
        params![
            row.item_key,
            row.generation_id.as_deref(),
            row.codec_family,
            row.codec_profile_digest,
            row.source_embedding_digest,
            row.encoded_digest,
            row.encoding,
            stored_dim,
            row.encoded,
            row.status,
            row.codec_governance_receipt_id.as_deref(),
            row.codec_profile.as_deref(),
            row.degradation_budget,
            row.raw_source_artifact_id.as_deref(),
        ],
    )?;
    Ok(())
}
1155
1156pub fn delete_derived_vector_artifact(
1157    conn: &Connection,
1158    item_key: &str,
1159) -> Result<(), MemoryError> {
1160    conn.execute(
1161        "DELETE FROM derived_vector_artifacts WHERE item_key = ?1",
1162        params![item_key],
1163    )?;
1164    Ok(())
1165}
1166
1167pub fn invalidate_derived_vector_artifact(
1168    conn: &Connection,
1169    item_key: &str,
1170) -> Result<(), MemoryError> {
1171    conn.execute(
1172        "UPDATE derived_vector_artifacts
1173         SET status = 'invalidated'
1174         WHERE item_key = ?1 AND status = 'active'",
1175        params![item_key],
1176    )?;
1177    conn.execute(
1178        "UPDATE derived_vector_artifact_generations
1179         SET status = 'invalidated'
1180         WHERE status = 'active'",
1181        [],
1182    )?;
1183    Ok(())
1184}
1185
/// Loads every `active` artifact row for one codec family + profile digest, regardless of
/// which generation produced it.
///
/// # Errors
///
/// Propagates SQL errors. A negative stored `dim` surfaces as a conversion failure on
/// column 7.
#[cfg(feature = "turbo-quant-codec")]
#[allow(dead_code)]
pub(crate) fn load_derived_vector_artifacts_by_profile(
    conn: &Connection,
    codec_family: &str,
    codec_profile_digest: &str,
) -> Result<Vec<DerivedVectorArtifactRow>, MemoryError> {
    let mut stmt = conn.prepare(
        "SELECT item_key, generation_id, codec_family, codec_profile_digest, source_embedding_digest,
                encoded_digest, encoding, dim, status, encoded,
                codec_governance_receipt_id, codec_profile, degradation_budget, raw_source_artifact_id
         FROM derived_vector_artifacts
         WHERE codec_family = ?1 AND codec_profile_digest = ?2 AND status = 'active'",
    )?;
    let artifacts = stmt
        .query_map(params![codec_family, codec_profile_digest], |r| {
            let stored_dim: i64 = r.get(7)?;
            Ok(DerivedVectorArtifactRow {
                item_key: r.get(0)?,
                generation_id: r.get(1)?,
                codec_family: r.get(2)?,
                codec_profile_digest: r.get(3)?,
                source_embedding_digest: r.get(4)?,
                encoded_digest: r.get(5)?,
                encoding: r.get(6)?,
                dim: usize::try_from(stored_dim).map_err(|err| {
                    rusqlite::Error::FromSqlConversionFailure(
                        7,
                        rusqlite::types::Type::Integer,
                        Box::new(err),
                    )
                })?,
                status: r.get(8)?,
                encoded: r.get(9)?,
                codec_governance_receipt_id: r.get(10)?,
                codec_profile: r.get(11)?,
                degradation_budget: r.get(12)?,
                raw_source_artifact_id: r.get(13)?,
            })
        })?
        .collect::<Result<Vec<_>, _>>()?;
    Ok(artifacts)
}
1232
/// Loads every `active` artifact row belonging to `generation_id`.
///
/// # Errors
///
/// Propagates SQL errors. A negative stored `dim` surfaces as a conversion failure on
/// column 7.
#[cfg(feature = "turbo-quant-codec")]
pub(crate) fn load_derived_vector_artifacts_by_generation(
    conn: &Connection,
    generation_id: &str,
) -> Result<Vec<DerivedVectorArtifactRow>, MemoryError> {
    let mut stmt = conn.prepare(
        "SELECT item_key, generation_id, codec_family, codec_profile_digest, source_embedding_digest,
                encoded_digest, encoding, dim, status, encoded,
                codec_governance_receipt_id, codec_profile, degradation_budget, raw_source_artifact_id
         FROM derived_vector_artifacts
         WHERE generation_id = ?1 AND status = 'active'",
    )?;
    let artifacts = stmt
        .query_map(params![generation_id], |r| {
            let stored_dim: i64 = r.get(7)?;
            Ok(DerivedVectorArtifactRow {
                item_key: r.get(0)?,
                generation_id: r.get(1)?,
                codec_family: r.get(2)?,
                codec_profile_digest: r.get(3)?,
                source_embedding_digest: r.get(4)?,
                encoded_digest: r.get(5)?,
                encoding: r.get(6)?,
                dim: usize::try_from(stored_dim).map_err(|err| {
                    rusqlite::Error::FromSqlConversionFailure(
                        7,
                        rusqlite::types::Type::Integer,
                        Box::new(err),
                    )
                })?,
                status: r.get(8)?,
                encoded: r.get(9)?,
                codec_governance_receipt_id: r.get(10)?,
                codec_profile: r.get(11)?,
                degradation_budget: r.get(12)?,
                raw_source_artifact_id: r.get(13)?,
            })
        })?
        .collect::<Result<Vec<_>, _>>()?;
    Ok(artifacts)
}
1277
/// Returns the most recently created `active` generation for a codec family + profile digest.
///
/// Returns `None` if there is no such generation.
///
/// # Errors
///
/// Propagates SQL errors. Negative stored counts or dimensions surface as conversion failures
/// on their column.
#[cfg(feature = "turbo-quant-codec")]
pub(crate) fn current_derived_vector_generation(
    conn: &Connection,
    codec_family: &str,
    codec_profile_digest: &str,
) -> Result<Option<DerivedVectorArtifactGenerationRow>, MemoryError> {
    conn.query_row(
        "SELECT generation_id, codec_family, codec_profile_digest, source_snapshot_digest,
                source_row_count, artifact_count, dim, encoding, artifact_manifest_digest, status
         FROM derived_vector_artifact_generations
         WHERE codec_family = ?1 AND codec_profile_digest = ?2 AND status = 'active'
         ORDER BY created_at DESC
         LIMIT 1",
        params![codec_family, codec_profile_digest],
        |row| {
            // SQLite yields i64; report a negative value as a failure on its own column.
            let as_usize = |column: usize, value: i64| -> Result<usize, rusqlite::Error> {
                usize::try_from(value).map_err(|err| {
                    rusqlite::Error::FromSqlConversionFailure(
                        column,
                        rusqlite::types::Type::Integer,
                        Box::new(err),
                    )
                })
            };
            let raw_source_rows: i64 = row.get(4)?;
            let raw_artifacts: i64 = row.get(5)?;
            let raw_dim: i64 = row.get(6)?;
            Ok(DerivedVectorArtifactGenerationRow {
                generation_id: row.get(0)?,
                codec_family: row.get(1)?,
                codec_profile_digest: row.get(2)?,
                source_snapshot_digest: row.get(3)?,
                source_row_count: as_usize(4, raw_source_rows)?,
                artifact_count: as_usize(5, raw_artifacts)?,
                dim: as_usize(6, raw_dim)?,
                encoding: row.get(7)?,
                artifact_manifest_digest: row.get(8)?,
                status: row.get(9)?,
            })
        },
    )
    .optional()
    .map_err(MemoryError::from)
}
1331
1332pub fn count_derived_vector_artifacts(
1333    conn: &Connection,
1334    codec_family: &str,
1335    codec_profile_digest: &str,
1336) -> Result<usize, MemoryError> {
1337    let count: i64 = conn.query_row(
1338        "SELECT COUNT(*) FROM derived_vector_artifacts
1339         WHERE codec_family = ?1 AND codec_profile_digest = ?2 AND status = 'active'",
1340        params![codec_family, codec_profile_digest],
1341        |row| row.get(0),
1342    )?;
1343    usize::try_from(count)
1344        .map_err(|err| MemoryError::Other(format!("derived artifact count overflow: {err}")))
1345}
1346
/// Rebuilds every TurboQuant derived vector artifact for one codec profile and records the
/// outcome as a new artifact generation.
///
/// The codec profile is determined by `dim`, `bits`, `projections` and `seed`. Source rows are
/// every non-NULL `embedding` in `facts`, `chunks`, `messages` and `episodes`. They are keyed
/// as `fact:<id>`, `chunk:<id>`, `msg:<id>` and `episode:<episode_id>`.
///
/// A row is skipped if its authoritative embedding cannot be decoded or the codec fails to
/// encode it. Each skip is described in the returned receipt's `degradations`. If any row is
/// skipped, the new generation is recorded with status `failed`; otherwise it is `active`.
///
/// Inside `with_transaction`, the following happens in order:
/// 1. previously `active` generations for this profile are marked `superseded`;
/// 2. all existing artifact rows for the profile are deleted;
/// 3. the new generation row is inserted;
/// 4. the newly encoded artifacts are upserted.
///
/// # Errors
///
/// Returns an error if any of the following happens:
/// - the codec cannot be constructed;
/// - any SQL statement fails;
/// - a source embedding digest cannot be computed;
/// - a count or dimension does not fit in `i64`;
/// - generation metadata cannot be serialized to JSON.
///
/// NOTE(review): source rows are read before the write transaction starts. A concurrent writer
/// could change them between the scan and the commit. Confirm callers serialize rebuilds
/// against writes.
///
/// NOTE(review): artifacts are written with status `active` even when the generation is
/// `failed`, and the previous generation's artifacts are deleted either way.
/// `count_derived_vector_artifacts` (profile + status filter only) will still count them.
/// Confirm this is intended.
#[cfg(feature = "turbo-quant-codec")]
pub(crate) fn rebuild_turbo_quant_artifacts(
    conn: &Connection,
    dim: usize,
    bits: u8,
    projections: usize,
    seed: u64,
) -> Result<VectorArtifactBuildReceiptV1, MemoryError> {
    use crate::vector_codec::{TurboQuantCodec, VectorCodec};

    let started = std::time::Instant::now();
    let codec = TurboQuantCodec::new(dim, bits, projections, seed)?;
    let codec_profile_digest = codec.profile().digest();
    let generation_id = uuid::Uuid::new_v4().to_string();
    let mut source_row_count = 0usize;
    // Incremented inside the transaction closure as each artifact is written.
    let mut artifact_count = 0usize;
    let mut skipped_row_count = 0usize;
    let mut degradations = Vec::new();

    // Scan authoritative embeddings from all four source tables.
    let mut stmt = conn.prepare(
        "SELECT 'fact:' || id AS item_key, embedding FROM facts WHERE embedding IS NOT NULL
         UNION ALL
         SELECT 'chunk:' || id AS item_key, embedding FROM chunks WHERE embedding IS NOT NULL
         UNION ALL
         SELECT 'msg:' || id AS item_key, embedding FROM messages WHERE embedding IS NOT NULL
         UNION ALL
         SELECT 'episode:' || episode_id AS item_key, embedding FROM episodes WHERE embedding IS NOT NULL",
    )?;
    let rows = stmt.query_map([], |row| {
        Ok((row.get::<_, String>(0)?, row.get::<_, Vec<u8>>(1)?))
    })?;

    // Encode each row in memory; nothing is written until the transaction below.
    let mut pending = Vec::new();
    for row in rows {
        let (item_key, blob) = row?;
        source_row_count += 1;
        let embedding = match decode_f32_le(&blob, dim) {
            Ok(embedding) => embedding,
            Err(err) => {
                skipped_row_count += 1;
                degradations.push(format!(
                    "skipped {item_key}: invalid authoritative embedding: {err}"
                ));
                continue;
            }
        };
        let artifact = match codec.encode(&embedding) {
            Ok(artifact) => artifact,
            Err(err) => {
                skipped_row_count += 1;
                degradations.push(format!("skipped {item_key}: encode failed: {err}"));
                continue;
            }
        };
        pending.push(DerivedVectorArtifactRow {
            item_key,
            generation_id: Some(generation_id.clone()),
            codec_family: "turbo_quant".to_string(),
            codec_profile_digest: codec_profile_digest.clone(),
            source_embedding_digest: source_embedding_digest(&blob, dim)?,
            encoded_digest: artifact.artifact_digest,
            encoding: "turbo_code_wire_v1".to_string(),
            dim,
            status: "active".to_string(),
            encoded: artifact.encoded,
            // V23 governance columns — populated by encode_governed path; existing
            // turbo-quant build path leaves these as None (nullable).
            codec_governance_receipt_id: None,
            codec_profile: None,
            degradation_budget: None,
            raw_source_artifact_id: None,
        });
    }
    // Release the read statement (and its borrow of `conn`) before the write transaction.
    drop(stmt);

    let build_receipt_id = uuid::Uuid::new_v4().to_string();
    // Both digests cover only successfully encoded rows (`pending`), not skipped ones.
    let source_snapshot_digest = source_snapshot_digest(&pending, dim);
    let artifact_manifest_digest = derived_artifact_manifest_digest(&pending);
    let source_tables = vec![
        "facts".to_string(),
        "chunks".to_string(),
        "messages".to_string(),
        "episodes".to_string(),
    ];
    let generation_manifest = DerivedVectorArtifactGenerationV1 {
        schema_version: "derived_vector_artifact_generation_v1".to_string(),
        generation_id: generation_id.clone(),
        codec_family: "turbo_quant".to_string(),
        codec_profile_digest: codec_profile_digest.clone(),
        source_snapshot_digest: source_snapshot_digest.clone(),
        // Counts every scanned row, including skipped ones.
        source_row_count,
        artifact_count: pending.len(),
        source_tables,
        dim,
        encoding: "turbo_code_wire_v1".to_string(),
        created_at: Utc::now(),
        build_receipt_id: Some(build_receipt_id.clone()),
        artifact_manifest_digest: artifact_manifest_digest.clone(),
        // Any skipped row downgrades the whole generation to `failed`.
        status: if skipped_row_count == 0 {
            "active".to_string()
        } else {
            "failed".to_string()
        },
        degradations: degradations.clone(),
    };

    with_transaction(conn, |tx| {
        // Retire whichever generation(s) were active for this profile.
        tx.execute(
            "UPDATE derived_vector_artifact_generations
             SET status = 'superseded'
             WHERE codec_family = ?1 AND codec_profile_digest = ?2 AND status = 'active'",
            params!["turbo_quant", &codec_profile_digest],
        )?;
        // Drop all existing artifacts for the profile, whatever their status or generation.
        tx.execute(
            "DELETE FROM derived_vector_artifacts
             WHERE codec_family = ?1 AND codec_profile_digest = ?2",
            params!["turbo_quant", &codec_profile_digest],
        )?;
        tx.execute(
            "INSERT INTO derived_vector_artifact_generations
                (generation_id, schema_version, codec_family, codec_profile_digest,
                 source_snapshot_digest, source_row_count, artifact_count, source_tables_json,
                 dim, encoding, created_at, build_receipt_id, artifact_manifest_digest,
                 status, degradations_json)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)",
            params![
                generation_manifest.generation_id,
                generation_manifest.schema_version,
                generation_manifest.codec_family,
                generation_manifest.codec_profile_digest,
                generation_manifest.source_snapshot_digest,
                i64::try_from(generation_manifest.source_row_count).map_err(|err| {
                    MemoryError::Other(format!("source row count overflow: {err}"))
                })?,
                i64::try_from(generation_manifest.artifact_count).map_err(|err| {
                    MemoryError::Other(format!("artifact count overflow: {err}"))
                })?,
                serde_json::to_string(&generation_manifest.source_tables)
                    .map_err(|err| MemoryError::Other(err.to_string()))?,
                i64::try_from(generation_manifest.dim)
                    .map_err(|err| MemoryError::Other(format!("artifact dim overflow: {err}")))?,
                generation_manifest.encoding,
                generation_manifest.created_at.to_rfc3339(),
                generation_manifest.build_receipt_id,
                generation_manifest.artifact_manifest_digest,
                generation_manifest.status,
                serde_json::to_string(&generation_manifest.degradations)
                    .map_err(|err| MemoryError::Other(err.to_string()))?,
            ],
        )?;
        for row in &pending {
            upsert_derived_vector_artifact(tx, row)?;
            artifact_count += 1;
        }
        Ok(())
    })?;

    Ok(VectorArtifactBuildReceiptV1 {
        schema_version: "vector_artifact_build_receipt_v1".to_string(),
        codec_family: "turbo_quant".to_string(),
        codec_profile_digest,
        source_row_count,
        artifact_count,
        generation_id: Some(generation_id),
        source_snapshot_digest: Some(source_snapshot_digest),
        artifact_manifest_digest: Some(artifact_manifest_digest),
        build_receipt_id: Some(build_receipt_id),
        skipped_row_count,
        elapsed_ms: started.elapsed().as_millis(),
        created_at: Utc::now(),
        degradations,
    })
}
1520
1521fn receipt_count_to_u64(value: usize, field: &'static str) -> Result<u64, MemoryError> {
1522    u64::try_from(value).map_err(|err| MemoryError::Other(format!("{field} is too large: {err}")))
1523}
1524
1525fn receipt_count_to_i64(value: u64, field: &'static str) -> Result<i64, MemoryError> {
1526    i64::try_from(value).map_err(|err| MemoryError::Other(format!("{field} is too large: {err}")))
1527}
1528
1529fn receipt_count_to_usize(
1530    value: u64,
1531    receipt_id: &str,
1532    field: &'static str,
1533) -> Result<usize, MemoryError> {
1534    usize::try_from(value).map_err(|err| MemoryError::CorruptData {
1535        table: "search_receipts",
1536        row_id: receipt_id.to_string(),
1537        detail: format!("{field} does not fit this platform: {err}"),
1538    })
1539}
1540
1541fn stored_search_receipt(
1542    receipt: &VectorSearchReceiptV1,
1543) -> Result<StoredVectorSearchReceiptV1, MemoryError> {
1544    Ok(StoredVectorSearchReceiptV1 {
1545        schema_version: SEARCH_RECEIPT_SCHEMA_VERSION.to_string(),
1546        receipt_id: receipt.receipt_id.clone(),
1547        evaluation_time: receipt.evaluation_time,
1548        receipt_digest: receipt.receipt_digest.clone(),
1549        trace_id: receipt.trace_id.clone(),
1550        attempt_family_id: receipt.attempt_family_id.clone(),
1551        attempt_id: receipt.attempt_id.clone(),
1552        replay_of: receipt.replay_of.clone(),
1553        query_embedding_digest: receipt.query_embedding_digest.clone(),
1554        query_text_digest: receipt.query_text_digest.clone(),
1555        query_input_digest: receipt.query_input_digest.clone(),
1556        filter_digest: receipt.filter_digest.clone(),
1557        redaction_state: receipt.redaction_state.clone(),
1558        budget_id: receipt.budget_id.clone(),
1559        deadline_at: receipt.deadline_at,
1560        search_profile: receipt.search_profile.clone(),
1561        candidate_backend: receipt.candidate_backend.clone(),
1562        codec_family: receipt.codec_family.clone(),
1563        codec_profile_digest: receipt.codec_profile_digest.clone(),
1564        artifact_profile_digest: receipt.artifact_profile_digest.clone(),
1565        artifact_count: receipt
1566            .artifact_count
1567            .map(|value| receipt_count_to_u64(value, "artifact_count"))
1568            .transpose()?,
1569        artifact_corruption_count: receipt
1570            .artifact_corruption_count
1571            .map(|value| receipt_count_to_u64(value, "artifact_corruption_count"))
1572            .transpose()?,
1573        artifact_missing_count: receipt
1574            .artifact_missing_count
1575            .map(|value| receipt_count_to_u64(value, "artifact_missing_count"))
1576            .transpose()?,
1577        vector_artifact_manifest_digest: receipt.vector_artifact_manifest_digest.clone(),
1578        artifact_generation_id: receipt.artifact_generation_id.clone(),
1579        approximate_scanned_count: receipt
1580            .approximate_scanned_count
1581            .map(|value| receipt_count_to_u64(value, "approximate_scanned_count"))
1582            .transpose()?,
1583        approximate_returned_count: receipt
1584            .approximate_returned_count
1585            .map(|value| receipt_count_to_u64(value, "approximate_returned_count"))
1586            .transpose()?,
1587        raw_rows_loaded_count: receipt
1588            .raw_rows_loaded_count
1589            .map(|value| receipt_count_to_u64(value, "raw_rows_loaded_count"))
1590            .transpose()?,
1591        filter_strategy: receipt.filter_strategy.clone(),
1592        vector_artifact_count: receipt
1593            .vector_artifact_count
1594            .map(|value| receipt_count_to_u64(value, "vector_artifact_count"))
1595            .transpose()?,
1596        vector_artifact_missing_count: receipt
1597            .vector_artifact_missing_count
1598            .map(|value| receipt_count_to_u64(value, "vector_artifact_missing_count"))
1599            .transpose()?,
1600        vector_artifact_stale_count: receipt
1601            .vector_artifact_stale_count
1602            .map(|value| receipt_count_to_u64(value, "vector_artifact_stale_count"))
1603            .transpose()?,
1604        exact_rerank_count: receipt
1605            .exact_rerank_count
1606            .map(|value| receipt_count_to_u64(value, "exact_rerank_count"))
1607            .transpose()?,
1608        approximate_candidate_count: receipt
1609            .approximate_candidate_count
1610            .map(|value| receipt_count_to_u64(value, "approximate_candidate_count"))
1611            .transpose()?,
1612        fallback_reason: receipt.fallback_reason.clone(),
1613        approximate: receipt.approximate,
1614        requested_candidates: receipt_count_to_u64(
1615            receipt.requested_candidates,
1616            "requested_candidates",
1617        )?,
1618        returned_candidates: receipt_count_to_u64(
1619            receipt.returned_candidates,
1620            "returned_candidates",
1621        )?,
1622        post_filter_candidates: receipt_count_to_u64(
1623            receipt.post_filter_candidates,
1624            "post_filter_candidates",
1625        )?,
1626        fallback: receipt.fallback.clone(),
1627        exact_rerank: receipt.exact_rerank,
1628        result_ids: receipt.result_ids.clone(),
1629        degradations: receipt.degradations.clone(),
1630    })
1631}
1632
1633fn search_receipt_from_stored(
1634    stored: StoredVectorSearchReceiptV1,
1635) -> Result<VectorSearchReceiptV1, MemoryError> {
1636    if stored.schema_version != SEARCH_RECEIPT_SCHEMA_VERSION {
1637        return Err(MemoryError::CorruptData {
1638            table: "search_receipts",
1639            row_id: stored.receipt_id,
1640            detail: format!(
1641                "unsupported receipt schema version '{}'",
1642                stored.schema_version
1643            ),
1644        });
1645    }
1646
1647    Ok(VectorSearchReceiptV1 {
1648        schema_version: stored.schema_version.clone(),
1649        receipt_digest: stored.receipt_digest,
1650        receipt_id: stored.receipt_id.clone(),
1651        evaluation_time: stored.evaluation_time,
1652        trace_id: stored.trace_id,
1653        attempt_family_id: stored.attempt_family_id,
1654        attempt_id: stored.attempt_id,
1655        replay_of: stored.replay_of,
1656        query_embedding_digest: stored.query_embedding_digest,
1657        query_text_digest: stored.query_text_digest,
1658        query_input_digest: stored.query_input_digest,
1659        filter_digest: stored.filter_digest,
1660        redaction_state: stored.redaction_state,
1661        budget_id: stored.budget_id,
1662        deadline_at: stored.deadline_at,
1663        search_profile: stored.search_profile,
1664        candidate_backend: stored.candidate_backend,
1665        codec_family: stored.codec_family,
1666        codec_profile_digest: stored.codec_profile_digest,
1667        artifact_profile_digest: stored.artifact_profile_digest,
1668        artifact_count: stored
1669            .artifact_count
1670            .map(|value| receipt_count_to_usize(value, &stored.receipt_id, "artifact_count"))
1671            .transpose()?,
1672        artifact_corruption_count: stored
1673            .artifact_corruption_count
1674            .map(|value| {
1675                receipt_count_to_usize(value, &stored.receipt_id, "artifact_corruption_count")
1676            })
1677            .transpose()?,
1678        artifact_missing_count: stored
1679            .artifact_missing_count
1680            .map(|value| {
1681                receipt_count_to_usize(value, &stored.receipt_id, "artifact_missing_count")
1682            })
1683            .transpose()?,
1684        vector_artifact_manifest_digest: stored.vector_artifact_manifest_digest,
1685        artifact_generation_id: stored.artifact_generation_id,
1686        approximate_scanned_count: stored
1687            .approximate_scanned_count
1688            .map(|value| {
1689                receipt_count_to_usize(value, &stored.receipt_id, "approximate_scanned_count")
1690            })
1691            .transpose()?,
1692        approximate_returned_count: stored
1693            .approximate_returned_count
1694            .map(|value| {
1695                receipt_count_to_usize(value, &stored.receipt_id, "approximate_returned_count")
1696            })
1697            .transpose()?,
1698        raw_rows_loaded_count: stored
1699            .raw_rows_loaded_count
1700            .map(|value| receipt_count_to_usize(value, &stored.receipt_id, "raw_rows_loaded_count"))
1701            .transpose()?,
1702        filter_strategy: stored.filter_strategy,
1703        vector_artifact_count: stored
1704            .vector_artifact_count
1705            .map(|value| receipt_count_to_usize(value, &stored.receipt_id, "vector_artifact_count"))
1706            .transpose()?,
1707        vector_artifact_missing_count: stored
1708            .vector_artifact_missing_count
1709            .map(|value| {
1710                receipt_count_to_usize(value, &stored.receipt_id, "vector_artifact_missing_count")
1711            })
1712            .transpose()?,
1713        vector_artifact_stale_count: stored
1714            .vector_artifact_stale_count
1715            .map(|value| {
1716                receipt_count_to_usize(value, &stored.receipt_id, "vector_artifact_stale_count")
1717            })
1718            .transpose()?,
1719        exact_rerank_count: stored
1720            .exact_rerank_count
1721            .map(|value| receipt_count_to_usize(value, &stored.receipt_id, "exact_rerank_count"))
1722            .transpose()?,
1723        approximate_candidate_count: stored
1724            .approximate_candidate_count
1725            .map(|value| {
1726                receipt_count_to_usize(value, &stored.receipt_id, "approximate_candidate_count")
1727            })
1728            .transpose()?,
1729        fallback_reason: stored.fallback_reason,
1730        approximate: stored.approximate,
1731        requested_candidates: receipt_count_to_usize(
1732            stored.requested_candidates,
1733            &stored.receipt_id,
1734            "requested_candidates",
1735        )?,
1736        returned_candidates: receipt_count_to_usize(
1737            stored.returned_candidates,
1738            &stored.receipt_id,
1739            "returned_candidates",
1740        )?,
1741        post_filter_candidates: receipt_count_to_usize(
1742            stored.post_filter_candidates,
1743            &stored.receipt_id,
1744            "post_filter_candidates",
1745        )?,
1746        fallback: stored.fallback,
1747        exact_rerank: stored.exact_rerank,
1748        result_ids: stored.result_ids,
1749        degradations: stored.degradations,
1750    })
1751}
1752
1753/// Persist a search receipt as replay metadata.
1754///
1755/// SQLite rows remain authoritative for memory. This table stores only the
1756/// execution receipt and digest so the search can be addressed later.
1757pub fn store_search_receipt(
1758    conn: &Connection,
1759    receipt: &VectorSearchReceiptV1,
1760) -> Result<(), MemoryError> {
1761    let stored = stored_search_receipt(receipt)?;
1762    let receipt_json = serde_json::to_string(&stored)
1763        .map_err(|err| MemoryError::Other(format!("failed to serialize search receipt: {err}")))?;
1764    let receipt_digest = b3_digest(receipt_json.as_bytes());
1765
1766    let existing_digest: Option<String> = conn
1767        .query_row(
1768            "SELECT receipt_digest FROM search_receipts WHERE receipt_id = ?1",
1769            params![&stored.receipt_id],
1770            |row| row.get(0),
1771        )
1772        .optional()?;
1773    if let Some(existing_digest) = existing_digest {
1774        if existing_digest == receipt_digest {
1775            return Ok(());
1776        }
1777        return Err(MemoryError::SearchReceiptConflict {
1778            receipt_id: stored.receipt_id,
1779        });
1780    }
1781
1782    let result_ids_json = serde_json::to_string(&stored.result_ids).map_err(|err| {
1783        MemoryError::Other(format!(
1784            "failed to serialize search receipt result IDs: {err}"
1785        ))
1786    })?;
1787    conn.execute(
1788        "INSERT INTO search_receipts (
1789            receipt_id,
1790            schema_version,
1791            evaluation_time,
1792            search_profile,
1793            candidate_backend,
1794            approximate,
1795            exact_rerank,
1796            fallback,
1797            requested_candidates,
1798            returned_candidates,
1799            post_filter_candidates,
1800            result_ids_json,
1801            receipt_json,
1802            receipt_digest
1803        ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)",
1804        params![
1805            &stored.receipt_id,
1806            SEARCH_RECEIPT_SCHEMA_VERSION,
1807            stored.evaluation_time.to_rfc3339(),
1808            &stored.search_profile,
1809            &stored.candidate_backend,
1810            if stored.approximate { 1_i64 } else { 0_i64 },
1811            if stored.exact_rerank { 1_i64 } else { 0_i64 },
1812            &stored.fallback,
1813            receipt_count_to_i64(stored.requested_candidates, "requested_candidates")?,
1814            receipt_count_to_i64(stored.returned_candidates, "returned_candidates")?,
1815            receipt_count_to_i64(stored.post_filter_candidates, "post_filter_candidates")?,
1816            &result_ids_json,
1817            &receipt_json,
1818            &receipt_digest,
1819        ],
1820    )?;
1821    Ok(())
1822}
1823
1824/// Load a durable search receipt by receipt/request ID.
1825pub fn get_search_receipt(
1826    conn: &Connection,
1827    receipt_id: &str,
1828) -> Result<Option<VectorSearchReceiptV1>, MemoryError> {
1829    let row: Option<(String, String, String)> = conn
1830        .query_row(
1831            "SELECT schema_version, receipt_json, receipt_digest
1832             FROM search_receipts
1833             WHERE receipt_id = ?1",
1834            params![receipt_id],
1835            |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1836        )
1837        .optional()?;
1838
1839    let Some((schema_version, receipt_json, receipt_digest)) = row else {
1840        return Ok(None);
1841    };
1842    if schema_version != SEARCH_RECEIPT_SCHEMA_VERSION {
1843        return Err(MemoryError::CorruptData {
1844            table: "search_receipts",
1845            row_id: receipt_id.to_string(),
1846            detail: format!("unsupported receipt schema version '{schema_version}'"),
1847        });
1848    }
1849
1850    let stored: StoredVectorSearchReceiptV1 =
1851        serde_json::from_str(&receipt_json).map_err(|err| MemoryError::CorruptData {
1852            table: "search_receipts",
1853            row_id: receipt_id.to_string(),
1854            detail: format!("invalid receipt JSON: {err}"),
1855        })?;
1856    let mut receipt = search_receipt_from_stored(stored)?;
1857    receipt.receipt_digest = Some(receipt_digest);
1858    Ok(Some(receipt))
1859}
1860
1861fn add_column_if_missing(
1862    conn: &Connection,
1863    table: &str,
1864    column: &str,
1865    column_sql: &str,
1866) -> Result<(), rusqlite::Error> {
1867    let pragma = format!("PRAGMA table_info({table})");
1868    let mut stmt = conn.prepare(&pragma)?;
1869    let exists = stmt
1870        .query_map([], |row| row.get::<_, String>(1))?
1871        .collect::<Result<Vec<_>, _>>()?
1872        .into_iter()
1873        .any(|name| name == column);
1874
1875    if !exists {
1876        conn.execute(
1877            &format!("ALTER TABLE {table} ADD COLUMN {column} {column_sql}"),
1878            [],
1879        )?;
1880    }
1881
1882    Ok(())
1883}
1884
1885/// Check and update the embedding metadata singleton row.
1886pub fn check_embedding_metadata(
1887    conn: &Connection,
1888    config: &EmbeddingConfig,
1889) -> Result<(), MemoryError> {
1890    // INTENTIONAL: row absent on first run before metadata is inserted
1891    let existing: Option<(String, usize)> = conn
1892        .query_row(
1893            "SELECT model_name, dimensions FROM embedding_metadata WHERE id = 1",
1894            [],
1895            |row| Ok((row.get(0)?, row.get(1)?)),
1896        )
1897        .ok();
1898
1899    match existing {
1900        Some((model, dims)) => {
1901            if model != config.model || dims != config.dimensions {
1902                tracing::warn!(
1903                    stored_model = %model,
1904                    stored_dims = dims,
1905                    configured_model = %config.model,
1906                    configured_dims = config.dimensions,
1907                    "Embedding model changed. Existing embeddings are stale."
1908                );
1909                conn.execute(
1910                    "UPDATE embedding_metadata
1911                     SET model_name = ?1,
1912                         dimensions = ?2,
1913                         embeddings_dirty = 1,
1914                         updated_at = datetime('now')
1915                     WHERE id = 1",
1916                    params![config.model, config.dimensions],
1917                )?;
1918            }
1919        }
1920        None => {
1921            conn.execute(
1922                "INSERT INTO embedding_metadata (id, model_name, dimensions) VALUES (1, ?1, ?2)",
1923                params![config.model, config.dimensions],
1924            )?;
1925        }
1926    }
1927
1928    Ok(())
1929}
1930
1931/// Encode an f32 slice as bytes for SQLite BLOB storage.
1932pub fn embedding_to_bytes(embedding: &[f32]) -> Vec<u8> {
1933    encode_f32_le(embedding)
1934}
1935
/// Encode f32 values as a stable little-endian persisted representation.
///
/// The output is `values.len() * 4` bytes: each element's little-endian bytes,
/// concatenated in input order, independent of host endianness.
pub fn encode_f32_le(values: &[f32]) -> Vec<u8> {
    let mut encoded = Vec::with_capacity(values.len() * 4);
    encoded.extend(values.iter().flat_map(|component| component.to_le_bytes()));
    encoded
}
1944
1945/// Validate an embedding vector before it is stored or indexed.
1946pub(crate) fn validate_embedding(values: &[f32], expected_dim: usize) -> Result<(), MemoryError> {
1947    if values.len() != expected_dim {
1948        return Err(MemoryError::EmbeddingDimensionMismatch {
1949            expected: expected_dim,
1950            actual: values.len(),
1951        });
1952    }
1953    if let Some((index, _)) = values
1954        .iter()
1955        .enumerate()
1956        .find(|(_, value)| !value.is_finite())
1957    {
1958        return Err(MemoryError::NonFiniteEmbeddingValue { index });
1959    }
1960    Ok(())
1961}
1962
1963/// Validate a returned embedding batch against the requested input count.
1964pub(crate) fn validate_embedding_batch(
1965    values: &[Vec<f32>],
1966    requested: usize,
1967    expected_dim: usize,
1968) -> Result<(), MemoryError> {
1969    if values.len() != requested {
1970        return Err(MemoryError::EmbeddingBatchCountMismatch {
1971            requested,
1972            returned: values.len(),
1973        });
1974    }
1975    for embedding in values {
1976        validate_embedding(embedding, expected_dim)?;
1977    }
1978    Ok(())
1979}
1980
1981/// Validate the exact byte length of a persisted f32 vector blob.
1982pub(crate) fn validate_vector_blob_len(
1983    bytes: &[u8],
1984    expected_dim: usize,
1985) -> Result<(), MemoryError> {
1986    let expected_bytes = expected_dim
1987        .checked_mul(4)
1988        .ok_or_else(|| MemoryError::InvalidConfig {
1989            field: "embedding.dimensions",
1990            reason: "dimension byte length overflow".to_string(),
1991        })?;
1992    if bytes.len() != expected_bytes {
1993        return Err(MemoryError::VectorBlobLengthMismatch {
1994            expected_bytes,
1995            actual_bytes: bytes.len(),
1996        });
1997    }
1998    Ok(())
1999}
2000
2001/// Decode a stable little-endian f32 persisted representation.
2002#[allow(clippy::manual_is_multiple_of)]
2003pub fn decode_f32_le(bytes: &[u8], expected_dim: usize) -> Result<Vec<f32>, MemoryError> {
2004    validate_vector_blob_len(bytes, expected_dim)?;
2005    decode_f32_le_unchecked_dim(bytes)
2006}
2007
2008/// Decode a SQLite embedding BLOB back to f32 values.
2009#[allow(clippy::manual_is_multiple_of)]
2010pub fn bytes_to_embedding(bytes: &[u8]) -> Result<Vec<f32>, MemoryError> {
2011    if bytes.len() % 4 != 0 {
2012        return Err(MemoryError::InvalidEmbedding {
2013            expected_bytes: bytes.len() - (bytes.len() % 4),
2014            actual_bytes: bytes.len(),
2015        });
2016    }
2017
2018    decode_f32_le_unchecked_dim(bytes)
2019}
2020
2021fn decode_f32_le_unchecked_dim(bytes: &[u8]) -> Result<Vec<f32>, MemoryError> {
2022    let mut embedding = Vec::with_capacity(bytes.len() / 4);
2023    for (index, chunk) in bytes.chunks_exact(4).enumerate() {
2024        let value = f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]);
2025        if !value.is_finite() {
2026            return Err(MemoryError::NonFiniteEmbeddingValue { index });
2027        }
2028        embedding.push(value);
2029    }
2030    Ok(embedding)
2031}
2032
2033pub fn is_embeddings_dirty(conn: &Connection) -> Result<bool, MemoryError> {
2034    let dirty: i32 = conn
2035        .query_row(
2036            "SELECT COALESCE(embeddings_dirty, 0) FROM embedding_metadata WHERE id = 1",
2037            [],
2038            |row| row.get(0),
2039        )
2040        .unwrap_or(0);
2041    Ok(dirty != 0)
2042}
2043
2044pub fn clear_embeddings_dirty(conn: &Connection) -> Result<(), MemoryError> {
2045    conn.execute(
2046        "UPDATE embedding_metadata SET embeddings_dirty = 0 WHERE id = 1",
2047        [],
2048    )?;
2049    Ok(())
2050}
2051
#[cfg(feature = "hnsw")]
/// Upsert a pending sidecar index operation for `item_key` and flag the
/// sidecar dirty.
///
/// Re-queuing an existing key overwrites its `entity_type` and `op_kind` and
/// resets `attempt_count` to 0 and `last_error` to NULL. Both writes go through
/// `tx`, so they commit or roll back together.
pub(crate) fn queue_pending_index_op(
    tx: &rusqlite::Transaction<'_>,
    item_key: &str,
    entity_type: &str,
    op_kind: IndexOpKind,
) -> Result<(), MemoryError> {
    let op_name = op_kind.as_str();
    tx.execute(
        "INSERT INTO pending_index_ops (item_key, entity_type, op_kind, attempt_count, last_error, updated_at)
         VALUES (?1, ?2, ?3, 0, NULL, datetime('now'))
         ON CONFLICT(item_key) DO UPDATE SET
             entity_type = excluded.entity_type,
             op_kind = excluded.op_kind,
             attempt_count = 0,
             last_error = NULL,
             updated_at = datetime('now')",
        params![item_key, entity_type, op_name],
    )?;
    mark_sidecar_dirty(tx)
}
2073
2074#[cfg(feature = "hnsw")]
2075pub(crate) use IndexOpKind as PendingIndexOpKind;
2076
#[cfg(feature = "hnsw")]
/// Crate-facing alias of `queue_pending_index_op`, spelled with the
/// `PendingIndexOpKind` re-export of `IndexOpKind`.
pub(crate) fn enqueue_pending_index_op(
    tx: &rusqlite::Transaction<'_>,
    item_key: &str,
    entity_type: &str,
    op_kind: PendingIndexOpKind,
) -> Result<(), MemoryError> {
    queue_pending_index_op(tx, item_key, entity_type, op_kind)
}
2086
2087pub(crate) fn list_pending_index_ops(
2088    conn: &Connection,
2089) -> Result<Vec<PendingIndexOp>, MemoryError> {
2090    let table_exists: bool = conn
2091        .query_row(
2092            "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='pending_index_ops'",
2093            [],
2094            |row| row.get(0),
2095        )
2096        .unwrap_or(false);
2097    if !table_exists {
2098        return Ok(Vec::new());
2099    }
2100
2101    let mut stmt = conn.prepare(
2102        "SELECT item_key, entity_type, op_kind, attempt_count, last_error
2103         FROM pending_index_ops
2104         ORDER BY updated_at ASC, item_key ASC",
2105    )?;
2106    let rows = stmt
2107        .query_map([], |row| {
2108            let item_key: String = row.get(0)?;
2109            let op_kind: String = row.get(2)?;
2110            Ok(PendingIndexOp {
2111                item_key: item_key.clone(),
2112                entity_type: row.get(1)?,
2113                op_kind: IndexOpKind::parse(&op_kind, &item_key).map_err(|e| {
2114                    rusqlite::Error::FromSqlConversionFailure(
2115                        2,
2116                        rusqlite::types::Type::Text,
2117                        Box::new(e),
2118                    )
2119                })?,
2120                attempt_count: row.get::<_, i64>(3)? as u32,
2121                last_error: row.get(4)?,
2122            })
2123        })?
2124        .collect::<Result<Vec<_>, _>>()?;
2125    Ok(rows)
2126}
2127
#[cfg(feature = "hnsw")]
/// Count queued sidecar index operations. Returns 0 when the
/// `pending_index_ops` table does not exist.
///
/// # Errors
/// Propagates SQLite failures, including a failed `sqlite_master` probe. Such a
/// failure must not be reported as "nothing pending".
pub(crate) fn pending_index_op_count(conn: &Connection) -> Result<usize, MemoryError> {
    let table_exists: bool = conn.query_row(
        "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='pending_index_ops'",
        [],
        |row| row.get(0),
    )?;
    if !table_exists {
        return Ok(0);
    }

    let count: usize = conn.query_row("SELECT COUNT(*) FROM pending_index_ops", [], |row| {
        row.get(0)
    })?;
    Ok(count)
}
2146
#[cfg(feature = "hnsw")]
/// Record a failed sidecar apply for each listed item key, in one transaction.
///
/// Each matching `pending_index_ops` row has its `attempt_count` incremented,
/// `last_error` set to `error`, and `updated_at` refreshed. Keys with no queued
/// row are skipped, because the UPDATE matches nothing.
pub(crate) fn mark_pending_index_ops_failed(
    conn: &Connection,
    item_keys: &[String],
    error: &str,
) -> Result<(), MemoryError> {
    with_transaction(conn, |tx| {
        item_keys.iter().try_for_each(|item_key| {
            tx.execute(
                "UPDATE pending_index_ops
                 SET attempt_count = attempt_count + 1,
                     last_error = ?1,
                     updated_at = datetime('now')
                 WHERE item_key = ?2",
                params![error, item_key],
            )
            .map(drop)
        })?;
        Ok(())
    })
}
2167
#[cfg(feature = "hnsw")]
/// Remove the queued sidecar ops for the given item keys, in one transaction.
///
/// Keys with no queued row are ignored.
pub(crate) fn clear_pending_index_ops(
    conn: &Connection,
    item_keys: &[String],
) -> Result<(), MemoryError> {
    with_transaction(conn, |tx| {
        item_keys.iter().try_for_each(|item_key| {
            tx.execute(
                "DELETE FROM pending_index_ops WHERE item_key = ?1",
                params![item_key],
            )
            .map(drop)
        })?;
        Ok(())
    })
}
2183
#[cfg(feature = "hnsw")]
/// Delete every queued sidecar index operation.
pub(crate) fn clear_all_pending_index_ops(conn: &Connection) -> Result<(), MemoryError> {
    conn.execute("DELETE FROM pending_index_ops", [])
        .map(drop)
        .map_err(MemoryError::from)
}
2189
#[cfg(feature = "hnsw")]
/// Load the stored embedding behind a sidecar index key of the form
/// `<domain>:<id>`.
///
/// Supported domains are `fact`, `chunk`, `msg` (integer message ID), and
/// `episode`. Returns `Ok(None)` when the row is missing or its `embedding`
/// column is NULL.
///
/// # Errors
/// - `MemoryError::InvalidKey` for a key without `:`, an unknown domain, or a
///   non-integer `msg` ID.
/// - SQLite errors.
/// - Decode errors from `bytes_to_embedding`, which does not check dimensions.
pub(crate) fn load_embedding_for_index_key(
    conn: &Connection,
    item_key: &str,
) -> Result<Option<Vec<f32>>, MemoryError> {
    let (domain, raw_id) = item_key
        .split_once(':')
        .ok_or_else(|| MemoryError::InvalidKey(item_key.to_string()))?;

    let lookup: Result<Option<Vec<u8>>, rusqlite::Error> = match domain {
        "fact" => conn.query_row(
            "SELECT embedding FROM facts WHERE id = ?1",
            params![raw_id],
            |row| row.get(0),
        ),
        "chunk" => conn.query_row(
            "SELECT embedding FROM chunks WHERE id = ?1",
            params![raw_id],
            |row| row.get(0),
        ),
        "msg" => {
            let message_id: i64 = raw_id
                .parse()
                .map_err(|e| MemoryError::InvalidKey(format!("{item_key}: {e}")))?;
            conn.query_row(
                "SELECT embedding FROM messages WHERE id = ?1",
                params![message_id],
                |row| row.get(0),
            )
        }
        "episode" => conn.query_row(
            "SELECT embedding FROM episodes WHERE episode_id = ?1",
            params![raw_id],
            |row| row.get(0),
        ),
        _ => return Err(MemoryError::InvalidKey(item_key.to_string())),
    };

    // A missing row and a NULL embedding both mean "nothing to load".
    let blob = lookup.optional()?.flatten();
    blob.as_deref().map(bytes_to_embedding).transpose()
}
2236
#[cfg(feature = "hnsw")]
/// Set the `sidecar_dirty` flag in `hnsw_metadata` to `'1'` inside `tx`,
/// inserting the flag row if it does not exist yet.
fn mark_sidecar_dirty(tx: &rusqlite::Transaction<'_>) -> Result<(), MemoryError> {
    tx.execute(
        "INSERT INTO hnsw_metadata (key, value) VALUES ('sidecar_dirty', '1')
         ON CONFLICT(key) DO UPDATE SET value = '1'",
        [],
    )
    .map(drop)
    .map_err(MemoryError::from)
}
2246
#[cfg(feature = "hnsw")]
/// Report whether the `sidecar_dirty` flag in `hnsw_metadata` is `"1"`.
///
/// A missing flag row (HNSW metadata not written yet) reads as clean.
///
/// # Errors
/// Propagates SQLite failures instead of reporting them as "clean", so a
/// broken metadata read cannot hide a dirty sidecar.
pub(crate) fn is_sidecar_dirty(conn: &Connection) -> Result<bool, MemoryError> {
    // INTENTIONAL: row absent when HNSW metadata has not been written yet.
    // `optional()` tolerates exactly that case and nothing else.
    let dirty: Option<String> = conn
        .query_row(
            "SELECT value FROM hnsw_metadata WHERE key = 'sidecar_dirty'",
            [],
            |row| row.get(0),
        )
        .optional()?;
    Ok(dirty.as_deref() == Some("1"))
}
2259
#[cfg(feature = "hnsw")]
/// Write the `sidecar_dirty` flag (`"1"` or `"0"`) to `hnsw_metadata`,
/// inserting the flag row when it is absent.
pub(crate) fn set_sidecar_dirty(conn: &Connection, dirty: bool) -> Result<(), MemoryError> {
    let flag = if dirty { "1" } else { "0" };
    conn.execute(
        "INSERT INTO hnsw_metadata (key, value) VALUES ('sidecar_dirty', ?1)
         ON CONFLICT(key) DO UPDATE SET value = excluded.value",
        params![flag],
    )?;
    Ok(())
}
2269
2270pub(crate) fn parse_optional_json(
2271    table: &'static str,
2272    row_id: &str,
2273    field: &'static str,
2274    raw: Option<&str>,
2275) -> Result<Option<serde_json::Value>, MemoryError> {
2276    match raw {
2277        Some(raw) => serde_json::from_str(raw)
2278            .map(Some)
2279            .map_err(|e| MemoryError::CorruptData {
2280                table,
2281                row_id: row_id.to_string(),
2282                detail: format!("invalid {field}: {e}"),
2283            }),
2284        None => Ok(None),
2285    }
2286}
2287
2288pub(crate) fn parse_string_list_json(
2289    table: &'static str,
2290    row_id: &str,
2291    field: &'static str,
2292    raw: &str,
2293) -> Result<Vec<String>, MemoryError> {
2294    serde_json::from_str(raw).map_err(|e| MemoryError::CorruptData {
2295        table,
2296        row_id: row_id.to_string(),
2297        detail: format!("invalid {field}: {e}"),
2298    })
2299}
2300
2301pub(crate) fn parse_role(
2302    table: &'static str,
2303    row_id: &str,
2304    raw: &str,
2305) -> Result<Role, MemoryError> {
2306    Role::from_str_value(raw).ok_or_else(|| MemoryError::CorruptData {
2307        table,
2308        row_id: row_id.to_string(),
2309        detail: format!("invalid role '{raw}'"),
2310    })
2311}
2312
2313pub(crate) fn parse_episode_outcome(
2314    row_id: &str,
2315    raw: &str,
2316) -> Result<EpisodeOutcome, MemoryError> {
2317    EpisodeOutcome::from_str_value(raw).ok_or_else(|| MemoryError::CorruptData {
2318        table: "episodes",
2319        row_id: row_id.to_string(),
2320        detail: format!("invalid outcome '{raw}'"),
2321    })
2322}
2323
2324pub(crate) fn parse_verification_status(
2325    row_id: &str,
2326    raw: &str,
2327) -> Result<VerificationStatus, MemoryError> {
2328    serde_json::from_str(raw).map_err(|e| MemoryError::CorruptData {
2329        table: "episodes",
2330        row_id: row_id.to_string(),
2331        detail: format!("invalid verification_status: {e}"),
2332    })
2333}
2334
2335/// Run integrity verification on the database.
2336pub fn verify_integrity_sync(
2337    conn: &Connection,
2338    mode: VerifyMode,
2339) -> Result<IntegrityReport, MemoryError> {
2340    let mut issues = Vec::new();
2341
2342    let schema_version: u32 = conn
2343        .query_row("PRAGMA user_version", [], |row| row.get(0))
2344        .unwrap_or_else(|e| {
2345            issues.push(format!("failed to read schema version: {e}"));
2346            0
2347        });
2348    if schema_version > MAX_SCHEMA_VERSION {
2349        issues.push(format!(
2350            "schema version {} is ahead of supported {}",
2351            schema_version, MAX_SCHEMA_VERSION
2352        ));
2353    }
2354
2355    let fact_count: usize = conn
2356        .query_row("SELECT COUNT(*) FROM facts", [], |row| row.get(0))
2357        .unwrap_or_else(|e| {
2358            issues.push(format!("failed to count facts: {e}"));
2359            0
2360        });
2361    let chunk_count: usize = conn
2362        .query_row("SELECT COUNT(*) FROM chunks", [], |row| row.get(0))
2363        .unwrap_or_else(|e| {
2364            issues.push(format!("failed to count chunks: {e}"));
2365            0
2366        });
2367    let message_count: usize = conn
2368        .query_row("SELECT COUNT(*) FROM messages", [], |row| row.get(0))
2369        .unwrap_or_else(|e| {
2370            issues.push(format!("failed to count messages: {e}"));
2371            0
2372        });
2373    let episode_count: usize = conn
2374        .query_row("SELECT COUNT(*) FROM episodes", [], |row| row.get(0))
2375        .unwrap_or_else(|e| {
2376            issues.push(format!("failed to count episodes: {e}"));
2377            0
2378        });
2379
2380    let facts_missing_embeddings: usize = conn
2381        .query_row(
2382            "SELECT COUNT(*) FROM facts WHERE embedding IS NULL",
2383            [],
2384            |row| row.get(0),
2385        )
2386        .unwrap_or_else(|e| {
2387            issues.push(format!("failed to count facts missing embeddings: {e}"));
2388            0
2389        });
2390    let chunks_missing_embeddings: usize = conn
2391        .query_row(
2392            "SELECT COUNT(*) FROM chunks WHERE embedding IS NULL",
2393            [],
2394            |row| row.get(0),
2395        )
2396        .unwrap_or_else(|e| {
2397            issues.push(format!("failed to count chunks missing embeddings: {e}"));
2398            0
2399        });
2400    let episodes_missing_embeddings: usize = conn
2401        .query_row(
2402            "SELECT COUNT(*) FROM episodes WHERE embedding IS NULL",
2403            [],
2404            |row| row.get(0),
2405        )
2406        .unwrap_or_else(|e| {
2407            issues.push(format!("failed to count episodes missing embeddings: {e}"));
2408            0
2409        });
2410
2411    if facts_missing_embeddings > 0 {
2412        issues.push(format!(
2413            "{} facts missing embeddings",
2414            facts_missing_embeddings
2415        ));
2416    }
2417    if chunks_missing_embeddings > 0 {
2418        issues.push(format!(
2419            "{} chunks missing embeddings",
2420            chunks_missing_embeddings
2421        ));
2422    }
2423    if episodes_missing_embeddings > 0 {
2424        issues.push(format!(
2425            "{} episodes missing embeddings",
2426            episodes_missing_embeddings
2427        ));
2428    }
2429
2430    let pending_ops = list_pending_index_ops(conn).unwrap_or_default();
2431    if !pending_ops.is_empty() {
2432        issues.push(format!(
2433            "{} pending HNSW sidecar ops queued in SQLite",
2434            pending_ops.len()
2435        ));
2436        for op in pending_ops.iter().take(5) {
2437            let op_kind = op.op_kind.as_str();
2438            let detail = match &op.last_error {
2439                Some(last_error) => format!(
2440                    "{} {} {} (attempts: {}, last_error: {})",
2441                    op.entity_type,
2442                    op.op_kind.as_str(),
2443                    op.item_key,
2444                    op.attempt_count,
2445                    last_error
2446                ),
2447                None => format!(
2448                    "{} {} {} (attempts: {})",
2449                    op.entity_type, op_kind, op.item_key, op.attempt_count
2450                ),
2451            };
2452            issues.push(format!("pending sidecar op: {detail}"));
2453        }
2454    }
2455
2456    if mode == VerifyMode::Full {
2457        let dims: usize = conn
2458            .query_row(
2459                "SELECT dimensions FROM embedding_metadata WHERE id = 1",
2460                [],
2461                |row| row.get(0),
2462            )
2463            .unwrap_or_else(|e| {
2464                issues.push(format!("failed to read embedding dimensions: {e}"));
2465                0
2466            });
2467
2468        verify_fts_drift(conn, "facts", "facts_rowid_map", fact_count, &mut issues);
2469        verify_fts_drift(conn, "chunks", "chunks_rowid_map", chunk_count, &mut issues);
2470        verify_fts_drift(
2471            conn,
2472            "messages",
2473            "messages_rowid_map",
2474            message_count,
2475            &mut issues,
2476        );
2477        verify_fts_drift(
2478            conn,
2479            "episodes",
2480            "episodes_rowid_map",
2481            episode_count,
2482            &mut issues,
2483        );
2484
2485        verify_blob_table(conn, "facts", "id", "embedding", dims, &mut issues)?;
2486        verify_blob_table(conn, "chunks", "id", "embedding", dims, &mut issues)?;
2487        verify_blob_table(conn, "messages", "id", "embedding", dims, &mut issues)?;
2488        verify_blob_table(
2489            conn,
2490            "episodes",
2491            "episode_id",
2492            "embedding",
2493            dims,
2494            &mut issues,
2495        )?;
2496
2497        verify_quantized_table(conn, "facts", "id", dims, &mut issues)?;
2498        verify_quantized_table(conn, "chunks", "id", dims, &mut issues)?;
2499        verify_quantized_table(conn, "messages", "id", dims, &mut issues)?;
2500        verify_quantized_table(conn, "episodes", "episode_id", dims, &mut issues)?;
2501
2502        verify_session_rows(conn, &mut issues)?;
2503        verify_message_rows(conn, &mut issues)?;
2504        verify_fact_rows(conn, &mut issues)?;
2505        verify_document_rows(conn, &mut issues)?;
2506        verify_episode_rows(conn, &mut issues)?;
2507
2508        let integrity_check: String = conn
2509            .query_row("PRAGMA integrity_check", [], |row| row.get(0))
2510            .unwrap_or_else(|_| "error".to_string());
2511        if integrity_check != "ok" {
2512            issues.push(format!("SQLite integrity_check: {}", integrity_check));
2513        }
2514    }
2515
2516    Ok(IntegrityReport {
2517        ok: issues.is_empty(),
2518        schema_version,
2519        fact_count,
2520        chunk_count,
2521        message_count,
2522        facts_missing_embeddings,
2523        chunks_missing_embeddings,
2524        issues,
2525    })
2526}
2527
2528/// Reconcile FTS indexes by rebuilding them from source data.
2529pub fn reconcile_fts(conn: &Connection) -> Result<(), MemoryError> {
2530    with_transaction(conn, |tx| {
2531        tx.execute_batch("DROP TABLE IF EXISTS facts_fts")?;
2532        tx.execute_batch("DELETE FROM facts_rowid_map")?;
2533        tx.execute_batch(
2534            "CREATE VIRTUAL TABLE facts_fts USING fts5(
2535                content,
2536                content='',
2537                content_rowid='rowid',
2538                tokenize='porter unicode61'
2539            )",
2540        )?;
2541        tx.execute_batch("INSERT INTO facts_rowid_map (fact_id) SELECT id FROM facts")?;
2542        tx.execute_batch(
2543            "INSERT INTO facts_fts (rowid, content)
2544             SELECT rm.rowid, f.content
2545             FROM facts_rowid_map rm
2546             JOIN facts f ON f.id = rm.fact_id",
2547        )?;
2548
2549        tx.execute_batch("DROP TABLE IF EXISTS chunks_fts")?;
2550        tx.execute_batch("DELETE FROM chunks_rowid_map")?;
2551        tx.execute_batch(
2552            "CREATE VIRTUAL TABLE chunks_fts USING fts5(
2553                content,
2554                content='',
2555                content_rowid='rowid',
2556                tokenize='porter unicode61'
2557            )",
2558        )?;
2559        tx.execute_batch("INSERT INTO chunks_rowid_map (chunk_id) SELECT id FROM chunks")?;
2560        tx.execute_batch(
2561            "INSERT INTO chunks_fts (rowid, content)
2562             SELECT rm.rowid, c.content
2563             FROM chunks_rowid_map rm
2564             JOIN chunks c ON c.id = rm.chunk_id",
2565        )?;
2566
2567        tx.execute_batch("DROP TABLE IF EXISTS messages_fts")?;
2568        tx.execute_batch("DELETE FROM messages_rowid_map")?;
2569        tx.execute_batch(
2570            "CREATE VIRTUAL TABLE messages_fts USING fts5(
2571                content,
2572                content='',
2573                content_rowid='rowid',
2574                tokenize='porter unicode61'
2575            )",
2576        )?;
2577        tx.execute_batch("INSERT INTO messages_rowid_map (message_id) SELECT id FROM messages")?;
2578        tx.execute_batch(
2579            "INSERT INTO messages_fts (rowid, content)
2580             SELECT rm.rowid, m.content
2581             FROM messages_rowid_map rm
2582             JOIN messages m ON m.id = rm.message_id",
2583        )?;
2584
2585        tx.execute_batch("DROP TABLE IF EXISTS episodes_fts")?;
2586        tx.execute_batch("DELETE FROM episodes_rowid_map")?;
2587        tx.execute_batch(
2588            "CREATE VIRTUAL TABLE episodes_fts USING fts5(
2589                content,
2590                content='',
2591                content_rowid='rowid',
2592                tokenize='porter unicode61'
2593            )",
2594        )?;
2595        tx.execute_batch(
2596            "INSERT INTO episodes_rowid_map (episode_id, document_id) SELECT episode_id, document_id FROM episodes",
2597        )?;
2598        tx.execute_batch(
2599            "INSERT INTO episodes_fts (rowid, content)
2600             SELECT rm.rowid, e.search_text
2601             FROM episodes_rowid_map rm
2602             JOIN episodes e ON e.episode_id = rm.episode_id",
2603        )?;
2604
2605        Ok(())
2606    })?;
2607
2608    tracing::info!("FTS indexes reconciled");
2609    Ok(())
2610}
2611
2612fn verify_fts_drift(
2613    conn: &Connection,
2614    label: &str,
2615    map_table: &str,
2616    source_count: usize,
2617    issues: &mut Vec<String>,
2618) {
2619    let table_exists: bool = conn
2620        .query_row(
2621            "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name = ?1",
2622            params![map_table],
2623            |row| row.get(0),
2624        )
2625        .unwrap_or(false);
2626    if !table_exists {
2627        if source_count > 0 {
2628            issues.push(format!("{} rows exist but {} is missing", label, map_table));
2629        }
2630        return;
2631    }
2632
2633    let sql = format!("SELECT COUNT(*) FROM {}", map_table);
2634    let indexed_count: usize = conn.query_row(&sql, [], |row| row.get(0)).unwrap_or(0);
2635    if indexed_count != source_count {
2636        issues.push(format!(
2637            "FTS {} index drift: {} rows in map vs {} source rows",
2638            label, indexed_count, source_count
2639        ));
2640    }
2641}
2642
2643fn verify_blob_table(
2644    conn: &Connection,
2645    table: &'static str,
2646    id_column: &'static str,
2647    blob_column: &'static str,
2648    expected_dims: usize,
2649    issues: &mut Vec<String>,
2650) -> Result<(), MemoryError> {
2651    if expected_dims == 0 {
2652        return Ok(());
2653    }
2654
2655    let sql = format!(
2656        "SELECT CAST({id_column} AS TEXT), {blob_column} FROM {table} WHERE {blob_column} IS NOT NULL"
2657    );
2658    let mut stmt = conn.prepare(&sql)?;
2659    let rows = stmt.query_map([], |row| {
2660        Ok((row.get::<_, String>(0)?, row.get::<_, Vec<u8>>(1)?))
2661    })?;
2662
2663    for row in rows {
2664        let (row_id, blob) = row?;
2665        match bytes_to_embedding(&blob) {
2666            Ok(embedding) if embedding.len() != expected_dims => issues.push(format!(
2667                "{}({}) has embedding dimension {} but expected {}",
2668                table,
2669                row_id,
2670                embedding.len(),
2671                expected_dims
2672            )),
2673            Ok(_) => {}
2674            Err(err) => issues.push(format!(
2675                "{}({}) invalid embedding blob: {}",
2676                table, row_id, err
2677            )),
2678        }
2679    }
2680
2681    Ok(())
2682}
2683
2684fn verify_quantized_table(
2685    conn: &Connection,
2686    table: &'static str,
2687    id_column: &'static str,
2688    expected_dims: usize,
2689    issues: &mut Vec<String>,
2690) -> Result<(), MemoryError> {
2691    if expected_dims == 0 {
2692        return Ok(());
2693    }
2694
2695    let sql = format!(
2696        "SELECT CAST({id_column} AS TEXT), embedding_q8 FROM {table} WHERE embedding IS NOT NULL"
2697    );
2698    let mut stmt = conn.prepare(&sql)?;
2699    let rows = stmt.query_map([], |row| {
2700        Ok((row.get::<_, String>(0)?, row.get::<_, Option<Vec<u8>>>(1)?))
2701    })?;
2702
2703    for row in rows {
2704        let (row_id, blob) = row?;
2705        match blob {
2706            Some(blob) => {
2707                if let Err(err) = unpack_quantized(&blob, expected_dims) {
2708                    issues.push(format!(
2709                        "{}({}) invalid quantized embedding: {}",
2710                        table, row_id, err
2711                    ));
2712                }
2713            }
2714            None => issues.push(format!("{}({}) missing quantized embedding", table, row_id)),
2715        }
2716    }
2717
2718    Ok(())
2719}
2720
2721fn verify_session_rows(conn: &Connection, issues: &mut Vec<String>) -> Result<(), MemoryError> {
2722    let mut stmt = conn.prepare("SELECT id, metadata FROM sessions WHERE metadata IS NOT NULL")?;
2723    let rows = stmt.query_map([], |row| {
2724        Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
2725    })?;
2726    for row in rows {
2727        let (id, metadata) = row?;
2728        if let Err(err) = parse_optional_json("sessions", &id, "metadata", Some(&metadata)) {
2729            issues.push(err.to_string());
2730        }
2731    }
2732    Ok(())
2733}
2734
2735fn verify_message_rows(conn: &Connection, issues: &mut Vec<String>) -> Result<(), MemoryError> {
2736    let mut stmt = conn.prepare("SELECT id, role, metadata FROM messages")?;
2737    let rows = stmt.query_map([], |row| {
2738        Ok((
2739            row.get::<_, i64>(0)?,
2740            row.get::<_, String>(1)?,
2741            row.get::<_, Option<String>>(2)?,
2742        ))
2743    })?;
2744    for row in rows {
2745        let (id, role, metadata) = row?;
2746        let row_id = id.to_string();
2747        if let Err(err) = parse_role("messages", &row_id, &role) {
2748            issues.push(err.to_string());
2749        }
2750        if let Err(err) = parse_optional_json("messages", &row_id, "metadata", metadata.as_deref())
2751        {
2752            issues.push(err.to_string());
2753        }
2754    }
2755    Ok(())
2756}
2757
2758fn verify_fact_rows(conn: &Connection, issues: &mut Vec<String>) -> Result<(), MemoryError> {
2759    let mut stmt = conn.prepare("SELECT id, metadata FROM facts WHERE metadata IS NOT NULL")?;
2760    let rows = stmt.query_map([], |row| {
2761        Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
2762    })?;
2763    for row in rows {
2764        let (id, metadata) = row?;
2765        if let Err(err) = parse_optional_json("facts", &id, "metadata", Some(&metadata)) {
2766            issues.push(err.to_string());
2767        }
2768    }
2769    Ok(())
2770}
2771
2772fn verify_document_rows(conn: &Connection, issues: &mut Vec<String>) -> Result<(), MemoryError> {
2773    let mut stmt = conn.prepare("SELECT id, metadata FROM documents WHERE metadata IS NOT NULL")?;
2774    let rows = stmt.query_map([], |row| {
2775        Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
2776    })?;
2777    for row in rows {
2778        let (id, metadata) = row?;
2779        if let Err(err) = parse_optional_json("documents", &id, "metadata", Some(&metadata)) {
2780            issues.push(err.to_string());
2781        }
2782    }
2783    Ok(())
2784}
2785
2786fn verify_episode_rows(conn: &Connection, issues: &mut Vec<String>) -> Result<(), MemoryError> {
2787    let mut stmt = conn.prepare(
2788        "SELECT episode_id, cause_ids, outcome, verification_status
2789         FROM episodes",
2790    )?;
2791    let rows = stmt.query_map([], |row| {
2792        Ok((
2793            row.get::<_, String>(0)?,
2794            row.get::<_, String>(1)?,
2795            row.get::<_, String>(2)?,
2796            row.get::<_, String>(3)?,
2797        ))
2798    })?;
2799    for row in rows {
2800        let (episode_id, cause_ids, outcome, verification_status) = row?;
2801        if let Err(err) = parse_string_list_json("episodes", &episode_id, "cause_ids", &cause_ids) {
2802            issues.push(err.to_string());
2803        }
2804        if let Err(err) = parse_episode_outcome(&episode_id, &outcome) {
2805            issues.push(err.to_string());
2806        }
2807        if let Err(err) = parse_verification_status(&episode_id, &verification_status) {
2808            issues.push(err.to_string());
2809        }
2810    }
2811    Ok(())
2812}