Skip to main content

semantic_memory/
db.rs

1//! Database initialization, migrations, integrity checks, and durable sidecar state.
2
3use crate::config::{EmbeddingConfig, MemoryLimits, PoolConfig};
4use crate::error::MemoryError;
5use crate::quantize::unpack_quantized;
6use crate::types::{EpisodeOutcome, Role, VerificationStatus};
7use rusqlite::{params, Connection, OpenFlags};
8use std::path::Path;
9
/// V1 migration: full initial schema.
///
/// Creates, in order:
/// - `sessions` and `messages` (conversation history; messages cascade-delete
///   with their session),
/// - `facts`, with `facts_rowid_map` and the FTS5 table `facts_fts`,
/// - `documents` and `chunks`, with `chunks_rowid_map` and the FTS5 table
///   `chunks_fts`,
/// - `embedding_metadata`, constrained to a single row by `CHECK (id = 1)`.
///
/// The FTS5 tables are declared with `content=''`, so they store only the
/// index; the `*_rowid_map` tables provide the integer rowid for each
/// text-keyed row.
const MIGRATION_V1: &str = r#"
-- CONVERSATIONS
CREATE TABLE sessions (
    id          TEXT PRIMARY KEY,
    channel     TEXT NOT NULL DEFAULT 'repl',
    created_at  TEXT NOT NULL DEFAULT (datetime('now')),
    updated_at  TEXT NOT NULL DEFAULT (datetime('now')),
    metadata    TEXT
);

CREATE INDEX idx_sessions_updated ON sessions(updated_at DESC);

CREATE TABLE messages (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    session_id  TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
    role        TEXT NOT NULL CHECK (role IN ('system', 'user', 'assistant', 'tool')),
    content     TEXT NOT NULL,
    token_count INTEGER,
    created_at  TEXT NOT NULL DEFAULT (datetime('now')),
    metadata    TEXT
);

CREATE INDEX idx_messages_session ON messages(session_id, created_at ASC);
CREATE INDEX idx_messages_created ON messages(created_at DESC);

-- KNOWLEDGE (Facts)
CREATE TABLE facts (
    id          TEXT PRIMARY KEY,
    namespace   TEXT NOT NULL DEFAULT 'general',
    content     TEXT NOT NULL,
    source      TEXT,
    embedding   BLOB,
    created_at  TEXT NOT NULL DEFAULT (datetime('now')),
    updated_at  TEXT NOT NULL DEFAULT (datetime('now')),
    metadata    TEXT
);

CREATE INDEX idx_facts_namespace ON facts(namespace);
CREATE INDEX idx_facts_updated ON facts(updated_at DESC);

CREATE TABLE facts_rowid_map (
    rowid       INTEGER PRIMARY KEY AUTOINCREMENT,
    fact_id     TEXT NOT NULL UNIQUE REFERENCES facts(id) ON DELETE CASCADE
);

CREATE VIRTUAL TABLE facts_fts USING fts5(
    content,
    content='',
    content_rowid='rowid',
    tokenize='porter unicode61'
);

-- DOCUMENTS (Chunked content)
CREATE TABLE documents (
    id          TEXT PRIMARY KEY,
    title       TEXT NOT NULL,
    source_path TEXT,
    namespace   TEXT NOT NULL DEFAULT 'general',
    created_at  TEXT NOT NULL DEFAULT (datetime('now')),
    metadata    TEXT
);

CREATE TABLE chunks (
    id          TEXT PRIMARY KEY,
    document_id TEXT NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
    chunk_index INTEGER NOT NULL,
    content     TEXT NOT NULL,
    token_count INTEGER,
    embedding   BLOB,
    created_at  TEXT NOT NULL DEFAULT (datetime('now'))
);

CREATE INDEX idx_chunks_document ON chunks(document_id, chunk_index ASC);

CREATE TABLE chunks_rowid_map (
    rowid       INTEGER PRIMARY KEY AUTOINCREMENT,
    chunk_id    TEXT NOT NULL UNIQUE REFERENCES chunks(id) ON DELETE CASCADE
);

CREATE VIRTUAL TABLE chunks_fts USING fts5(
    content,
    content='',
    content_rowid='rowid',
    tokenize='porter unicode61'
);

-- EMBEDDING METADATA
CREATE TABLE embedding_metadata (
    id          INTEGER PRIMARY KEY CHECK (id = 1),
    model_name  TEXT NOT NULL,
    dimensions  INTEGER NOT NULL,
    updated_at  TEXT NOT NULL DEFAULT (datetime('now'))
);
"#;
105
/// V2 migration: message embeddings for conversation search.
///
/// Adds a nullable `embedding` BLOB column to `messages` and gives messages the
/// same rowid-map + FTS5 pair (`messages_rowid_map`, `messages_fts`) that V1
/// created for facts and chunks.
const MIGRATION_V2: &str = r#"
ALTER TABLE messages ADD COLUMN embedding BLOB;

CREATE TABLE messages_rowid_map (
    rowid       INTEGER PRIMARY KEY AUTOINCREMENT,
    message_id  INTEGER NOT NULL UNIQUE REFERENCES messages(id) ON DELETE CASCADE
);

CREATE VIRTUAL TABLE messages_fts USING fts5(
    content,
    content='',
    content_rowid='rowid',
    tokenize='porter unicode61'
);
"#;
122
/// V3 migration: embedding staleness tracking.
///
/// Adds the `embeddings_dirty` flag (default `0`) to the single-row
/// `embedding_metadata` table. `check_embedding_metadata` sets it to `1` when
/// the configured model or dimensions differ from the stored ones, and
/// `clear_embeddings_dirty` resets it to `0`.
const MIGRATION_V3: &str = r#"
ALTER TABLE embedding_metadata ADD COLUMN embeddings_dirty INTEGER NOT NULL DEFAULT 0;
"#;
127
/// V4 migration: HNSW metadata tracking.
///
/// Creates `hnsw_metadata`, a text key/value table. V7 later seeds it with the
/// `sidecar_dirty` key.
const MIGRATION_V4: &str = r#"
CREATE TABLE IF NOT EXISTS hnsw_metadata (
    key TEXT PRIMARY KEY,
    value TEXT NOT NULL
);
"#;
135
/// V5 migration: quantized embeddings + HNSW keymap persistence.
///
/// Adds a nullable `embedding_q8` BLOB column to `facts`, `chunks`, and
/// `messages`, and creates `hnsw_keymap`, which maps an integer `node_id` to a
/// unique text `item_key` with a `deleted` flag (default `0`).
const MIGRATION_V5: &str = r#"
ALTER TABLE facts ADD COLUMN embedding_q8 BLOB;
ALTER TABLE chunks ADD COLUMN embedding_q8 BLOB;
ALTER TABLE messages ADD COLUMN embedding_q8 BLOB;

CREATE TABLE IF NOT EXISTS hnsw_keymap (
    node_id     INTEGER PRIMARY KEY,
    item_key    TEXT NOT NULL UNIQUE,
    deleted     INTEGER NOT NULL DEFAULT 0
);

CREATE INDEX idx_hnsw_keymap_key ON hnsw_keymap(item_key);
"#;
150
/// V6 migration: episodes table for causal tracking.
///
/// Creates `episodes` keyed by `document_id` (one episode per document, cascading
/// with the document) plus indexes on `effect_type`, `outcome`, and
/// `experiment_id`. `run_migration_v9` later rebuilds this table with
/// `episode_id` as the primary key.
const MIGRATION_V6: &str = r#"
CREATE TABLE IF NOT EXISTS episodes (
    document_id TEXT PRIMARY KEY REFERENCES documents(id) ON DELETE CASCADE,
    cause_ids TEXT NOT NULL,
    effect_type TEXT NOT NULL,
    outcome TEXT NOT NULL DEFAULT 'pending',
    confidence REAL NOT NULL DEFAULT 0.0,
    verification_status TEXT NOT NULL DEFAULT '{"status":"unverified"}',
    experiment_id TEXT,
    created_at TEXT NOT NULL DEFAULT (datetime('now'))
);

CREATE INDEX IF NOT EXISTS idx_episodes_effect_type ON episodes(effect_type);
CREATE INDEX IF NOT EXISTS idx_episodes_outcome ON episodes(outcome);
CREATE INDEX IF NOT EXISTS idx_episodes_experiment_id ON episodes(experiment_id);
"#;
168
/// V7 migration: searchable episodes + durable sidecar journal.
///
/// - Adds `updated_at`, `search_text`, `embedding`, and `embedding_q8` to
///   `episodes`.
/// - Creates `episodes_rowid_map` and the FTS5 table `episodes_fts`.
/// - Creates `pending_index_ops`, the queue of `upsert`/`delete` operations
///   keyed by `item_key`.
/// - Seeds `hnsw_metadata` with `sidecar_dirty = '0'` if the key is absent.
/// - Backfills `search_text` for rows where it is empty (effect type, outcome,
///   experiment id, and cause ids joined by spaces), then populates the rowid
///   map and the FTS index from the existing episodes.
///
/// NOTE(review): `updated_at` is added with `NOT NULL DEFAULT (datetime('now'))`,
/// a parenthesised (non-constant) default. SQLite documents restrictions on
/// `ALTER TABLE ... ADD COLUMN` with such defaults; confirm this statement
/// succeeds on databases that already contain `episodes` rows.
const MIGRATION_V7: &str = r#"
ALTER TABLE episodes ADD COLUMN updated_at TEXT NOT NULL DEFAULT (datetime('now'));
ALTER TABLE episodes ADD COLUMN search_text TEXT NOT NULL DEFAULT '';
ALTER TABLE episodes ADD COLUMN embedding BLOB;
ALTER TABLE episodes ADD COLUMN embedding_q8 BLOB;

CREATE TABLE IF NOT EXISTS episodes_rowid_map (
    rowid       INTEGER PRIMARY KEY AUTOINCREMENT,
    document_id TEXT NOT NULL UNIQUE REFERENCES episodes(document_id) ON DELETE CASCADE
);

CREATE VIRTUAL TABLE episodes_fts USING fts5(
    content,
    content='',
    content_rowid='rowid',
    tokenize='porter unicode61'
);

CREATE TABLE IF NOT EXISTS pending_index_ops (
    item_key      TEXT PRIMARY KEY,
    entity_type   TEXT NOT NULL,
    op_kind       TEXT NOT NULL CHECK (op_kind IN ('upsert', 'delete')),
    attempt_count INTEGER NOT NULL DEFAULT 0,
    last_error    TEXT,
    updated_at    TEXT NOT NULL DEFAULT (datetime('now'))
);

INSERT OR IGNORE INTO hnsw_metadata (key, value) VALUES ('sidecar_dirty', '0');

UPDATE episodes
SET search_text = TRIM(
    COALESCE(effect_type, '') || ' ' ||
    COALESCE(outcome, '') || ' ' ||
    COALESCE(experiment_id, '') || ' ' ||
    COALESCE(cause_ids, '')
)
WHERE search_text = '';

INSERT OR IGNORE INTO episodes_rowid_map (document_id)
SELECT document_id FROM episodes;

INSERT INTO episodes_fts (rowid, content)
SELECT rm.rowid, e.search_text
FROM episodes_rowid_map rm
JOIN episodes e ON e.document_id = rm.document_id;
"#;
216
/// V8 migration: durable episode trace IDs.
///
/// Adds a nullable `trace_id` TEXT column to `episodes`.
const MIGRATION_V8: &str = r#"
ALTER TABLE episodes ADD COLUMN trace_id TEXT;
"#;
221
/// V9 migration: first-class episode identity + normalized causal edge table.
///
/// Rebuilds the episodes table so `episode_id` is the primary key while
/// `document_id` becomes a non-unique FK allowing multiple episodes per doc.
/// Adds `episode_causes` for normalized causal backlinks.
///
/// Applied via `run_migration_v9()` because it requires table rebuild.
/// The SQL here is intentionally empty: it only reserves slot 9 in
/// `MIGRATIONS`, and `run_migrations` dispatches version 9 to the procedural
/// function instead of executing this string.
const MIGRATION_V9: &str = "";
230
/// Ordered list of migrations as `(version, sql)` pairs.
///
/// `run_migrations` walks this list in order and applies every entry whose
/// version is greater than the highest version recorded in `_schema_version`.
/// For versions 9, 16, and 17 the SQL in this table is not executed;
/// `run_migrations` calls `run_migration_v9`, `run_migration_v16`, and
/// `run_migration_v17` instead.
///
/// Versions 10 and up are defined in the `projection_import` and
/// `projection_storage` modules.
// NOTE(review): `allow(deprecated)` presumably covers one of the externally
// defined migration constants being marked `#[deprecated]` — confirm and
// record which one.
#[allow(deprecated)]
const MIGRATIONS: &[(u32, &str)] = &[
    (1, MIGRATION_V1),
    (2, MIGRATION_V2),
    (3, MIGRATION_V3),
    (4, MIGRATION_V4),
    (5, MIGRATION_V5),
    (6, MIGRATION_V6),
    (7, MIGRATION_V7),
    (8, MIGRATION_V8),
    (9, MIGRATION_V9),
    (10, crate::projection_import::MIGRATION_V10),
    (11, crate::projection_storage::MIGRATION_V11),
    (12, crate::projection_storage::MIGRATION_V12),
    (13, crate::projection_storage::MIGRATION_V13),
    (14, crate::projection_storage::MIGRATION_V14),
    (15, crate::projection_storage::MIGRATION_V15),
    (16, crate::projection_storage::MIGRATION_V16),
    (17, crate::projection_storage::MIGRATION_V17),
];
252
/// Maximum schema version this build supports.
///
/// `run_migrations` refuses to touch a database whose `PRAGMA user_version`
/// exceeds this value (it returns `MemoryError::SchemaAhead`). Keep this equal
/// to the version of the last entry in `MIGRATIONS`.
pub const MAX_SCHEMA_VERSION: u32 = 17;
255
256/// Procedural migration for V9: rebuild episodes table with episode_id PK.
257fn run_migration_v9(conn: &Connection) -> Result<(), MemoryError> {
258    // Check if episodes table exists (fresh DBs won't have it yet at V6)
259    let episodes_exist: bool = conn
260        .query_row(
261            "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='episodes'",
262            [],
263            |row| row.get(0),
264        )
265        .map_err(|e| MemoryError::MigrationFailed {
266            version: 9,
267            reason: format!("existence check failed: {e}"),
268        })?;
269
270    if !episodes_exist {
271        // No episodes table to migrate; create the target schema directly
272        conn.execute_batch(
273            "CREATE TABLE IF NOT EXISTS episode_causes (
274                 episode_id    TEXT NOT NULL,
275                 cause_node_id TEXT NOT NULL,
276                 ordinal       INTEGER NOT NULL DEFAULT 0,
277                 PRIMARY KEY (episode_id, cause_node_id)
278             );
279             CREATE INDEX IF NOT EXISTS idx_episode_causes_cause ON episode_causes(cause_node_id);",
280        )?;
281        return Ok(());
282    }
283
284    // Disable foreign keys for table rebuild
285    conn.execute_batch("PRAGMA foreign_keys = OFF;")?;
286
287    conn.execute_batch(
288        "CREATE TABLE episodes_new (
289             episode_id  TEXT PRIMARY KEY,
290             document_id TEXT NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
291             cause_ids   TEXT NOT NULL,
292             effect_type TEXT NOT NULL,
293             outcome     TEXT NOT NULL DEFAULT 'pending',
294             confidence  REAL NOT NULL DEFAULT 0.0,
295             verification_status TEXT NOT NULL DEFAULT '{\"status\":\"unverified\"}',
296             experiment_id TEXT,
297             created_at  TEXT NOT NULL DEFAULT (datetime('now')),
298             updated_at  TEXT NOT NULL DEFAULT (datetime('now')),
299             search_text TEXT NOT NULL DEFAULT '',
300             embedding   BLOB,
301             embedding_q8 BLOB,
302             trace_id    TEXT
303         )",
304    )?;
305
306    // Migrate existing data with deterministic episode_id
307    conn.execute_batch(
308        "INSERT INTO episodes_new
309             (episode_id, document_id, cause_ids, effect_type, outcome, confidence,
310              verification_status, experiment_id, created_at, updated_at,
311              search_text, embedding, embedding_q8, trace_id)
312         SELECT
313             document_id || '-ep0',
314             document_id, cause_ids, effect_type, outcome, confidence,
315             verification_status, experiment_id, created_at, updated_at,
316             search_text, embedding, embedding_q8, trace_id
317         FROM episodes",
318    )?;
319
320    conn.execute_batch("DROP TABLE episodes")?;
321    conn.execute_batch("ALTER TABLE episodes_new RENAME TO episodes")?;
322
323    conn.execute_batch(
324        "CREATE INDEX idx_episodes_document_id ON episodes(document_id);
325         CREATE INDEX idx_episodes_effect_type ON episodes(effect_type);
326         CREATE INDEX idx_episodes_outcome ON episodes(outcome);
327         CREATE INDEX idx_episodes_experiment_id ON episodes(experiment_id);",
328    )?;
329
330    // Rebuild episodes_rowid_map with episode_id
331    conn.execute_batch(
332        "DROP TABLE IF EXISTS episodes_rowid_map;
333         CREATE TABLE episodes_rowid_map (
334             rowid       INTEGER PRIMARY KEY AUTOINCREMENT,
335             episode_id  TEXT NOT NULL UNIQUE,
336             document_id TEXT
337         );
338         INSERT INTO episodes_rowid_map (episode_id, document_id)
339         SELECT episode_id, document_id FROM episodes;",
340    )?;
341
342    // Rebuild episodes FTS
343    conn.execute_batch(
344        "DROP TABLE IF EXISTS episodes_fts;
345         CREATE VIRTUAL TABLE episodes_fts USING fts5(
346             content,
347             content='',
348             content_rowid='rowid',
349             tokenize='porter unicode61'
350         );
351         INSERT INTO episodes_fts (rowid, content)
352         SELECT rm.rowid, e.search_text
353         FROM episodes_rowid_map rm
354         JOIN episodes e ON e.episode_id = rm.episode_id;",
355    )?;
356
357    // Normalized causal edge table
358    conn.execute_batch(
359        "CREATE TABLE IF NOT EXISTS episode_causes (
360             episode_id    TEXT NOT NULL,
361             cause_node_id TEXT NOT NULL,
362             ordinal       INTEGER NOT NULL DEFAULT 0,
363             PRIMARY KEY (episode_id, cause_node_id)
364         );
365         CREATE INDEX IF NOT EXISTS idx_episode_causes_cause ON episode_causes(cause_node_id);",
366    )?;
367
368    // Populate edge table from existing JSON cause_ids
369    conn.execute_batch(
370        "INSERT OR IGNORE INTO episode_causes (episode_id, cause_node_id, ordinal)
371         SELECT e.episode_id, je.value, CAST(je.key AS INTEGER)
372         FROM episodes e, json_each(e.cause_ids) je;",
373    )?;
374
375    conn.execute_batch("PRAGMA foreign_keys = ON;")?;
376
377    Ok(())
378}
379
/// How thorough the integrity check should be.
///
/// The check itself is implemented outside this excerpt; the variant docs
/// describe the intended scope of each mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VerifyMode {
    /// Quick: counts and basic metadata only.
    Quick,
    /// Full: includes FTS, JSON/enum decoding, blobs, and SQLite integrity_check.
    Full,
}
388
/// Result of an integrity verification.
///
/// NOTE(review): the code that fills this report is outside this excerpt. The
/// field notes below are inferred from the field names and types only — confirm
/// them against the producer.
#[derive(Debug, Clone)]
pub struct IntegrityReport {
    /// Overall verdict (presumably `true` when no issues were recorded).
    pub ok: bool,
    /// Schema version observed in the database.
    pub schema_version: u32,
    /// Number of fact rows (presumably from the `facts` table).
    pub fact_count: usize,
    /// Number of chunk rows (presumably from the `chunks` table).
    pub chunk_count: usize,
    /// Number of message rows (presumably from the `messages` table).
    pub message_count: usize,
    /// Facts without an embedding (presumably a NULL `embedding` column).
    pub facts_missing_embeddings: usize,
    /// Chunks without an embedding (presumably a NULL `embedding` column).
    pub chunks_missing_embeddings: usize,
    /// Human-readable descriptions of the problems found.
    pub issues: Vec<String>,
}
401
/// Action to take when integrity issues are found.
///
/// NOTE(review): the reconcile logic is outside this excerpt; the variant notes
/// are inferred from the names — confirm against the consumer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReconcileAction {
    /// Report issues without modifying the database (presumably).
    ReportOnly,
    /// Rebuild the FTS indexes (presumably).
    RebuildFts,
    /// Recompute embeddings (presumably).
    ReEmbed,
}
409
410/// Desired HNSW sidecar mutation queued in SQLite.
411#[derive(Debug, Clone, Copy, PartialEq, Eq)]
412pub(crate) enum IndexOpKind {
413    Upsert,
414    Delete,
415}
416
417impl IndexOpKind {
418    pub(crate) fn as_str(self) -> &'static str {
419        match self {
420            Self::Upsert => "upsert",
421            Self::Delete => "delete",
422        }
423    }
424
425    fn parse(raw: &str, item_key: &str) -> Result<Self, MemoryError> {
426        match raw {
427            "upsert" => Ok(Self::Upsert),
428            "delete" => Ok(Self::Delete),
429            other => Err(MemoryError::CorruptData {
430                table: "pending_index_ops",
431                row_id: item_key.to_string(),
432                detail: format!("invalid op_kind '{other}'"),
433            }),
434        }
435    }
436}
437
/// Durable sidecar repair record: one row of the `pending_index_ops` table as
/// loaded by `list_pending_index_ops`.
#[derive(Debug, Clone)]
pub(crate) struct PendingIndexOp {
    /// Value of the `item_key` column (the table's primary key).
    pub item_key: String,
    /// Value of the `entity_type` column.
    pub entity_type: String,
    /// Decoded `op_kind` column (`upsert` or `delete`).
    pub op_kind: IndexOpKind,
    /// Value of the `attempt_count` column; `mark_pending_index_ops_failed`
    /// increments it.
    pub attempt_count: u32,
    /// Value of the nullable `last_error` column; set by
    /// `mark_pending_index_ops_failed`.
    pub last_error: Option<String>,
}
447
448/// Run a closure inside an unchecked transaction, committing on success.
449pub fn with_transaction<F, T>(conn: &Connection, f: F) -> Result<T, MemoryError>
450where
451    F: FnOnce(&rusqlite::Transaction<'_>) -> Result<T, MemoryError>,
452{
453    let tx = conn.unchecked_transaction()?;
454    let result = f(&tx)?;
455    tx.commit()?;
456    Ok(result)
457}
458
/// Open or create a SQLite database, configure pragmas, and run migrations.
///
/// Thin wrapper over `open_database_internal` with migrations enabled; the
/// size cap is taken from `limits.max_db_size_bytes`.
///
/// # Errors
///
/// Returns whatever `open_database_internal` returns: failures creating the
/// parent directory, opening or configuring the connection, or migrating.
pub fn open_database(
    path: &Path,
    pool: &PoolConfig,
    limits: &MemoryLimits,
) -> Result<Connection, MemoryError> {
    open_database_internal(path, pool, limits.max_db_size_bytes, true)
}
467
/// Open a SQLite connection with pragmas applied but without running migrations.
///
/// Thin wrapper over `open_database_internal` with migrations disabled; the
/// size cap is taken from `limits.max_db_size_bytes`.
///
/// # Errors
///
/// Returns whatever `open_database_internal` returns: failures creating the
/// parent directory, or opening or configuring the connection.
pub fn open_database_connection(
    path: &Path,
    pool: &PoolConfig,
    limits: &MemoryLimits,
) -> Result<Connection, MemoryError> {
    open_database_internal(path, pool, limits.max_db_size_bytes, false)
}
476
477pub(crate) fn open_database_internal(
478    path: &Path,
479    pool: &PoolConfig,
480    max_db_size_bytes: u64,
481    run_schema_migrations: bool,
482) -> Result<Connection, MemoryError> {
483    create_parent_dirs(path)?;
484    let conn = Connection::open(path)?;
485    configure_connection(&conn, path, pool, max_db_size_bytes, false)?;
486    if run_schema_migrations {
487        run_migrations(&conn)?;
488    }
489    Ok(conn)
490}
491
492pub(crate) fn open_pool_member_connection(
493    path: &Path,
494    pool: &PoolConfig,
495    limits: &MemoryLimits,
496    query_only: bool,
497) -> Result<Connection, MemoryError> {
498    create_parent_dirs(path)?;
499    let flags = OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE;
500    let conn = Connection::open_with_flags(path, flags)?;
501    configure_connection(&conn, path, pool, limits.max_db_size_bytes, query_only)?;
502    Ok(conn)
503}
504
505fn create_parent_dirs(path: &Path) -> Result<(), MemoryError> {
506    if let Some(parent) = path.parent() {
507        if !parent.as_os_str().is_empty() {
508            std::fs::create_dir_all(parent).map_err(|e| {
509                MemoryError::StorageError(format!(
510                    "failed to create database directory {}: {}",
511                    parent.display(),
512                    e
513                ))
514            })?;
515        }
516    }
517    Ok(())
518}
519
520fn configure_connection(
521    conn: &Connection,
522    path: &Path,
523    pool: &PoolConfig,
524    max_db_size_bytes: u64,
525    query_only: bool,
526) -> Result<(), MemoryError> {
527    let journal_mode = if pool.enable_wal { "WAL" } else { "DELETE" };
528    conn.execute_batch(&format!(
529        "PRAGMA journal_mode = {};
530         PRAGMA foreign_keys = ON;
531         PRAGMA busy_timeout = {};
532         PRAGMA synchronous = NORMAL;
533         PRAGMA temp_store = MEMORY;
534         PRAGMA wal_autocheckpoint = {};",
535        journal_mode, pool.busy_timeout_ms, pool.wal_autocheckpoint,
536    ))?;
537
538    if query_only {
539        conn.execute_batch("PRAGMA query_only = ON;")?;
540    }
541
542    let actual_journal_mode: String =
543        conn.query_row("PRAGMA journal_mode", [], |row| row.get(0))?;
544    let expected_journal_mode = if pool.enable_wal { "wal" } else { "delete" };
545    if actual_journal_mode.to_lowercase() != expected_journal_mode {
546        return Err(MemoryError::StorageError(format!(
547            "SQLite journal mode mismatch for {}: requested {}, got {}",
548            path.display(),
549            expected_journal_mode,
550            actual_journal_mode
551        )));
552    }
553
554    if max_db_size_bytes > 0 {
555        let page_size: u64 = conn.query_row("PRAGMA page_size", [], |row| row.get(0))?;
556        let max_page_count = max_db_size_bytes.div_ceil(page_size);
557        let actual_max_page_count: u64 = conn.query_row(
558            &format!("PRAGMA max_page_count = {}", max_page_count),
559            [],
560            |row| row.get(0),
561        )?;
562        let page_count: u64 = conn.query_row("PRAGMA page_count", [], |row| row.get(0))?;
563
564        if page_count > actual_max_page_count {
565            return Err(MemoryError::DatabaseSizeLimitExceeded {
566                current: page_count.saturating_mul(page_size),
567                limit: max_db_size_bytes,
568            });
569        }
570    }
571
572    Ok(())
573}
574
575/// Run all pending migrations.
576pub fn run_migrations(conn: &Connection) -> Result<(), MemoryError> {
577    let user_version: u32 = conn
578        .query_row("PRAGMA user_version", [], |row| row.get(0))
579        .map_err(|e| MemoryError::MigrationFailed {
580            version: 0,
581            reason: format!("failed to read PRAGMA user_version: {e}"),
582        })?;
583
584    if user_version > MAX_SCHEMA_VERSION {
585        return Err(MemoryError::SchemaAhead {
586            found: user_version,
587            supported: MAX_SCHEMA_VERSION,
588        });
589    }
590
591    conn.execute_batch(
592        "CREATE TABLE IF NOT EXISTS _schema_version (
593            version     INTEGER PRIMARY KEY,
594            applied_at  TEXT NOT NULL DEFAULT (datetime('now'))
595        );",
596    )?;
597
598    for &(version, sql) in MIGRATIONS {
599        let current_version: u32 = conn
600            .query_row(
601                "SELECT COALESCE(MAX(version), 0) FROM _schema_version",
602                [],
603                |row| row.get(0),
604            )
605            .unwrap_or(0);
606
607        if current_version >= version {
608            continue;
609        }
610
611        with_transaction(conn, |tx| {
612            match version {
613                9 => run_migration_v9(tx).map_err(|e| MemoryError::MigrationFailed {
614                    version,
615                    reason: e.to_string(),
616                })?,
617                16 => run_migration_v16(tx).map_err(|e| MemoryError::MigrationFailed {
618                    version,
619                    reason: e.to_string(),
620                })?,
621                17 => run_migration_v17(tx).map_err(|e| MemoryError::MigrationFailed {
622                    version,
623                    reason: e.to_string(),
624                })?,
625                _ => tx
626                    .execute_batch(sql)
627                    .map_err(|e| MemoryError::MigrationFailed {
628                        version,
629                        reason: e.to_string(),
630                    })?,
631            }
632            tx.execute(
633                "INSERT INTO _schema_version (version) VALUES (?1)",
634                params![version],
635            )
636            .map_err(|e| MemoryError::MigrationFailed {
637                version,
638                reason: e.to_string(),
639            })?;
640            Ok(())
641        })?;
642
643        tracing::info!("Applied migration V{}", version);
644    }
645
646    let final_version: u32 = conn
647        .query_row(
648            "SELECT COALESCE(MAX(version), 0) FROM _schema_version",
649            [],
650            |row| row.get(0),
651        )
652        .unwrap_or(0);
653    conn.execute_batch(&format!("PRAGMA user_version = {};", final_version))?;
654
655    Ok(())
656}
657
658fn run_migration_v16(conn: &Connection) -> Result<(), rusqlite::Error> {
659    add_column_if_missing(conn, "projection_import_log", "kernel_payload_json", "TEXT")?;
660    add_column_if_missing(
661        conn,
662        "projection_import_failures",
663        "kernel_payload_json",
664        "TEXT",
665    )?;
666    Ok(())
667}
668
669fn run_migration_v17(conn: &Connection) -> Result<(), rusqlite::Error> {
670    add_column_if_missing(conn, "projection_import_log", "episode_bundle_id", "TEXT")?;
671    add_column_if_missing(conn, "projection_import_log", "episode_bundle_json", "TEXT")?;
672    add_column_if_missing(
673        conn,
674        "projection_import_log",
675        "execution_context_json",
676        "TEXT",
677    )?;
678    add_column_if_missing(
679        conn,
680        "projection_import_failures",
681        "episode_bundle_id",
682        "TEXT",
683    )?;
684    add_column_if_missing(
685        conn,
686        "projection_import_failures",
687        "episode_bundle_json",
688        "TEXT",
689    )?;
690    add_column_if_missing(
691        conn,
692        "projection_import_failures",
693        "execution_context_json",
694        "TEXT",
695    )?;
696    Ok(())
697}
698
699fn add_column_if_missing(
700    conn: &Connection,
701    table: &str,
702    column: &str,
703    column_sql: &str,
704) -> Result<(), rusqlite::Error> {
705    let pragma = format!("PRAGMA table_info({table})");
706    let mut stmt = conn.prepare(&pragma)?;
707    let exists = stmt
708        .query_map([], |row| row.get::<_, String>(1))?
709        .collect::<Result<Vec<_>, _>>()?
710        .into_iter()
711        .any(|name| name == column);
712
713    if !exists {
714        conn.execute(
715            &format!("ALTER TABLE {table} ADD COLUMN {column} {column_sql}"),
716            [],
717        )?;
718    }
719
720    Ok(())
721}
722
723/// Check and update the embedding metadata singleton row.
724pub fn check_embedding_metadata(
725    conn: &Connection,
726    config: &EmbeddingConfig,
727) -> Result<(), MemoryError> {
728    // INTENTIONAL: row absent on first run before metadata is inserted
729    let existing: Option<(String, usize)> = conn
730        .query_row(
731            "SELECT model_name, dimensions FROM embedding_metadata WHERE id = 1",
732            [],
733            |row| Ok((row.get(0)?, row.get(1)?)),
734        )
735        .ok();
736
737    match existing {
738        Some((model, dims)) => {
739            if model != config.model || dims != config.dimensions {
740                tracing::warn!(
741                    stored_model = %model,
742                    stored_dims = dims,
743                    configured_model = %config.model,
744                    configured_dims = config.dimensions,
745                    "Embedding model changed. Existing embeddings are stale."
746                );
747                conn.execute(
748                    "UPDATE embedding_metadata
749                     SET model_name = ?1,
750                         dimensions = ?2,
751                         embeddings_dirty = 1,
752                         updated_at = datetime('now')
753                     WHERE id = 1",
754                    params![config.model, config.dimensions],
755                )?;
756            }
757        }
758        None => {
759            conn.execute(
760                "INSERT INTO embedding_metadata (id, model_name, dimensions) VALUES (1, ?1, ?2)",
761                params![config.model, config.dimensions],
762            )?;
763        }
764    }
765
766    Ok(())
767}
768
/// Serializes an embedding for SQLite BLOB storage.
///
/// Each `f32` is written as its 4 little-endian bytes, in order, so the output
/// is `4 * embedding.len()` bytes long.
pub fn embedding_to_bytes(embedding: &[f32]) -> Vec<u8> {
    let mut encoded = Vec::with_capacity(embedding.len() * 4);
    encoded.extend(embedding.iter().flat_map(|component| component.to_le_bytes()));
    encoded
}
777
778/// Decode a SQLite embedding BLOB back to f32 values.
779#[allow(clippy::manual_is_multiple_of)]
780pub fn bytes_to_embedding(bytes: &[u8]) -> Result<Vec<f32>, MemoryError> {
781    if bytes.len() % 4 != 0 {
782        return Err(MemoryError::InvalidEmbedding {
783            expected_bytes: bytes.len() - (bytes.len() % 4),
784            actual_bytes: bytes.len(),
785        });
786    }
787
788    match bytemuck::try_cast_slice::<u8, f32>(bytes) {
789        Ok(slice) => Ok(slice.to_vec()),
790        Err(_) => {
791            let mut embedding = Vec::with_capacity(bytes.len() / 4);
792            for chunk in bytes.chunks_exact(4) {
793                embedding.push(f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]));
794            }
795            Ok(embedding)
796        }
797    }
798}
799
800pub fn is_embeddings_dirty(conn: &Connection) -> Result<bool, MemoryError> {
801    let dirty: i32 = conn
802        .query_row(
803            "SELECT COALESCE(embeddings_dirty, 0) FROM embedding_metadata WHERE id = 1",
804            [],
805            |row| row.get(0),
806        )
807        .unwrap_or(0);
808    Ok(dirty != 0)
809}
810
811pub fn clear_embeddings_dirty(conn: &Connection) -> Result<(), MemoryError> {
812    conn.execute(
813        "UPDATE embedding_metadata SET embeddings_dirty = 0 WHERE id = 1",
814        [],
815    )?;
816    Ok(())
817}
818
/// Records a pending sidecar operation for `item_key` inside `tx`.
///
/// Inserts a row into `pending_index_ops`; when a row for the same key already
/// exists it is overwritten with the new entity type and operation, and its
/// attempt counter and last error are reset. Afterwards the sidecar is marked
/// dirty via `mark_sidecar_dirty`.
#[cfg(feature = "hnsw")]
pub(crate) fn queue_pending_index_op(
    tx: &rusqlite::Transaction<'_>,
    item_key: &str,
    entity_type: &str,
    op_kind: IndexOpKind,
) -> Result<(), MemoryError> {
    let op_label = op_kind.as_str();
    tx.execute(
        "INSERT INTO pending_index_ops (item_key, entity_type, op_kind, attempt_count, last_error, updated_at)
         VALUES (?1, ?2, ?3, 0, NULL, datetime('now'))
         ON CONFLICT(item_key) DO UPDATE SET
             entity_type = excluded.entity_type,
             op_kind = excluded.op_kind,
             attempt_count = 0,
             last_error = NULL,
             updated_at = datetime('now')",
        params![item_key, entity_type, op_label],
    )?;

    mark_sidecar_dirty(tx)?;
    Ok(())
}
840
841#[cfg(feature = "hnsw")]
842pub(crate) use IndexOpKind as PendingIndexOpKind;
843
/// Alias entry point for `queue_pending_index_op`: forwards all arguments
/// unchanged and returns its result.
#[cfg(feature = "hnsw")]
pub(crate) fn enqueue_pending_index_op(
    tx: &rusqlite::Transaction<'_>,
    item_key: &str,
    entity_type: &str,
    op_kind: PendingIndexOpKind,
) -> Result<(), MemoryError> {
    queue_pending_index_op(tx, item_key, entity_type, op_kind)
}
853
854pub(crate) fn list_pending_index_ops(
855    conn: &Connection,
856) -> Result<Vec<PendingIndexOp>, MemoryError> {
857    let table_exists: bool = conn
858        .query_row(
859            "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='pending_index_ops'",
860            [],
861            |row| row.get(0),
862        )
863        .unwrap_or(false);
864    if !table_exists {
865        return Ok(Vec::new());
866    }
867
868    let mut stmt = conn.prepare(
869        "SELECT item_key, entity_type, op_kind, attempt_count, last_error
870         FROM pending_index_ops
871         ORDER BY updated_at ASC, item_key ASC",
872    )?;
873    let rows = stmt
874        .query_map([], |row| {
875            let item_key: String = row.get(0)?;
876            let op_kind: String = row.get(2)?;
877            Ok(PendingIndexOp {
878                item_key: item_key.clone(),
879                entity_type: row.get(1)?,
880                op_kind: IndexOpKind::parse(&op_kind, &item_key).map_err(|e| {
881                    rusqlite::Error::FromSqlConversionFailure(
882                        2,
883                        rusqlite::types::Type::Text,
884                        Box::new(e),
885                    )
886                })?,
887                attempt_count: row.get::<_, i64>(3)? as u32,
888                last_error: row.get(4)?,
889            })
890        })?
891        .collect::<Result<Vec<_>, _>>()?;
892    Ok(rows)
893}
894
/// Counts the rows queued in `pending_index_ops`.
///
/// Returns `0` when the table does not exist; a failure of the existence probe
/// is also treated as "table absent".
///
/// # Errors
///
/// Returns the converted `rusqlite` error when the count query fails.
#[cfg(feature = "hnsw")]
pub(crate) fn pending_index_op_count(conn: &Connection) -> Result<usize, MemoryError> {
    let has_table = conn
        .query_row(
            "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='pending_index_ops'",
            [],
            |row| row.get::<_, bool>(0),
        )
        .unwrap_or(false);

    if has_table {
        let total: i64 =
            conn.query_row("SELECT COUNT(*) FROM pending_index_ops", [], |row| row.get(0))?;
        Ok(total as usize)
    } else {
        Ok(0)
    }
}
913
/// Records a failed attempt for each of the given queued operations.
///
/// In a single transaction, every row whose `item_key` is listed gets its
/// `attempt_count` incremented, `last_error` set to `error`, and `updated_at`
/// refreshed. Keys without a matching row are ignored.
#[cfg(feature = "hnsw")]
pub(crate) fn mark_pending_index_ops_failed(
    conn: &Connection,
    item_keys: &[String],
    error: &str,
) -> Result<(), MemoryError> {
    with_transaction(conn, |tx| {
        item_keys.iter().try_for_each(|key| {
            tx.execute(
                "UPDATE pending_index_ops
                 SET attempt_count = attempt_count + 1,
                     last_error = ?1,
                     updated_at = datetime('now')
                 WHERE item_key = ?2",
                params![error, key],
            )
            .map(|_| ())
            .map_err(MemoryError::from)
        })
    })
}
934
/// Removes the listed keys from `pending_index_ops`.
///
/// Each key in `item_keys` is deleted by exact `item_key` match; a key with
/// no matching row deletes nothing. All deletes run inside a single
/// `with_transaction` call.
///
/// # Errors
///
/// Propagates any failure from preparing or executing the statement, or from
/// `with_transaction` itself.
#[cfg(feature = "hnsw")]
pub(crate) fn clear_pending_index_ops(
    conn: &Connection,
    item_keys: &[String],
) -> Result<(), MemoryError> {
    with_transaction(conn, |tx| {
        // Compile the statement once and rebind it per key instead of
        // re-preparing identical SQL on every iteration.
        let mut stmt = tx.prepare("DELETE FROM pending_index_ops WHERE item_key = ?1")?;
        for item_key in item_keys {
            stmt.execute(params![item_key])?;
        }
        Ok(())
    })
}
950
/// Deletes every row from `pending_index_ops`.
///
/// # Errors
///
/// Propagates the failure if the `DELETE` cannot be executed.
#[cfg(feature = "hnsw")]
pub(crate) fn clear_all_pending_index_ops(conn: &Connection) -> Result<(), MemoryError> {
    // The affected-row count returned by `execute` is discarded.
    conn.execute("DELETE FROM pending_index_ops", [])
        .map(|_deleted| ())
        .map_err(MemoryError::from)
}
956
/// Loads the stored embedding for a `<domain>:<id>` index key.
///
/// The text before the first `:` selects the table to read: `fact`, `chunk`,
/// `msg` (whose id must parse as an `i64`) or `episode`.
///
/// Returns `Ok(None)` when no row matches or the stored `embedding` is NULL.
///
/// # Errors
///
/// * `MemoryError::InvalidKey` when the key has no `:`, names an unknown
///   domain, or carries a `msg` id that is not an integer.
/// * Any other query failure, or a failure reported by `bytes_to_embedding`
///   while decoding the blob.
#[cfg(feature = "hnsw")]
pub(crate) fn load_embedding_for_index_key(
    conn: &Connection,
    item_key: &str,
) -> Result<Option<Vec<f32>>, MemoryError> {
    let (domain, raw_id) = item_key
        .split_once(':')
        .ok_or_else(|| MemoryError::InvalidKey(item_key.to_string()))?;

    // Shared row mapper: the only selected column is the nullable blob.
    let read_blob = |row: &rusqlite::Row<'_>| row.get::<_, Option<Vec<u8>>>(0);

    let fetched = match domain {
        "fact" => conn.query_row(
            "SELECT embedding FROM facts WHERE id = ?1",
            params![raw_id],
            read_blob,
        ),
        "chunk" => conn.query_row(
            "SELECT embedding FROM chunks WHERE id = ?1",
            params![raw_id],
            read_blob,
        ),
        "msg" => {
            // Message ids are bound as integers, so reject anything else up front.
            let message_id = raw_id
                .parse::<i64>()
                .map_err(|e| MemoryError::InvalidKey(format!("{}: {e}", item_key)))?;
            conn.query_row(
                "SELECT embedding FROM messages WHERE id = ?1",
                params![message_id],
                read_blob,
            )
        }
        "episode" => conn.query_row(
            "SELECT embedding FROM episodes WHERE episode_id = ?1",
            params![raw_id],
            read_blob,
        ),
        _ => return Err(MemoryError::InvalidKey(item_key.to_string())),
    };

    match fetched {
        Ok(Some(bytes)) => bytes_to_embedding(&bytes).map(Some),
        // A missing row and a NULL column both mean "nothing to load".
        Ok(None) | Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
        Err(err) => Err(err.into()),
    }
}
1003
/// Sets the `sidecar_dirty` entry of `hnsw_metadata` to `'1'` through `tx`,
/// inserting the row when it does not exist yet.
///
/// # Errors
///
/// Propagates the failure if the upsert cannot be executed.
#[cfg(feature = "hnsw")]
fn mark_sidecar_dirty(tx: &rusqlite::Transaction<'_>) -> Result<(), MemoryError> {
    const MARK_DIRTY_SQL: &str =
        "INSERT INTO hnsw_metadata (key, value) VALUES ('sidecar_dirty', '1')
         ON CONFLICT(key) DO UPDATE SET value = '1'";

    tx.execute(MARK_DIRTY_SQL, [])
        .map(|_changed| ())
        .map_err(MemoryError::from)
}
1013
/// Reports whether `hnsw_metadata` holds `sidecar_dirty = '1'`.
///
/// This function never returns `Err`: any failure to read the flag is folded
/// into `false`.
#[cfg(feature = "hnsw")]
pub(crate) fn is_sidecar_dirty(conn: &Connection) -> Result<bool, MemoryError> {
    // INTENTIONAL: the row is absent until HNSW metadata has been written, so
    // a failed lookup reads as "not dirty".
    let flagged = conn
        .query_row(
            "SELECT value FROM hnsw_metadata WHERE key = 'sidecar_dirty'",
            [],
            |row| row.get::<_, String>(0),
        )
        .map_or(false, |value| value == "1");
    Ok(flagged)
}
1026
/// Writes the `sidecar_dirty` entry of `hnsw_metadata`, storing `'1'` for
/// `true` and `'0'` for `false`. The row is inserted if missing and
/// overwritten otherwise.
///
/// # Errors
///
/// Propagates the failure if the upsert cannot be executed.
#[cfg(feature = "hnsw")]
pub(crate) fn set_sidecar_dirty(conn: &Connection, dirty: bool) -> Result<(), MemoryError> {
    let flag = if dirty { "1" } else { "0" };
    let _changed = conn.execute(
        "INSERT INTO hnsw_metadata (key, value) VALUES ('sidecar_dirty', ?1)
         ON CONFLICT(key) DO UPDATE SET value = excluded.value",
        params![flag],
    )?;
    Ok(())
}
1036
1037pub(crate) fn parse_optional_json(
1038    table: &'static str,
1039    row_id: &str,
1040    field: &'static str,
1041    raw: Option<&str>,
1042) -> Result<Option<serde_json::Value>, MemoryError> {
1043    match raw {
1044        Some(raw) => serde_json::from_str(raw)
1045            .map(Some)
1046            .map_err(|e| MemoryError::CorruptData {
1047                table,
1048                row_id: row_id.to_string(),
1049                detail: format!("invalid {field}: {e}"),
1050            }),
1051        None => Ok(None),
1052    }
1053}
1054
1055pub(crate) fn parse_string_list_json(
1056    table: &'static str,
1057    row_id: &str,
1058    field: &'static str,
1059    raw: &str,
1060) -> Result<Vec<String>, MemoryError> {
1061    serde_json::from_str(raw).map_err(|e| MemoryError::CorruptData {
1062        table,
1063        row_id: row_id.to_string(),
1064        detail: format!("invalid {field}: {e}"),
1065    })
1066}
1067
1068pub(crate) fn parse_role(
1069    table: &'static str,
1070    row_id: &str,
1071    raw: &str,
1072) -> Result<Role, MemoryError> {
1073    Role::from_str_value(raw).ok_or_else(|| MemoryError::CorruptData {
1074        table,
1075        row_id: row_id.to_string(),
1076        detail: format!("invalid role '{raw}'"),
1077    })
1078}
1079
1080pub(crate) fn parse_episode_outcome(
1081    row_id: &str,
1082    raw: &str,
1083) -> Result<EpisodeOutcome, MemoryError> {
1084    EpisodeOutcome::from_str_value(raw).ok_or_else(|| MemoryError::CorruptData {
1085        table: "episodes",
1086        row_id: row_id.to_string(),
1087        detail: format!("invalid outcome '{raw}'"),
1088    })
1089}
1090
1091pub(crate) fn parse_verification_status(
1092    row_id: &str,
1093    raw: &str,
1094) -> Result<VerificationStatus, MemoryError> {
1095    serde_json::from_str(raw).map_err(|e| MemoryError::CorruptData {
1096        table: "episodes",
1097        row_id: row_id.to_string(),
1098        detail: format!("invalid verification_status: {e}"),
1099    })
1100}
1101
1102/// Run integrity verification on the database.
1103pub fn verify_integrity_sync(
1104    conn: &Connection,
1105    mode: VerifyMode,
1106) -> Result<IntegrityReport, MemoryError> {
1107    let mut issues = Vec::new();
1108
1109    let schema_version: u32 = conn
1110        .query_row("PRAGMA user_version", [], |row| row.get(0))
1111        .unwrap_or_else(|e| {
1112            issues.push(format!("failed to read schema version: {e}"));
1113            0
1114        });
1115    if schema_version > MAX_SCHEMA_VERSION {
1116        issues.push(format!(
1117            "schema version {} is ahead of supported {}",
1118            schema_version, MAX_SCHEMA_VERSION
1119        ));
1120    }
1121
1122    let fact_count: usize = conn
1123        .query_row("SELECT COUNT(*) FROM facts", [], |row| row.get(0))
1124        .unwrap_or_else(|e| {
1125            issues.push(format!("failed to count facts: {e}"));
1126            0
1127        });
1128    let chunk_count: usize = conn
1129        .query_row("SELECT COUNT(*) FROM chunks", [], |row| row.get(0))
1130        .unwrap_or_else(|e| {
1131            issues.push(format!("failed to count chunks: {e}"));
1132            0
1133        });
1134    let message_count: usize = conn
1135        .query_row("SELECT COUNT(*) FROM messages", [], |row| row.get(0))
1136        .unwrap_or_else(|e| {
1137            issues.push(format!("failed to count messages: {e}"));
1138            0
1139        });
1140    let episode_count: usize = conn
1141        .query_row("SELECT COUNT(*) FROM episodes", [], |row| row.get(0))
1142        .unwrap_or_else(|e| {
1143            issues.push(format!("failed to count episodes: {e}"));
1144            0
1145        });
1146
1147    let facts_missing_embeddings: usize = conn
1148        .query_row(
1149            "SELECT COUNT(*) FROM facts WHERE embedding IS NULL",
1150            [],
1151            |row| row.get(0),
1152        )
1153        .unwrap_or_else(|e| {
1154            issues.push(format!("failed to count facts missing embeddings: {e}"));
1155            0
1156        });
1157    let chunks_missing_embeddings: usize = conn
1158        .query_row(
1159            "SELECT COUNT(*) FROM chunks WHERE embedding IS NULL",
1160            [],
1161            |row| row.get(0),
1162        )
1163        .unwrap_or_else(|e| {
1164            issues.push(format!("failed to count chunks missing embeddings: {e}"));
1165            0
1166        });
1167    let episodes_missing_embeddings: usize = conn
1168        .query_row(
1169            "SELECT COUNT(*) FROM episodes WHERE embedding IS NULL",
1170            [],
1171            |row| row.get(0),
1172        )
1173        .unwrap_or_else(|e| {
1174            issues.push(format!("failed to count episodes missing embeddings: {e}"));
1175            0
1176        });
1177
1178    if facts_missing_embeddings > 0 {
1179        issues.push(format!(
1180            "{} facts missing embeddings",
1181            facts_missing_embeddings
1182        ));
1183    }
1184    if chunks_missing_embeddings > 0 {
1185        issues.push(format!(
1186            "{} chunks missing embeddings",
1187            chunks_missing_embeddings
1188        ));
1189    }
1190    if episodes_missing_embeddings > 0 {
1191        issues.push(format!(
1192            "{} episodes missing embeddings",
1193            episodes_missing_embeddings
1194        ));
1195    }
1196
1197    let pending_ops = list_pending_index_ops(conn).unwrap_or_default();
1198    if !pending_ops.is_empty() {
1199        issues.push(format!(
1200            "{} pending HNSW sidecar ops queued in SQLite",
1201            pending_ops.len()
1202        ));
1203        for op in pending_ops.iter().take(5) {
1204            let op_kind = op.op_kind.as_str();
1205            let detail = match &op.last_error {
1206                Some(last_error) => format!(
1207                    "{} {} {} (attempts: {}, last_error: {})",
1208                    op.entity_type,
1209                    op.op_kind.as_str(),
1210                    op.item_key,
1211                    op.attempt_count,
1212                    last_error
1213                ),
1214                None => format!(
1215                    "{} {} {} (attempts: {})",
1216                    op.entity_type, op_kind, op.item_key, op.attempt_count
1217                ),
1218            };
1219            issues.push(format!("pending sidecar op: {detail}"));
1220        }
1221    }
1222
1223    if mode == VerifyMode::Full {
1224        let dims: usize = conn
1225            .query_row(
1226                "SELECT dimensions FROM embedding_metadata WHERE id = 1",
1227                [],
1228                |row| row.get(0),
1229            )
1230            .unwrap_or_else(|e| {
1231                issues.push(format!("failed to read embedding dimensions: {e}"));
1232                0
1233            });
1234
1235        verify_fts_drift(conn, "facts", "facts_rowid_map", fact_count, &mut issues);
1236        verify_fts_drift(conn, "chunks", "chunks_rowid_map", chunk_count, &mut issues);
1237        verify_fts_drift(
1238            conn,
1239            "messages",
1240            "messages_rowid_map",
1241            message_count,
1242            &mut issues,
1243        );
1244        verify_fts_drift(
1245            conn,
1246            "episodes",
1247            "episodes_rowid_map",
1248            episode_count,
1249            &mut issues,
1250        );
1251
1252        verify_blob_table(conn, "facts", "id", "embedding", dims, &mut issues)?;
1253        verify_blob_table(conn, "chunks", "id", "embedding", dims, &mut issues)?;
1254        verify_blob_table(conn, "messages", "id", "embedding", dims, &mut issues)?;
1255        verify_blob_table(
1256            conn,
1257            "episodes",
1258            "episode_id",
1259            "embedding",
1260            dims,
1261            &mut issues,
1262        )?;
1263
1264        verify_quantized_table(conn, "facts", "id", dims, &mut issues)?;
1265        verify_quantized_table(conn, "chunks", "id", dims, &mut issues)?;
1266        verify_quantized_table(conn, "messages", "id", dims, &mut issues)?;
1267        verify_quantized_table(conn, "episodes", "episode_id", dims, &mut issues)?;
1268
1269        verify_session_rows(conn, &mut issues)?;
1270        verify_message_rows(conn, &mut issues)?;
1271        verify_fact_rows(conn, &mut issues)?;
1272        verify_document_rows(conn, &mut issues)?;
1273        verify_episode_rows(conn, &mut issues)?;
1274
1275        let integrity_check: String = conn
1276            .query_row("PRAGMA integrity_check", [], |row| row.get(0))
1277            .unwrap_or_else(|_| "error".to_string());
1278        if integrity_check != "ok" {
1279            issues.push(format!("SQLite integrity_check: {}", integrity_check));
1280        }
1281    }
1282
1283    Ok(IntegrityReport {
1284        ok: issues.is_empty(),
1285        schema_version,
1286        fact_count,
1287        chunk_count,
1288        message_count,
1289        facts_missing_embeddings,
1290        chunks_missing_embeddings,
1291        issues,
1292    })
1293}
1294
1295/// Reconcile FTS indexes by rebuilding them from source data.
1296pub fn reconcile_fts(conn: &Connection) -> Result<(), MemoryError> {
1297    with_transaction(conn, |tx| {
1298        tx.execute_batch("DROP TABLE IF EXISTS facts_fts")?;
1299        tx.execute_batch("DELETE FROM facts_rowid_map")?;
1300        tx.execute_batch(
1301            "CREATE VIRTUAL TABLE facts_fts USING fts5(
1302                content,
1303                content='',
1304                content_rowid='rowid',
1305                tokenize='porter unicode61'
1306            )",
1307        )?;
1308        tx.execute_batch("INSERT INTO facts_rowid_map (fact_id) SELECT id FROM facts")?;
1309        tx.execute_batch(
1310            "INSERT INTO facts_fts (rowid, content)
1311             SELECT rm.rowid, f.content
1312             FROM facts_rowid_map rm
1313             JOIN facts f ON f.id = rm.fact_id",
1314        )?;
1315
1316        tx.execute_batch("DROP TABLE IF EXISTS chunks_fts")?;
1317        tx.execute_batch("DELETE FROM chunks_rowid_map")?;
1318        tx.execute_batch(
1319            "CREATE VIRTUAL TABLE chunks_fts USING fts5(
1320                content,
1321                content='',
1322                content_rowid='rowid',
1323                tokenize='porter unicode61'
1324            )",
1325        )?;
1326        tx.execute_batch("INSERT INTO chunks_rowid_map (chunk_id) SELECT id FROM chunks")?;
1327        tx.execute_batch(
1328            "INSERT INTO chunks_fts (rowid, content)
1329             SELECT rm.rowid, c.content
1330             FROM chunks_rowid_map rm
1331             JOIN chunks c ON c.id = rm.chunk_id",
1332        )?;
1333
1334        tx.execute_batch("DROP TABLE IF EXISTS messages_fts")?;
1335        tx.execute_batch("DELETE FROM messages_rowid_map")?;
1336        tx.execute_batch(
1337            "CREATE VIRTUAL TABLE messages_fts USING fts5(
1338                content,
1339                content='',
1340                content_rowid='rowid',
1341                tokenize='porter unicode61'
1342            )",
1343        )?;
1344        tx.execute_batch("INSERT INTO messages_rowid_map (message_id) SELECT id FROM messages")?;
1345        tx.execute_batch(
1346            "INSERT INTO messages_fts (rowid, content)
1347             SELECT rm.rowid, m.content
1348             FROM messages_rowid_map rm
1349             JOIN messages m ON m.id = rm.message_id",
1350        )?;
1351
1352        tx.execute_batch("DROP TABLE IF EXISTS episodes_fts")?;
1353        tx.execute_batch("DELETE FROM episodes_rowid_map")?;
1354        tx.execute_batch(
1355            "CREATE VIRTUAL TABLE episodes_fts USING fts5(
1356                content,
1357                content='',
1358                content_rowid='rowid',
1359                tokenize='porter unicode61'
1360            )",
1361        )?;
1362        tx.execute_batch(
1363            "INSERT INTO episodes_rowid_map (episode_id, document_id) SELECT episode_id, document_id FROM episodes",
1364        )?;
1365        tx.execute_batch(
1366            "INSERT INTO episodes_fts (rowid, content)
1367             SELECT rm.rowid, e.search_text
1368             FROM episodes_rowid_map rm
1369             JOIN episodes e ON e.episode_id = rm.episode_id",
1370        )?;
1371
1372        Ok(())
1373    })?;
1374
1375    tracing::info!("FTS indexes reconciled");
1376    Ok(())
1377}
1378
1379fn verify_fts_drift(
1380    conn: &Connection,
1381    label: &str,
1382    map_table: &str,
1383    source_count: usize,
1384    issues: &mut Vec<String>,
1385) {
1386    let table_exists: bool = conn
1387        .query_row(
1388            "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name = ?1",
1389            params![map_table],
1390            |row| row.get(0),
1391        )
1392        .unwrap_or(false);
1393    if !table_exists {
1394        if source_count > 0 {
1395            issues.push(format!("{} rows exist but {} is missing", label, map_table));
1396        }
1397        return;
1398    }
1399
1400    let sql = format!("SELECT COUNT(*) FROM {}", map_table);
1401    let indexed_count: usize = conn.query_row(&sql, [], |row| row.get(0)).unwrap_or(0);
1402    if indexed_count != source_count {
1403        issues.push(format!(
1404            "FTS {} index drift: {} rows in map vs {} source rows",
1405            label, indexed_count, source_count
1406        ));
1407    }
1408}
1409
1410fn verify_blob_table(
1411    conn: &Connection,
1412    table: &'static str,
1413    id_column: &'static str,
1414    blob_column: &'static str,
1415    expected_dims: usize,
1416    issues: &mut Vec<String>,
1417) -> Result<(), MemoryError> {
1418    if expected_dims == 0 {
1419        return Ok(());
1420    }
1421
1422    let sql = format!(
1423        "SELECT CAST({id_column} AS TEXT), {blob_column} FROM {table} WHERE {blob_column} IS NOT NULL"
1424    );
1425    let mut stmt = conn.prepare(&sql)?;
1426    let rows = stmt.query_map([], |row| {
1427        Ok((row.get::<_, String>(0)?, row.get::<_, Vec<u8>>(1)?))
1428    })?;
1429
1430    for row in rows {
1431        let (row_id, blob) = row?;
1432        match bytes_to_embedding(&blob) {
1433            Ok(embedding) if embedding.len() != expected_dims => issues.push(format!(
1434                "{}({}) has embedding dimension {} but expected {}",
1435                table,
1436                row_id,
1437                embedding.len(),
1438                expected_dims
1439            )),
1440            Ok(_) => {}
1441            Err(err) => issues.push(format!(
1442                "{}({}) invalid embedding blob: {}",
1443                table, row_id, err
1444            )),
1445        }
1446    }
1447
1448    Ok(())
1449}
1450
1451fn verify_quantized_table(
1452    conn: &Connection,
1453    table: &'static str,
1454    id_column: &'static str,
1455    expected_dims: usize,
1456    issues: &mut Vec<String>,
1457) -> Result<(), MemoryError> {
1458    if expected_dims == 0 {
1459        return Ok(());
1460    }
1461
1462    let sql = format!(
1463        "SELECT CAST({id_column} AS TEXT), embedding_q8 FROM {table} WHERE embedding IS NOT NULL"
1464    );
1465    let mut stmt = conn.prepare(&sql)?;
1466    let rows = stmt.query_map([], |row| {
1467        Ok((row.get::<_, String>(0)?, row.get::<_, Option<Vec<u8>>>(1)?))
1468    })?;
1469
1470    for row in rows {
1471        let (row_id, blob) = row?;
1472        match blob {
1473            Some(blob) => {
1474                if let Err(err) = unpack_quantized(&blob, expected_dims) {
1475                    issues.push(format!(
1476                        "{}({}) invalid quantized embedding: {}",
1477                        table, row_id, err
1478                    ));
1479                }
1480            }
1481            None => issues.push(format!("{}({}) missing quantized embedding", table, row_id)),
1482        }
1483    }
1484
1485    Ok(())
1486}
1487
1488fn verify_session_rows(conn: &Connection, issues: &mut Vec<String>) -> Result<(), MemoryError> {
1489    let mut stmt = conn.prepare("SELECT id, metadata FROM sessions WHERE metadata IS NOT NULL")?;
1490    let rows = stmt.query_map([], |row| {
1491        Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1492    })?;
1493    for row in rows {
1494        let (id, metadata) = row?;
1495        if let Err(err) = parse_optional_json("sessions", &id, "metadata", Some(&metadata)) {
1496            issues.push(err.to_string());
1497        }
1498    }
1499    Ok(())
1500}
1501
1502fn verify_message_rows(conn: &Connection, issues: &mut Vec<String>) -> Result<(), MemoryError> {
1503    let mut stmt = conn.prepare("SELECT id, role, metadata FROM messages")?;
1504    let rows = stmt.query_map([], |row| {
1505        Ok((
1506            row.get::<_, i64>(0)?,
1507            row.get::<_, String>(1)?,
1508            row.get::<_, Option<String>>(2)?,
1509        ))
1510    })?;
1511    for row in rows {
1512        let (id, role, metadata) = row?;
1513        let row_id = id.to_string();
1514        if let Err(err) = parse_role("messages", &row_id, &role) {
1515            issues.push(err.to_string());
1516        }
1517        if let Err(err) = parse_optional_json("messages", &row_id, "metadata", metadata.as_deref())
1518        {
1519            issues.push(err.to_string());
1520        }
1521    }
1522    Ok(())
1523}
1524
1525fn verify_fact_rows(conn: &Connection, issues: &mut Vec<String>) -> Result<(), MemoryError> {
1526    let mut stmt = conn.prepare("SELECT id, metadata FROM facts WHERE metadata IS NOT NULL")?;
1527    let rows = stmt.query_map([], |row| {
1528        Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1529    })?;
1530    for row in rows {
1531        let (id, metadata) = row?;
1532        if let Err(err) = parse_optional_json("facts", &id, "metadata", Some(&metadata)) {
1533            issues.push(err.to_string());
1534        }
1535    }
1536    Ok(())
1537}
1538
1539fn verify_document_rows(conn: &Connection, issues: &mut Vec<String>) -> Result<(), MemoryError> {
1540    let mut stmt = conn.prepare("SELECT id, metadata FROM documents WHERE metadata IS NOT NULL")?;
1541    let rows = stmt.query_map([], |row| {
1542        Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1543    })?;
1544    for row in rows {
1545        let (id, metadata) = row?;
1546        if let Err(err) = parse_optional_json("documents", &id, "metadata", Some(&metadata)) {
1547            issues.push(err.to_string());
1548        }
1549    }
1550    Ok(())
1551}
1552
1553fn verify_episode_rows(conn: &Connection, issues: &mut Vec<String>) -> Result<(), MemoryError> {
1554    let mut stmt = conn.prepare(
1555        "SELECT episode_id, cause_ids, outcome, verification_status
1556         FROM episodes",
1557    )?;
1558    let rows = stmt.query_map([], |row| {
1559        Ok((
1560            row.get::<_, String>(0)?,
1561            row.get::<_, String>(1)?,
1562            row.get::<_, String>(2)?,
1563            row.get::<_, String>(3)?,
1564        ))
1565    })?;
1566    for row in rows {
1567        let (episode_id, cause_ids, outcome, verification_status) = row?;
1568        if let Err(err) = parse_string_list_json("episodes", &episode_id, "cause_ids", &cause_ids) {
1569            issues.push(err.to_string());
1570        }
1571        if let Err(err) = parse_episode_outcome(&episode_id, &outcome) {
1572            issues.push(err.to_string());
1573        }
1574        if let Err(err) = parse_verification_status(&episode_id, &verification_status) {
1575            issues.push(err.to_string());
1576        }
1577    }
1578    Ok(())
1579}