1use crate::config::{EmbeddingConfig, MemoryLimits, PoolConfig};
4use crate::error::MemoryError;
5use crate::quantize::unpack_quantized;
6use crate::types::{EpisodeOutcome, Role, VerificationStatus};
7use rusqlite::{params, Connection, OpenFlags};
8use std::path::Path;
9
/// Schema migration 1: the initial schema.
///
/// Creates `sessions` and `messages` (conversations), `facts` (knowledge),
/// `documents` and `chunks` (chunked content), the `facts_rowid_map` /
/// `chunks_rowid_map` tables that give text-keyed rows an integer rowid, the
/// FTS5 tables `facts_fts` / `chunks_fts` (declared with `content=''`), and the
/// single-row `embedding_metadata` table (`CHECK (id = 1)`).
const MIGRATION_V1: &str = r#"
-- CONVERSATIONS
CREATE TABLE sessions (
 id TEXT PRIMARY KEY,
 channel TEXT NOT NULL DEFAULT 'repl',
 created_at TEXT NOT NULL DEFAULT (datetime('now')),
 updated_at TEXT NOT NULL DEFAULT (datetime('now')),
 metadata TEXT
);

CREATE INDEX idx_sessions_updated ON sessions(updated_at DESC);

CREATE TABLE messages (
 id INTEGER PRIMARY KEY AUTOINCREMENT,
 session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
 role TEXT NOT NULL CHECK (role IN ('system', 'user', 'assistant', 'tool')),
 content TEXT NOT NULL,
 token_count INTEGER,
 created_at TEXT NOT NULL DEFAULT (datetime('now')),
 metadata TEXT
);

CREATE INDEX idx_messages_session ON messages(session_id, created_at ASC);
CREATE INDEX idx_messages_created ON messages(created_at DESC);

-- KNOWLEDGE (Facts)
CREATE TABLE facts (
 id TEXT PRIMARY KEY,
 namespace TEXT NOT NULL DEFAULT 'general',
 content TEXT NOT NULL,
 source TEXT,
 embedding BLOB,
 created_at TEXT NOT NULL DEFAULT (datetime('now')),
 updated_at TEXT NOT NULL DEFAULT (datetime('now')),
 metadata TEXT
);

CREATE INDEX idx_facts_namespace ON facts(namespace);
CREATE INDEX idx_facts_updated ON facts(updated_at DESC);

CREATE TABLE facts_rowid_map (
 rowid INTEGER PRIMARY KEY AUTOINCREMENT,
 fact_id TEXT NOT NULL UNIQUE REFERENCES facts(id) ON DELETE CASCADE
);

CREATE VIRTUAL TABLE facts_fts USING fts5(
 content,
 content='',
 content_rowid='rowid',
 tokenize='porter unicode61'
);

-- DOCUMENTS (Chunked content)
CREATE TABLE documents (
 id TEXT PRIMARY KEY,
 title TEXT NOT NULL,
 source_path TEXT,
 namespace TEXT NOT NULL DEFAULT 'general',
 created_at TEXT NOT NULL DEFAULT (datetime('now')),
 metadata TEXT
);

CREATE TABLE chunks (
 id TEXT PRIMARY KEY,
 document_id TEXT NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
 chunk_index INTEGER NOT NULL,
 content TEXT NOT NULL,
 token_count INTEGER,
 embedding BLOB,
 created_at TEXT NOT NULL DEFAULT (datetime('now'))
);

CREATE INDEX idx_chunks_document ON chunks(document_id, chunk_index ASC);

CREATE TABLE chunks_rowid_map (
 rowid INTEGER PRIMARY KEY AUTOINCREMENT,
 chunk_id TEXT NOT NULL UNIQUE REFERENCES chunks(id) ON DELETE CASCADE
);

CREATE VIRTUAL TABLE chunks_fts USING fts5(
 content,
 content='',
 content_rowid='rowid',
 tokenize='porter unicode61'
);

-- EMBEDDING METADATA
CREATE TABLE embedding_metadata (
 id INTEGER PRIMARY KEY CHECK (id = 1),
 model_name TEXT NOT NULL,
 dimensions INTEGER NOT NULL,
 updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
"#;
105
/// Schema migration 2: makes messages searchable.
///
/// Adds an `embedding` BLOB column to `messages`, plus the
/// `messages_rowid_map` table and the `messages_fts` FTS5 table, mirroring the
/// facts/chunks layout from migration 1.
const MIGRATION_V2: &str = r#"
ALTER TABLE messages ADD COLUMN embedding BLOB;

CREATE TABLE messages_rowid_map (
 rowid INTEGER PRIMARY KEY AUTOINCREMENT,
 message_id INTEGER NOT NULL UNIQUE REFERENCES messages(id) ON DELETE CASCADE
);

CREATE VIRTUAL TABLE messages_fts USING fts5(
 content,
 content='',
 content_rowid='rowid',
 tokenize='porter unicode61'
);
"#;
122
/// Schema migration 3: adds the `embeddings_dirty` flag (default 0) to
/// `embedding_metadata`. `check_embedding_metadata` sets it to 1 when the
/// configured model or dimensions differ from the stored ones.
const MIGRATION_V3: &str = r#"
ALTER TABLE embedding_metadata ADD COLUMN embeddings_dirty INTEGER NOT NULL DEFAULT 0;
"#;
127
/// Schema migration 4: creates the `hnsw_metadata` key/value table.
/// The `sidecar_dirty` key read and written by the sidecar helpers in this
/// file lives in this table.
const MIGRATION_V4: &str = r#"
CREATE TABLE IF NOT EXISTS hnsw_metadata (
 key TEXT PRIMARY KEY,
 value TEXT NOT NULL
);
"#;
135
/// Schema migration 5: adds an `embedding_q8` BLOB column to `facts`,
/// `chunks` and `messages`, and creates the `hnsw_keymap` table mapping an
/// integer `node_id` to a unique `item_key` with a `deleted` flag.
const MIGRATION_V5: &str = r#"
ALTER TABLE facts ADD COLUMN embedding_q8 BLOB;
ALTER TABLE chunks ADD COLUMN embedding_q8 BLOB;
ALTER TABLE messages ADD COLUMN embedding_q8 BLOB;

CREATE TABLE IF NOT EXISTS hnsw_keymap (
 node_id INTEGER PRIMARY KEY,
 item_key TEXT NOT NULL UNIQUE,
 deleted INTEGER NOT NULL DEFAULT 0
);

CREATE INDEX idx_hnsw_keymap_key ON hnsw_keymap(item_key);
"#;
150
/// Schema migration 6: creates the `episodes` table, keyed at this version by
/// `document_id` (a cascading reference to `documents.id`), together with
/// indexes on `effect_type`, `outcome` and `experiment_id`.
const MIGRATION_V6: &str = r#"
CREATE TABLE IF NOT EXISTS episodes (
 document_id TEXT PRIMARY KEY REFERENCES documents(id) ON DELETE CASCADE,
 cause_ids TEXT NOT NULL,
 effect_type TEXT NOT NULL,
 outcome TEXT NOT NULL DEFAULT 'pending',
 confidence REAL NOT NULL DEFAULT 0.0,
 verification_status TEXT NOT NULL DEFAULT '{"status":"unverified"}',
 experiment_id TEXT,
 created_at TEXT NOT NULL DEFAULT (datetime('now'))
);

CREATE INDEX IF NOT EXISTS idx_episodes_effect_type ON episodes(effect_type);
CREATE INDEX IF NOT EXISTS idx_episodes_outcome ON episodes(outcome);
CREATE INDEX IF NOT EXISTS idx_episodes_experiment_id ON episodes(experiment_id);
"#;
168
/// Schema migration 7: makes episodes searchable and introduces the pending
/// index-op queue.
///
/// In order, it:
/// 1. adds `updated_at`, `search_text`, `embedding` and `embedding_q8` to `episodes`;
/// 2. creates `episodes_rowid_map` and the `episodes_fts` FTS5 table;
/// 3. creates `pending_index_ops` (one row per `item_key`, `op_kind` limited to
///    `'upsert'` / `'delete'`);
/// 4. seeds `hnsw_metadata.sidecar_dirty` with `'0'` if absent;
/// 5. backfills `search_text` from `effect_type`, `outcome`, `experiment_id`
///    and `cause_ids`, then populates the rowid map and the FTS table.
// NOTE(review): SQLite's ALTER TABLE documentation restricts ADD COLUMN
// defaults that are expressions in parentheses. `updated_at` below uses
// `DEFAULT (datetime('now'))` — confirm this migration succeeds on databases
// whose `episodes` table already contains rows.
const MIGRATION_V7: &str = r#"
ALTER TABLE episodes ADD COLUMN updated_at TEXT NOT NULL DEFAULT (datetime('now'));
ALTER TABLE episodes ADD COLUMN search_text TEXT NOT NULL DEFAULT '';
ALTER TABLE episodes ADD COLUMN embedding BLOB;
ALTER TABLE episodes ADD COLUMN embedding_q8 BLOB;

CREATE TABLE IF NOT EXISTS episodes_rowid_map (
 rowid INTEGER PRIMARY KEY AUTOINCREMENT,
 document_id TEXT NOT NULL UNIQUE REFERENCES episodes(document_id) ON DELETE CASCADE
);

CREATE VIRTUAL TABLE episodes_fts USING fts5(
 content,
 content='',
 content_rowid='rowid',
 tokenize='porter unicode61'
);

CREATE TABLE IF NOT EXISTS pending_index_ops (
 item_key TEXT PRIMARY KEY,
 entity_type TEXT NOT NULL,
 op_kind TEXT NOT NULL CHECK (op_kind IN ('upsert', 'delete')),
 attempt_count INTEGER NOT NULL DEFAULT 0,
 last_error TEXT,
 updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);

INSERT OR IGNORE INTO hnsw_metadata (key, value) VALUES ('sidecar_dirty', '0');

UPDATE episodes
SET search_text = TRIM(
 COALESCE(effect_type, '') || ' ' ||
 COALESCE(outcome, '') || ' ' ||
 COALESCE(experiment_id, '') || ' ' ||
 COALESCE(cause_ids, '')
)
WHERE search_text = '';

INSERT OR IGNORE INTO episodes_rowid_map (document_id)
SELECT document_id FROM episodes;

INSERT INTO episodes_fts (rowid, content)
SELECT rm.rowid, e.search_text
FROM episodes_rowid_map rm
JOIN episodes e ON e.document_id = rm.document_id;
"#;
216
/// Schema migration 8: adds the nullable `trace_id` column to `episodes`.
const MIGRATION_V8: &str = r#"
ALTER TABLE episodes ADD COLUMN trace_id TEXT;
"#;
221
/// Schema migration 9 has no SQL text: `run_migrations` dispatches version 9
/// to `run_migration_v9` instead of executing this string. The constant exists
/// so the `MIGRATIONS` table still has an entry for version 9.
const MIGRATION_V9: &str = "";
230
/// Ordered `(version, sql)` table that `run_migrations` walks from top to
/// bottom.
///
/// Versions 9, 16 and 17 are applied by dedicated Rust functions inside
/// `run_migrations`; the SQL paired with those versions here is not executed
/// by that loop.
// NOTE(review): `allow(deprecated)` presumably silences a deprecation on one
// of the referenced migration constants — confirm which one and why.
#[allow(deprecated)]
const MIGRATIONS: &[(u32, &str)] = &[
    (1, MIGRATION_V1),
    (2, MIGRATION_V2),
    (3, MIGRATION_V3),
    (4, MIGRATION_V4),
    (5, MIGRATION_V5),
    (6, MIGRATION_V6),
    (7, MIGRATION_V7),
    (8, MIGRATION_V8),
    (9, MIGRATION_V9),
    (10, crate::projection_import::MIGRATION_V10),
    (11, crate::projection_storage::MIGRATION_V11),
    (12, crate::projection_storage::MIGRATION_V12),
    (13, crate::projection_storage::MIGRATION_V13),
    (14, crate::projection_storage::MIGRATION_V14),
    (15, crate::projection_storage::MIGRATION_V15),
    (16, crate::projection_storage::MIGRATION_V16),
    (17, crate::projection_storage::MIGRATION_V17),
];
252
/// Highest schema version this build understands; equal to the version of the
/// last entry in `MIGRATIONS`. `run_migrations` returns
/// `MemoryError::SchemaAhead` when `PRAGMA user_version` is greater than this.
pub const MAX_SCHEMA_VERSION: u32 = 17;
255
/// Migration 9: re-keys `episodes` from `document_id` to a dedicated
/// `episode_id` and introduces the normalized `episode_causes` table.
///
/// If no `episodes` table exists, only `episode_causes` (and its index) is
/// created. Otherwise the table is rebuilt via `episodes_new`, each existing
/// row receiving `episode_id = document_id || '-ep0'`; the indexes,
/// `episodes_rowid_map` and `episodes_fts` are recreated and repopulated, and
/// `episode_causes` is filled from the JSON array in `cause_ids`.
///
/// # Errors
/// Returns `MemoryError::MigrationFailed` (version 9) if the existence check
/// fails; every other SQLite failure is propagated through `?`.
fn run_migration_v9(conn: &Connection) -> Result<(), MemoryError> {
    // Probe sqlite_master so the migration also works on databases that never
    // had an `episodes` table.
    let episodes_exist: bool = conn
        .query_row(
            "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='episodes'",
            [],
            |row| row.get(0),
        )
        .map_err(|e| MemoryError::MigrationFailed {
            version: 9,
            reason: format!("existence check failed: {e}"),
        })?;

    if !episodes_exist {
        // Nothing to rebuild: just make sure the causes table is present.
        conn.execute_batch(
            "CREATE TABLE IF NOT EXISTS episode_causes (
 episode_id TEXT NOT NULL,
 cause_node_id TEXT NOT NULL,
 ordinal INTEGER NOT NULL DEFAULT 0,
 PRIMARY KEY (episode_id, cause_node_id)
 );
 CREATE INDEX IF NOT EXISTS idx_episode_causes_cause ON episode_causes(cause_node_id);",
        )?;
        return Ok(());
    }

    // NOTE(review): SQLite documents `PRAGMA foreign_keys` as a no-op while a
    // transaction is open, and `run_migrations` calls this function inside
    // `with_transaction` — confirm whether this toggle has any effect. Also,
    // any early `?` return below skips the matching `ON` at the end.
    conn.execute_batch("PRAGMA foreign_keys = OFF;")?;

    // Step 1: create the replacement table with `episode_id` as primary key.
    conn.execute_batch(
        "CREATE TABLE episodes_new (
 episode_id TEXT PRIMARY KEY,
 document_id TEXT NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
 cause_ids TEXT NOT NULL,
 effect_type TEXT NOT NULL,
 outcome TEXT NOT NULL DEFAULT 'pending',
 confidence REAL NOT NULL DEFAULT 0.0,
 verification_status TEXT NOT NULL DEFAULT '{\"status\":\"unverified\"}',
 experiment_id TEXT,
 created_at TEXT NOT NULL DEFAULT (datetime('now')),
 updated_at TEXT NOT NULL DEFAULT (datetime('now')),
 search_text TEXT NOT NULL DEFAULT '',
 embedding BLOB,
 embedding_q8 BLOB,
 trace_id TEXT
 )",
    )?;

    // Step 2: copy every row, deriving `episode_id` as `<document_id>-ep0`.
    conn.execute_batch(
        "INSERT INTO episodes_new
 (episode_id, document_id, cause_ids, effect_type, outcome, confidence,
 verification_status, experiment_id, created_at, updated_at,
 search_text, embedding, embedding_q8, trace_id)
 SELECT
 document_id || '-ep0',
 document_id, cause_ids, effect_type, outcome, confidence,
 verification_status, experiment_id, created_at, updated_at,
 search_text, embedding, embedding_q8, trace_id
 FROM episodes",
    )?;

    // Step 3: swap the new table into place.
    conn.execute_batch("DROP TABLE episodes")?;
    conn.execute_batch("ALTER TABLE episodes_new RENAME TO episodes")?;

    // Step 4: recreate the indexes (the old ones went away with the old table)
    // and add one on `document_id`, which is no longer the primary key.
    conn.execute_batch(
        "CREATE INDEX idx_episodes_document_id ON episodes(document_id);
 CREATE INDEX idx_episodes_effect_type ON episodes(effect_type);
 CREATE INDEX idx_episodes_outcome ON episodes(outcome);
 CREATE INDEX idx_episodes_experiment_id ON episodes(experiment_id);",
    )?;

    // Step 5: rebuild the rowid map keyed by `episode_id`.
    conn.execute_batch(
        "DROP TABLE IF EXISTS episodes_rowid_map;
 CREATE TABLE episodes_rowid_map (
 rowid INTEGER PRIMARY KEY AUTOINCREMENT,
 episode_id TEXT NOT NULL UNIQUE,
 document_id TEXT
 );
 INSERT INTO episodes_rowid_map (episode_id, document_id)
 SELECT episode_id, document_id FROM episodes;",
    )?;

    // Step 6: rebuild the FTS table so its rowids line up with the new map.
    conn.execute_batch(
        "DROP TABLE IF EXISTS episodes_fts;
 CREATE VIRTUAL TABLE episodes_fts USING fts5(
 content,
 content='',
 content_rowid='rowid',
 tokenize='porter unicode61'
 );
 INSERT INTO episodes_fts (rowid, content)
 SELECT rm.rowid, e.search_text
 FROM episodes_rowid_map rm
 JOIN episodes e ON e.episode_id = rm.episode_id;",
    )?;

    // Step 7: create the normalized cause table (same DDL as the early-return
    // branch above).
    conn.execute_batch(
        "CREATE TABLE IF NOT EXISTS episode_causes (
 episode_id TEXT NOT NULL,
 cause_node_id TEXT NOT NULL,
 ordinal INTEGER NOT NULL DEFAULT 0,
 PRIMARY KEY (episode_id, cause_node_id)
 );
 CREATE INDEX IF NOT EXISTS idx_episode_causes_cause ON episode_causes(cause_node_id);",
    )?;

    // Step 8: expand each `cause_ids` JSON array into one row per element,
    // using the array index as `ordinal`. `OR IGNORE` skips duplicate
    // (episode_id, cause_node_id) pairs.
    conn.execute_batch(
        "INSERT OR IGNORE INTO episode_causes (episode_id, cause_node_id, ordinal)
 SELECT e.episode_id, je.value, CAST(je.key AS INTEGER)
 FROM episodes e, json_each(e.cause_ids) je;",
    )?;

    conn.execute_batch("PRAGMA foreign_keys = ON;")?;

    Ok(())
}
379
/// How much work `verify_integrity_sync` performs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VerifyMode {
    /// Row counts, missing-embedding counts and pending index ops only.
    Quick,
    /// Everything in `Quick`, plus the checks gated on `mode == VerifyMode::Full`
    /// in `verify_integrity_sync` (reading the stored embedding dimensions and
    /// running the FTS drift checks).
    Full,
}
388
/// Result of an integrity verification pass.
// NOTE(review): the code that fills this struct is outside the visible part of
// the file; the field descriptions below follow the field names and the
// matching locals in `verify_integrity_sync` — confirm against the constructor.
#[derive(Debug, Clone)]
pub struct IntegrityReport {
    /// Overall verdict; presumably `true` when `issues` is empty — confirm.
    pub ok: bool,
    /// Schema version read from the database.
    pub schema_version: u32,
    /// Number of rows in `facts`.
    pub fact_count: usize,
    /// Number of rows in `chunks`.
    pub chunk_count: usize,
    /// Number of rows in `messages`.
    pub message_count: usize,
    /// Facts whose `embedding` column is NULL.
    pub facts_missing_embeddings: usize,
    /// Chunks whose `embedding` column is NULL.
    pub chunks_missing_embeddings: usize,
    /// Human-readable description of every problem found.
    pub issues: Vec<String>,
}
401
/// Action selector for a reconcile pass.
// NOTE(review): no consumer of this enum is visible in this part of the file;
// the variant descriptions are inferred from the names — confirm at the call site.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReconcileAction {
    /// Presumably: report problems without modifying anything.
    ReportOnly,
    /// Presumably: rebuild the FTS tables.
    RebuildFts,
    /// Presumably: regenerate embeddings.
    ReEmbed,
}
409
410#[derive(Debug, Clone, Copy, PartialEq, Eq)]
412pub(crate) enum IndexOpKind {
413 Upsert,
414 Delete,
415}
416
417impl IndexOpKind {
418 pub(crate) fn as_str(self) -> &'static str {
419 match self {
420 Self::Upsert => "upsert",
421 Self::Delete => "delete",
422 }
423 }
424
425 fn parse(raw: &str, item_key: &str) -> Result<Self, MemoryError> {
426 match raw {
427 "upsert" => Ok(Self::Upsert),
428 "delete" => Ok(Self::Delete),
429 other => Err(MemoryError::CorruptData {
430 table: "pending_index_ops",
431 row_id: item_key.to_string(),
432 detail: format!("invalid op_kind '{other}'"),
433 }),
434 }
435 }
436}
437
/// One row of the `pending_index_ops` table, as returned by
/// `list_pending_index_ops`.
#[derive(Debug, Clone)]
pub(crate) struct PendingIndexOp {
    /// Primary key of the row. `load_embedding_for_index_key` expects keys of
    /// the form `<domain>:<id>`.
    pub item_key: String,
    /// Value of the `entity_type` column.
    pub entity_type: String,
    /// Parsed `op_kind` column.
    pub op_kind: IndexOpKind,
    /// Value of the `attempt_count` column; `mark_pending_index_ops_failed`
    /// increments it.
    pub attempt_count: u32,
    /// Message recorded by the most recent failure, if any.
    pub last_error: Option<String>,
}
447
448pub fn with_transaction<F, T>(conn: &Connection, f: F) -> Result<T, MemoryError>
450where
451 F: FnOnce(&rusqlite::Transaction<'_>) -> Result<T, MemoryError>,
452{
453 let tx = conn.unchecked_transaction()?;
454 let result = f(&tx)?;
455 tx.commit()?;
456 Ok(result)
457}
458
459pub fn open_database(
461 path: &Path,
462 pool: &PoolConfig,
463 limits: &MemoryLimits,
464) -> Result<Connection, MemoryError> {
465 open_database_internal(path, pool, limits.max_db_size_bytes, true)
466}
467
468pub fn open_database_connection(
470 path: &Path,
471 pool: &PoolConfig,
472 limits: &MemoryLimits,
473) -> Result<Connection, MemoryError> {
474 open_database_internal(path, pool, limits.max_db_size_bytes, false)
475}
476
477pub(crate) fn open_database_internal(
478 path: &Path,
479 pool: &PoolConfig,
480 max_db_size_bytes: u64,
481 run_schema_migrations: bool,
482) -> Result<Connection, MemoryError> {
483 create_parent_dirs(path)?;
484 let conn = Connection::open(path)?;
485 configure_connection(&conn, path, pool, max_db_size_bytes, false)?;
486 if run_schema_migrations {
487 run_migrations(&conn)?;
488 }
489 Ok(conn)
490}
491
492pub(crate) fn open_pool_member_connection(
493 path: &Path,
494 pool: &PoolConfig,
495 limits: &MemoryLimits,
496 query_only: bool,
497) -> Result<Connection, MemoryError> {
498 create_parent_dirs(path)?;
499 let flags = OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE;
500 let conn = Connection::open_with_flags(path, flags)?;
501 configure_connection(&conn, path, pool, limits.max_db_size_bytes, query_only)?;
502 Ok(conn)
503}
504
505fn create_parent_dirs(path: &Path) -> Result<(), MemoryError> {
506 if let Some(parent) = path.parent() {
507 if !parent.as_os_str().is_empty() {
508 std::fs::create_dir_all(parent).map_err(|e| {
509 MemoryError::StorageError(format!(
510 "failed to create database directory {}: {}",
511 parent.display(),
512 e
513 ))
514 })?;
515 }
516 }
517 Ok(())
518}
519
520fn configure_connection(
521 conn: &Connection,
522 path: &Path,
523 pool: &PoolConfig,
524 max_db_size_bytes: u64,
525 query_only: bool,
526) -> Result<(), MemoryError> {
527 let journal_mode = if pool.enable_wal { "WAL" } else { "DELETE" };
528 conn.execute_batch(&format!(
529 "PRAGMA journal_mode = {};
530 PRAGMA foreign_keys = ON;
531 PRAGMA busy_timeout = {};
532 PRAGMA synchronous = NORMAL;
533 PRAGMA temp_store = MEMORY;
534 PRAGMA wal_autocheckpoint = {};",
535 journal_mode, pool.busy_timeout_ms, pool.wal_autocheckpoint,
536 ))?;
537
538 if query_only {
539 conn.execute_batch("PRAGMA query_only = ON;")?;
540 }
541
542 let actual_journal_mode: String =
543 conn.query_row("PRAGMA journal_mode", [], |row| row.get(0))?;
544 let expected_journal_mode = if pool.enable_wal { "wal" } else { "delete" };
545 if actual_journal_mode.to_lowercase() != expected_journal_mode {
546 return Err(MemoryError::StorageError(format!(
547 "SQLite journal mode mismatch for {}: requested {}, got {}",
548 path.display(),
549 expected_journal_mode,
550 actual_journal_mode
551 )));
552 }
553
554 if max_db_size_bytes > 0 {
555 let page_size: u64 = conn.query_row("PRAGMA page_size", [], |row| row.get(0))?;
556 let max_page_count = max_db_size_bytes.div_ceil(page_size);
557 let actual_max_page_count: u64 = conn.query_row(
558 &format!("PRAGMA max_page_count = {}", max_page_count),
559 [],
560 |row| row.get(0),
561 )?;
562 let page_count: u64 = conn.query_row("PRAGMA page_count", [], |row| row.get(0))?;
563
564 if page_count > actual_max_page_count {
565 return Err(MemoryError::DatabaseSizeLimitExceeded {
566 current: page_count.saturating_mul(page_size),
567 limit: max_db_size_bytes,
568 });
569 }
570 }
571
572 Ok(())
573}
574
575pub fn run_migrations(conn: &Connection) -> Result<(), MemoryError> {
577 let user_version: u32 = conn
578 .query_row("PRAGMA user_version", [], |row| row.get(0))
579 .map_err(|e| MemoryError::MigrationFailed {
580 version: 0,
581 reason: format!("failed to read PRAGMA user_version: {e}"),
582 })?;
583
584 if user_version > MAX_SCHEMA_VERSION {
585 return Err(MemoryError::SchemaAhead {
586 found: user_version,
587 supported: MAX_SCHEMA_VERSION,
588 });
589 }
590
591 conn.execute_batch(
592 "CREATE TABLE IF NOT EXISTS _schema_version (
593 version INTEGER PRIMARY KEY,
594 applied_at TEXT NOT NULL DEFAULT (datetime('now'))
595 );",
596 )?;
597
598 for &(version, sql) in MIGRATIONS {
599 let current_version: u32 = conn
600 .query_row(
601 "SELECT COALESCE(MAX(version), 0) FROM _schema_version",
602 [],
603 |row| row.get(0),
604 )
605 .unwrap_or(0);
606
607 if current_version >= version {
608 continue;
609 }
610
611 with_transaction(conn, |tx| {
612 match version {
613 9 => run_migration_v9(tx).map_err(|e| MemoryError::MigrationFailed {
614 version,
615 reason: e.to_string(),
616 })?,
617 16 => run_migration_v16(tx).map_err(|e| MemoryError::MigrationFailed {
618 version,
619 reason: e.to_string(),
620 })?,
621 17 => run_migration_v17(tx).map_err(|e| MemoryError::MigrationFailed {
622 version,
623 reason: e.to_string(),
624 })?,
625 _ => tx
626 .execute_batch(sql)
627 .map_err(|e| MemoryError::MigrationFailed {
628 version,
629 reason: e.to_string(),
630 })?,
631 }
632 tx.execute(
633 "INSERT INTO _schema_version (version) VALUES (?1)",
634 params![version],
635 )
636 .map_err(|e| MemoryError::MigrationFailed {
637 version,
638 reason: e.to_string(),
639 })?;
640 Ok(())
641 })?;
642
643 tracing::info!("Applied migration V{}", version);
644 }
645
646 let final_version: u32 = conn
647 .query_row(
648 "SELECT COALESCE(MAX(version), 0) FROM _schema_version",
649 [],
650 |row| row.get(0),
651 )
652 .unwrap_or(0);
653 conn.execute_batch(&format!("PRAGMA user_version = {};", final_version))?;
654
655 Ok(())
656}
657
658fn run_migration_v16(conn: &Connection) -> Result<(), rusqlite::Error> {
659 add_column_if_missing(conn, "projection_import_log", "kernel_payload_json", "TEXT")?;
660 add_column_if_missing(
661 conn,
662 "projection_import_failures",
663 "kernel_payload_json",
664 "TEXT",
665 )?;
666 Ok(())
667}
668
669fn run_migration_v17(conn: &Connection) -> Result<(), rusqlite::Error> {
670 add_column_if_missing(conn, "projection_import_log", "episode_bundle_id", "TEXT")?;
671 add_column_if_missing(conn, "projection_import_log", "episode_bundle_json", "TEXT")?;
672 add_column_if_missing(
673 conn,
674 "projection_import_log",
675 "execution_context_json",
676 "TEXT",
677 )?;
678 add_column_if_missing(
679 conn,
680 "projection_import_failures",
681 "episode_bundle_id",
682 "TEXT",
683 )?;
684 add_column_if_missing(
685 conn,
686 "projection_import_failures",
687 "episode_bundle_json",
688 "TEXT",
689 )?;
690 add_column_if_missing(
691 conn,
692 "projection_import_failures",
693 "execution_context_json",
694 "TEXT",
695 )?;
696 Ok(())
697}
698
699fn add_column_if_missing(
700 conn: &Connection,
701 table: &str,
702 column: &str,
703 column_sql: &str,
704) -> Result<(), rusqlite::Error> {
705 let pragma = format!("PRAGMA table_info({table})");
706 let mut stmt = conn.prepare(&pragma)?;
707 let exists = stmt
708 .query_map([], |row| row.get::<_, String>(1))?
709 .collect::<Result<Vec<_>, _>>()?
710 .into_iter()
711 .any(|name| name == column);
712
713 if !exists {
714 conn.execute(
715 &format!("ALTER TABLE {table} ADD COLUMN {column} {column_sql}"),
716 [],
717 )?;
718 }
719
720 Ok(())
721}
722
723pub fn check_embedding_metadata(
725 conn: &Connection,
726 config: &EmbeddingConfig,
727) -> Result<(), MemoryError> {
728 let existing: Option<(String, usize)> = conn
730 .query_row(
731 "SELECT model_name, dimensions FROM embedding_metadata WHERE id = 1",
732 [],
733 |row| Ok((row.get(0)?, row.get(1)?)),
734 )
735 .ok();
736
737 match existing {
738 Some((model, dims)) => {
739 if model != config.model || dims != config.dimensions {
740 tracing::warn!(
741 stored_model = %model,
742 stored_dims = dims,
743 configured_model = %config.model,
744 configured_dims = config.dimensions,
745 "Embedding model changed. Existing embeddings are stale."
746 );
747 conn.execute(
748 "UPDATE embedding_metadata
749 SET model_name = ?1,
750 dimensions = ?2,
751 embeddings_dirty = 1,
752 updated_at = datetime('now')
753 WHERE id = 1",
754 params![config.model, config.dimensions],
755 )?;
756 }
757 }
758 None => {
759 conn.execute(
760 "INSERT INTO embedding_metadata (id, model_name, dimensions) VALUES (1, ?1, ?2)",
761 params![config.model, config.dimensions],
762 )?;
763 }
764 }
765
766 Ok(())
767}
768
/// Serializes an embedding as consecutive little-endian `f32` values
/// (4 bytes per component, in input order).
pub fn embedding_to_bytes(embedding: &[f32]) -> Vec<u8> {
    let mut encoded = Vec::with_capacity(std::mem::size_of_val(embedding));
    encoded.extend(
        embedding
            .iter()
            .flat_map(|component| component.to_le_bytes()),
    );
    encoded
}
777
778#[allow(clippy::manual_is_multiple_of)]
780pub fn bytes_to_embedding(bytes: &[u8]) -> Result<Vec<f32>, MemoryError> {
781 if bytes.len() % 4 != 0 {
782 return Err(MemoryError::InvalidEmbedding {
783 expected_bytes: bytes.len() - (bytes.len() % 4),
784 actual_bytes: bytes.len(),
785 });
786 }
787
788 match bytemuck::try_cast_slice::<u8, f32>(bytes) {
789 Ok(slice) => Ok(slice.to_vec()),
790 Err(_) => {
791 let mut embedding = Vec::with_capacity(bytes.len() / 4);
792 for chunk in bytes.chunks_exact(4) {
793 embedding.push(f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]));
794 }
795 Ok(embedding)
796 }
797 }
798}
799
800pub fn is_embeddings_dirty(conn: &Connection) -> Result<bool, MemoryError> {
801 let dirty: i32 = conn
802 .query_row(
803 "SELECT COALESCE(embeddings_dirty, 0) FROM embedding_metadata WHERE id = 1",
804 [],
805 |row| row.get(0),
806 )
807 .unwrap_or(0);
808 Ok(dirty != 0)
809}
810
811pub fn clear_embeddings_dirty(conn: &Connection) -> Result<(), MemoryError> {
812 conn.execute(
813 "UPDATE embedding_metadata SET embeddings_dirty = 0 WHERE id = 1",
814 [],
815 )?;
816 Ok(())
817}
818
/// Records (or replaces) the pending index operation for `item_key` and marks
/// the sidecar dirty, all within the caller's transaction.
///
/// If a row for `item_key` already exists it is overwritten: entity type and
/// op kind take the new values, `attempt_count` goes back to 0 and
/// `last_error` is cleared.
#[cfg(feature = "hnsw")]
pub(crate) fn queue_pending_index_op(
    tx: &rusqlite::Transaction<'_>,
    item_key: &str,
    entity_type: &str,
    op_kind: IndexOpKind,
) -> Result<(), MemoryError> {
    let kind_text = op_kind.as_str();
    tx.execute(
        "INSERT INTO pending_index_ops (item_key, entity_type, op_kind, attempt_count, last_error, updated_at)
 VALUES (?1, ?2, ?3, 0, NULL, datetime('now'))
 ON CONFLICT(item_key) DO UPDATE SET
 entity_type = excluded.entity_type,
 op_kind = excluded.op_kind,
 attempt_count = 0,
 last_error = NULL,
 updated_at = datetime('now')",
        params![item_key, entity_type, kind_text],
    )?;
    mark_sidecar_dirty(tx)
}
840
841#[cfg(feature = "hnsw")]
842pub(crate) use IndexOpKind as PendingIndexOpKind;
843
/// Alias entry point for `queue_pending_index_op`; forwards all arguments
/// unchanged.
#[cfg(feature = "hnsw")]
pub(crate) fn enqueue_pending_index_op(
    tx: &rusqlite::Transaction<'_>,
    item_key: &str,
    entity_type: &str,
    op_kind: PendingIndexOpKind,
) -> Result<(), MemoryError> {
    let outcome = queue_pending_index_op(tx, item_key, entity_type, op_kind);
    outcome
}
853
854pub(crate) fn list_pending_index_ops(
855 conn: &Connection,
856) -> Result<Vec<PendingIndexOp>, MemoryError> {
857 let table_exists: bool = conn
858 .query_row(
859 "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='pending_index_ops'",
860 [],
861 |row| row.get(0),
862 )
863 .unwrap_or(false);
864 if !table_exists {
865 return Ok(Vec::new());
866 }
867
868 let mut stmt = conn.prepare(
869 "SELECT item_key, entity_type, op_kind, attempt_count, last_error
870 FROM pending_index_ops
871 ORDER BY updated_at ASC, item_key ASC",
872 )?;
873 let rows = stmt
874 .query_map([], |row| {
875 let item_key: String = row.get(0)?;
876 let op_kind: String = row.get(2)?;
877 Ok(PendingIndexOp {
878 item_key: item_key.clone(),
879 entity_type: row.get(1)?,
880 op_kind: IndexOpKind::parse(&op_kind, &item_key).map_err(|e| {
881 rusqlite::Error::FromSqlConversionFailure(
882 2,
883 rusqlite::types::Type::Text,
884 Box::new(e),
885 )
886 })?,
887 attempt_count: row.get::<_, i64>(3)? as u32,
888 last_error: row.get(4)?,
889 })
890 })?
891 .collect::<Result<Vec<_>, _>>()?;
892 Ok(rows)
893}
894
/// Counts the rows in `pending_index_ops`.
///
/// Returns 0 when the table does not exist or the existence probe fails.
#[cfg(feature = "hnsw")]
pub(crate) fn pending_index_op_count(conn: &Connection) -> Result<usize, MemoryError> {
    let probe = conn.query_row(
        "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='pending_index_ops'",
        [],
        |row| row.get::<_, bool>(0),
    );
    if !probe.unwrap_or(false) {
        return Ok(0);
    }

    let total = conn.query_row("SELECT COUNT(*) FROM pending_index_ops", [], |row| {
        row.get::<_, i64>(0)
    })?;
    Ok(total as usize)
}
913
/// Records a failed attempt for each key in `item_keys`.
///
/// In a single transaction, every matching row gets `attempt_count + 1`,
/// `last_error = error` and a fresh `updated_at`. Keys without a row are
/// silently unaffected.
#[cfg(feature = "hnsw")]
pub(crate) fn mark_pending_index_ops_failed(
    conn: &Connection,
    item_keys: &[String],
    error: &str,
) -> Result<(), MemoryError> {
    with_transaction(conn, |tx| {
        item_keys.iter().try_for_each(|key| {
            tx.execute(
                "UPDATE pending_index_ops
 SET attempt_count = attempt_count + 1,
 last_error = ?1,
 updated_at = datetime('now')
 WHERE item_key = ?2",
                params![error, key],
            )
            .map(drop)
        })?;
        Ok(())
    })
}
934
/// Deletes the `pending_index_ops` rows for the given keys in a single
/// transaction. Keys without a row are ignored.
#[cfg(feature = "hnsw")]
pub(crate) fn clear_pending_index_ops(
    conn: &Connection,
    item_keys: &[String],
) -> Result<(), MemoryError> {
    with_transaction(conn, |tx| {
        item_keys.iter().try_for_each(|key| {
            tx.execute(
                "DELETE FROM pending_index_ops WHERE item_key = ?1",
                params![key],
            )
            .map(drop)
        })?;
        Ok(())
    })
}
950
/// Deletes every row from `pending_index_ops`.
#[cfg(feature = "hnsw")]
pub(crate) fn clear_all_pending_index_ops(conn: &Connection) -> Result<(), MemoryError> {
    const PURGE_SQL: &str = "DELETE FROM pending_index_ops";
    let _rows_removed = conn.execute(PURGE_SQL, [])?;
    Ok(())
}
956
/// Loads the stored embedding for an index key of the form `<domain>:<id>`.
///
/// Supported domains are `fact`, `chunk`, `msg` (whose id must parse as an
/// integer) and `episode`. Returns `Ok(None)` when the row does not exist or
/// its `embedding` column is NULL.
///
/// # Errors
/// * `MemoryError::InvalidKey` for a key without `:`, an unknown domain, or a
///   non-integer `msg` id.
/// * Any other SQLite error, or a decoding error from `bytes_to_embedding`.
#[cfg(feature = "hnsw")]
pub(crate) fn load_embedding_for_index_key(
    conn: &Connection,
    item_key: &str,
) -> Result<Option<Vec<f32>>, MemoryError> {
    /// Reads the (nullable) embedding blob from the first column.
    fn read_blob(row: &rusqlite::Row<'_>) -> rusqlite::Result<Option<Vec<u8>>> {
        row.get(0)
    }

    let (domain, raw_id) = match item_key.split_once(':') {
        Some(parts) => parts,
        None => return Err(MemoryError::InvalidKey(item_key.to_string())),
    };

    let lookup = match domain {
        "fact" => conn.query_row(
            "SELECT embedding FROM facts WHERE id = ?1",
            params![raw_id],
            read_blob,
        ),
        "chunk" => conn.query_row(
            "SELECT embedding FROM chunks WHERE id = ?1",
            params![raw_id],
            read_blob,
        ),
        "msg" => {
            // Message ids are integers, unlike the text ids of other domains.
            let message_id = raw_id
                .parse::<i64>()
                .map_err(|e| MemoryError::InvalidKey(format!("{}: {e}", item_key)))?;
            conn.query_row(
                "SELECT embedding FROM messages WHERE id = ?1",
                params![message_id],
                read_blob,
            )
        }
        "episode" => conn.query_row(
            "SELECT embedding FROM episodes WHERE episode_id = ?1",
            params![raw_id],
            read_blob,
        ),
        _ => return Err(MemoryError::InvalidKey(item_key.to_string())),
    };

    match lookup {
        Ok(Some(bytes)) => bytes_to_embedding(&bytes).map(Some),
        // A missing row and a NULL column both mean "no embedding".
        Ok(None) | Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
        Err(err) => Err(err.into()),
    }
}
1003
/// Sets `hnsw_metadata.sidecar_dirty` to `'1'` inside the given transaction,
/// inserting the key if it is not there yet.
#[cfg(feature = "hnsw")]
fn mark_sidecar_dirty(tx: &rusqlite::Transaction<'_>) -> Result<(), MemoryError> {
    let _rows_changed = tx.execute(
        "INSERT INTO hnsw_metadata (key, value) VALUES ('sidecar_dirty', '1')
 ON CONFLICT(key) DO UPDATE SET value = '1'",
        [],
    )?;
    Ok(())
}
1013
/// Reports whether `hnsw_metadata.sidecar_dirty` is exactly `'1'`.
///
/// Best-effort read: a missing key, a missing table or any other query
/// failure yields `Ok(false)`.
#[cfg(feature = "hnsw")]
pub(crate) fn is_sidecar_dirty(conn: &Connection) -> Result<bool, MemoryError> {
    let stored = conn.query_row(
        "SELECT value FROM hnsw_metadata WHERE key = 'sidecar_dirty'",
        [],
        |row| row.get::<_, String>(0),
    );
    Ok(match stored {
        Ok(value) => value == "1",
        Err(_) => false,
    })
}
1026
/// Writes the `sidecar_dirty` flag in `hnsw_metadata` (`'1'` for dirty, `'0'`
/// for clean), inserting the key if needed.
#[cfg(feature = "hnsw")]
pub(crate) fn set_sidecar_dirty(conn: &Connection, dirty: bool) -> Result<(), MemoryError> {
    let flag = if dirty { "1" } else { "0" };
    let _rows_changed = conn.execute(
        "INSERT INTO hnsw_metadata (key, value) VALUES ('sidecar_dirty', ?1)
 ON CONFLICT(key) DO UPDATE SET value = excluded.value",
        params![flag],
    )?;
    Ok(())
}
1036
1037pub(crate) fn parse_optional_json(
1038 table: &'static str,
1039 row_id: &str,
1040 field: &'static str,
1041 raw: Option<&str>,
1042) -> Result<Option<serde_json::Value>, MemoryError> {
1043 match raw {
1044 Some(raw) => serde_json::from_str(raw)
1045 .map(Some)
1046 .map_err(|e| MemoryError::CorruptData {
1047 table,
1048 row_id: row_id.to_string(),
1049 detail: format!("invalid {field}: {e}"),
1050 }),
1051 None => Ok(None),
1052 }
1053}
1054
1055pub(crate) fn parse_string_list_json(
1056 table: &'static str,
1057 row_id: &str,
1058 field: &'static str,
1059 raw: &str,
1060) -> Result<Vec<String>, MemoryError> {
1061 serde_json::from_str(raw).map_err(|e| MemoryError::CorruptData {
1062 table,
1063 row_id: row_id.to_string(),
1064 detail: format!("invalid {field}: {e}"),
1065 })
1066}
1067
1068pub(crate) fn parse_role(
1069 table: &'static str,
1070 row_id: &str,
1071 raw: &str,
1072) -> Result<Role, MemoryError> {
1073 Role::from_str_value(raw).ok_or_else(|| MemoryError::CorruptData {
1074 table,
1075 row_id: row_id.to_string(),
1076 detail: format!("invalid role '{raw}'"),
1077 })
1078}
1079
1080pub(crate) fn parse_episode_outcome(
1081 row_id: &str,
1082 raw: &str,
1083) -> Result<EpisodeOutcome, MemoryError> {
1084 EpisodeOutcome::from_str_value(raw).ok_or_else(|| MemoryError::CorruptData {
1085 table: "episodes",
1086 row_id: row_id.to_string(),
1087 detail: format!("invalid outcome '{raw}'"),
1088 })
1089}
1090
1091pub(crate) fn parse_verification_status(
1092 row_id: &str,
1093 raw: &str,
1094) -> Result<VerificationStatus, MemoryError> {
1095 serde_json::from_str(raw).map_err(|e| MemoryError::CorruptData {
1096 table: "episodes",
1097 row_id: row_id.to_string(),
1098 detail: format!("invalid verification_status: {e}"),
1099 })
1100}
1101
1102pub fn verify_integrity_sync(
1104 conn: &Connection,
1105 mode: VerifyMode,
1106) -> Result<IntegrityReport, MemoryError> {
1107 let mut issues = Vec::new();
1108
1109 let schema_version: u32 = conn
1110 .query_row("PRAGMA user_version", [], |row| row.get(0))
1111 .unwrap_or_else(|e| {
1112 issues.push(format!("failed to read schema version: {e}"));
1113 0
1114 });
1115 if schema_version > MAX_SCHEMA_VERSION {
1116 issues.push(format!(
1117 "schema version {} is ahead of supported {}",
1118 schema_version, MAX_SCHEMA_VERSION
1119 ));
1120 }
1121
1122 let fact_count: usize = conn
1123 .query_row("SELECT COUNT(*) FROM facts", [], |row| row.get(0))
1124 .unwrap_or_else(|e| {
1125 issues.push(format!("failed to count facts: {e}"));
1126 0
1127 });
1128 let chunk_count: usize = conn
1129 .query_row("SELECT COUNT(*) FROM chunks", [], |row| row.get(0))
1130 .unwrap_or_else(|e| {
1131 issues.push(format!("failed to count chunks: {e}"));
1132 0
1133 });
1134 let message_count: usize = conn
1135 .query_row("SELECT COUNT(*) FROM messages", [], |row| row.get(0))
1136 .unwrap_or_else(|e| {
1137 issues.push(format!("failed to count messages: {e}"));
1138 0
1139 });
1140 let episode_count: usize = conn
1141 .query_row("SELECT COUNT(*) FROM episodes", [], |row| row.get(0))
1142 .unwrap_or_else(|e| {
1143 issues.push(format!("failed to count episodes: {e}"));
1144 0
1145 });
1146
1147 let facts_missing_embeddings: usize = conn
1148 .query_row(
1149 "SELECT COUNT(*) FROM facts WHERE embedding IS NULL",
1150 [],
1151 |row| row.get(0),
1152 )
1153 .unwrap_or_else(|e| {
1154 issues.push(format!("failed to count facts missing embeddings: {e}"));
1155 0
1156 });
1157 let chunks_missing_embeddings: usize = conn
1158 .query_row(
1159 "SELECT COUNT(*) FROM chunks WHERE embedding IS NULL",
1160 [],
1161 |row| row.get(0),
1162 )
1163 .unwrap_or_else(|e| {
1164 issues.push(format!("failed to count chunks missing embeddings: {e}"));
1165 0
1166 });
1167 let episodes_missing_embeddings: usize = conn
1168 .query_row(
1169 "SELECT COUNT(*) FROM episodes WHERE embedding IS NULL",
1170 [],
1171 |row| row.get(0),
1172 )
1173 .unwrap_or_else(|e| {
1174 issues.push(format!("failed to count episodes missing embeddings: {e}"));
1175 0
1176 });
1177
1178 if facts_missing_embeddings > 0 {
1179 issues.push(format!(
1180 "{} facts missing embeddings",
1181 facts_missing_embeddings
1182 ));
1183 }
1184 if chunks_missing_embeddings > 0 {
1185 issues.push(format!(
1186 "{} chunks missing embeddings",
1187 chunks_missing_embeddings
1188 ));
1189 }
1190 if episodes_missing_embeddings > 0 {
1191 issues.push(format!(
1192 "{} episodes missing embeddings",
1193 episodes_missing_embeddings
1194 ));
1195 }
1196
1197 let pending_ops = list_pending_index_ops(conn).unwrap_or_default();
1198 if !pending_ops.is_empty() {
1199 issues.push(format!(
1200 "{} pending HNSW sidecar ops queued in SQLite",
1201 pending_ops.len()
1202 ));
1203 for op in pending_ops.iter().take(5) {
1204 let op_kind = op.op_kind.as_str();
1205 let detail = match &op.last_error {
1206 Some(last_error) => format!(
1207 "{} {} {} (attempts: {}, last_error: {})",
1208 op.entity_type,
1209 op.op_kind.as_str(),
1210 op.item_key,
1211 op.attempt_count,
1212 last_error
1213 ),
1214 None => format!(
1215 "{} {} {} (attempts: {})",
1216 op.entity_type, op_kind, op.item_key, op.attempt_count
1217 ),
1218 };
1219 issues.push(format!("pending sidecar op: {detail}"));
1220 }
1221 }
1222
1223 if mode == VerifyMode::Full {
1224 let dims: usize = conn
1225 .query_row(
1226 "SELECT dimensions FROM embedding_metadata WHERE id = 1",
1227 [],
1228 |row| row.get(0),
1229 )
1230 .unwrap_or_else(|e| {
1231 issues.push(format!("failed to read embedding dimensions: {e}"));
1232 0
1233 });
1234
1235 verify_fts_drift(conn, "facts", "facts_rowid_map", fact_count, &mut issues);
1236 verify_fts_drift(conn, "chunks", "chunks_rowid_map", chunk_count, &mut issues);
1237 verify_fts_drift(
1238 conn,
1239 "messages",
1240 "messages_rowid_map",
1241 message_count,
1242 &mut issues,
1243 );
1244 verify_fts_drift(
1245 conn,
1246 "episodes",
1247 "episodes_rowid_map",
1248 episode_count,
1249 &mut issues,
1250 );
1251
1252 verify_blob_table(conn, "facts", "id", "embedding", dims, &mut issues)?;
1253 verify_blob_table(conn, "chunks", "id", "embedding", dims, &mut issues)?;
1254 verify_blob_table(conn, "messages", "id", "embedding", dims, &mut issues)?;
1255 verify_blob_table(
1256 conn,
1257 "episodes",
1258 "episode_id",
1259 "embedding",
1260 dims,
1261 &mut issues,
1262 )?;
1263
1264 verify_quantized_table(conn, "facts", "id", dims, &mut issues)?;
1265 verify_quantized_table(conn, "chunks", "id", dims, &mut issues)?;
1266 verify_quantized_table(conn, "messages", "id", dims, &mut issues)?;
1267 verify_quantized_table(conn, "episodes", "episode_id", dims, &mut issues)?;
1268
1269 verify_session_rows(conn, &mut issues)?;
1270 verify_message_rows(conn, &mut issues)?;
1271 verify_fact_rows(conn, &mut issues)?;
1272 verify_document_rows(conn, &mut issues)?;
1273 verify_episode_rows(conn, &mut issues)?;
1274
1275 let integrity_check: String = conn
1276 .query_row("PRAGMA integrity_check", [], |row| row.get(0))
1277 .unwrap_or_else(|_| "error".to_string());
1278 if integrity_check != "ok" {
1279 issues.push(format!("SQLite integrity_check: {}", integrity_check));
1280 }
1281 }
1282
1283 Ok(IntegrityReport {
1284 ok: issues.is_empty(),
1285 schema_version,
1286 fact_count,
1287 chunk_count,
1288 message_count,
1289 facts_missing_embeddings,
1290 chunks_missing_embeddings,
1291 issues,
1292 })
1293}
1294
1295pub fn reconcile_fts(conn: &Connection) -> Result<(), MemoryError> {
1297 with_transaction(conn, |tx| {
1298 tx.execute_batch("DROP TABLE IF EXISTS facts_fts")?;
1299 tx.execute_batch("DELETE FROM facts_rowid_map")?;
1300 tx.execute_batch(
1301 "CREATE VIRTUAL TABLE facts_fts USING fts5(
1302 content,
1303 content='',
1304 content_rowid='rowid',
1305 tokenize='porter unicode61'
1306 )",
1307 )?;
1308 tx.execute_batch("INSERT INTO facts_rowid_map (fact_id) SELECT id FROM facts")?;
1309 tx.execute_batch(
1310 "INSERT INTO facts_fts (rowid, content)
1311 SELECT rm.rowid, f.content
1312 FROM facts_rowid_map rm
1313 JOIN facts f ON f.id = rm.fact_id",
1314 )?;
1315
1316 tx.execute_batch("DROP TABLE IF EXISTS chunks_fts")?;
1317 tx.execute_batch("DELETE FROM chunks_rowid_map")?;
1318 tx.execute_batch(
1319 "CREATE VIRTUAL TABLE chunks_fts USING fts5(
1320 content,
1321 content='',
1322 content_rowid='rowid',
1323 tokenize='porter unicode61'
1324 )",
1325 )?;
1326 tx.execute_batch("INSERT INTO chunks_rowid_map (chunk_id) SELECT id FROM chunks")?;
1327 tx.execute_batch(
1328 "INSERT INTO chunks_fts (rowid, content)
1329 SELECT rm.rowid, c.content
1330 FROM chunks_rowid_map rm
1331 JOIN chunks c ON c.id = rm.chunk_id",
1332 )?;
1333
1334 tx.execute_batch("DROP TABLE IF EXISTS messages_fts")?;
1335 tx.execute_batch("DELETE FROM messages_rowid_map")?;
1336 tx.execute_batch(
1337 "CREATE VIRTUAL TABLE messages_fts USING fts5(
1338 content,
1339 content='',
1340 content_rowid='rowid',
1341 tokenize='porter unicode61'
1342 )",
1343 )?;
1344 tx.execute_batch("INSERT INTO messages_rowid_map (message_id) SELECT id FROM messages")?;
1345 tx.execute_batch(
1346 "INSERT INTO messages_fts (rowid, content)
1347 SELECT rm.rowid, m.content
1348 FROM messages_rowid_map rm
1349 JOIN messages m ON m.id = rm.message_id",
1350 )?;
1351
1352 tx.execute_batch("DROP TABLE IF EXISTS episodes_fts")?;
1353 tx.execute_batch("DELETE FROM episodes_rowid_map")?;
1354 tx.execute_batch(
1355 "CREATE VIRTUAL TABLE episodes_fts USING fts5(
1356 content,
1357 content='',
1358 content_rowid='rowid',
1359 tokenize='porter unicode61'
1360 )",
1361 )?;
1362 tx.execute_batch(
1363 "INSERT INTO episodes_rowid_map (episode_id, document_id) SELECT episode_id, document_id FROM episodes",
1364 )?;
1365 tx.execute_batch(
1366 "INSERT INTO episodes_fts (rowid, content)
1367 SELECT rm.rowid, e.search_text
1368 FROM episodes_rowid_map rm
1369 JOIN episodes e ON e.episode_id = rm.episode_id",
1370 )?;
1371
1372 Ok(())
1373 })?;
1374
1375 tracing::info!("FTS indexes reconciled");
1376 Ok(())
1377}
1378
1379fn verify_fts_drift(
1380 conn: &Connection,
1381 label: &str,
1382 map_table: &str,
1383 source_count: usize,
1384 issues: &mut Vec<String>,
1385) {
1386 let table_exists: bool = conn
1387 .query_row(
1388 "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name = ?1",
1389 params![map_table],
1390 |row| row.get(0),
1391 )
1392 .unwrap_or(false);
1393 if !table_exists {
1394 if source_count > 0 {
1395 issues.push(format!("{} rows exist but {} is missing", label, map_table));
1396 }
1397 return;
1398 }
1399
1400 let sql = format!("SELECT COUNT(*) FROM {}", map_table);
1401 let indexed_count: usize = conn.query_row(&sql, [], |row| row.get(0)).unwrap_or(0);
1402 if indexed_count != source_count {
1403 issues.push(format!(
1404 "FTS {} index drift: {} rows in map vs {} source rows",
1405 label, indexed_count, source_count
1406 ));
1407 }
1408}
1409
1410fn verify_blob_table(
1411 conn: &Connection,
1412 table: &'static str,
1413 id_column: &'static str,
1414 blob_column: &'static str,
1415 expected_dims: usize,
1416 issues: &mut Vec<String>,
1417) -> Result<(), MemoryError> {
1418 if expected_dims == 0 {
1419 return Ok(());
1420 }
1421
1422 let sql = format!(
1423 "SELECT CAST({id_column} AS TEXT), {blob_column} FROM {table} WHERE {blob_column} IS NOT NULL"
1424 );
1425 let mut stmt = conn.prepare(&sql)?;
1426 let rows = stmt.query_map([], |row| {
1427 Ok((row.get::<_, String>(0)?, row.get::<_, Vec<u8>>(1)?))
1428 })?;
1429
1430 for row in rows {
1431 let (row_id, blob) = row?;
1432 match bytes_to_embedding(&blob) {
1433 Ok(embedding) if embedding.len() != expected_dims => issues.push(format!(
1434 "{}({}) has embedding dimension {} but expected {}",
1435 table,
1436 row_id,
1437 embedding.len(),
1438 expected_dims
1439 )),
1440 Ok(_) => {}
1441 Err(err) => issues.push(format!(
1442 "{}({}) invalid embedding blob: {}",
1443 table, row_id, err
1444 )),
1445 }
1446 }
1447
1448 Ok(())
1449}
1450
1451fn verify_quantized_table(
1452 conn: &Connection,
1453 table: &'static str,
1454 id_column: &'static str,
1455 expected_dims: usize,
1456 issues: &mut Vec<String>,
1457) -> Result<(), MemoryError> {
1458 if expected_dims == 0 {
1459 return Ok(());
1460 }
1461
1462 let sql = format!(
1463 "SELECT CAST({id_column} AS TEXT), embedding_q8 FROM {table} WHERE embedding IS NOT NULL"
1464 );
1465 let mut stmt = conn.prepare(&sql)?;
1466 let rows = stmt.query_map([], |row| {
1467 Ok((row.get::<_, String>(0)?, row.get::<_, Option<Vec<u8>>>(1)?))
1468 })?;
1469
1470 for row in rows {
1471 let (row_id, blob) = row?;
1472 match blob {
1473 Some(blob) => {
1474 if let Err(err) = unpack_quantized(&blob, expected_dims) {
1475 issues.push(format!(
1476 "{}({}) invalid quantized embedding: {}",
1477 table, row_id, err
1478 ));
1479 }
1480 }
1481 None => issues.push(format!("{}({}) missing quantized embedding", table, row_id)),
1482 }
1483 }
1484
1485 Ok(())
1486}
1487
1488fn verify_session_rows(conn: &Connection, issues: &mut Vec<String>) -> Result<(), MemoryError> {
1489 let mut stmt = conn.prepare("SELECT id, metadata FROM sessions WHERE metadata IS NOT NULL")?;
1490 let rows = stmt.query_map([], |row| {
1491 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1492 })?;
1493 for row in rows {
1494 let (id, metadata) = row?;
1495 if let Err(err) = parse_optional_json("sessions", &id, "metadata", Some(&metadata)) {
1496 issues.push(err.to_string());
1497 }
1498 }
1499 Ok(())
1500}
1501
1502fn verify_message_rows(conn: &Connection, issues: &mut Vec<String>) -> Result<(), MemoryError> {
1503 let mut stmt = conn.prepare("SELECT id, role, metadata FROM messages")?;
1504 let rows = stmt.query_map([], |row| {
1505 Ok((
1506 row.get::<_, i64>(0)?,
1507 row.get::<_, String>(1)?,
1508 row.get::<_, Option<String>>(2)?,
1509 ))
1510 })?;
1511 for row in rows {
1512 let (id, role, metadata) = row?;
1513 let row_id = id.to_string();
1514 if let Err(err) = parse_role("messages", &row_id, &role) {
1515 issues.push(err.to_string());
1516 }
1517 if let Err(err) = parse_optional_json("messages", &row_id, "metadata", metadata.as_deref())
1518 {
1519 issues.push(err.to_string());
1520 }
1521 }
1522 Ok(())
1523}
1524
1525fn verify_fact_rows(conn: &Connection, issues: &mut Vec<String>) -> Result<(), MemoryError> {
1526 let mut stmt = conn.prepare("SELECT id, metadata FROM facts WHERE metadata IS NOT NULL")?;
1527 let rows = stmt.query_map([], |row| {
1528 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1529 })?;
1530 for row in rows {
1531 let (id, metadata) = row?;
1532 if let Err(err) = parse_optional_json("facts", &id, "metadata", Some(&metadata)) {
1533 issues.push(err.to_string());
1534 }
1535 }
1536 Ok(())
1537}
1538
1539fn verify_document_rows(conn: &Connection, issues: &mut Vec<String>) -> Result<(), MemoryError> {
1540 let mut stmt = conn.prepare("SELECT id, metadata FROM documents WHERE metadata IS NOT NULL")?;
1541 let rows = stmt.query_map([], |row| {
1542 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1543 })?;
1544 for row in rows {
1545 let (id, metadata) = row?;
1546 if let Err(err) = parse_optional_json("documents", &id, "metadata", Some(&metadata)) {
1547 issues.push(err.to_string());
1548 }
1549 }
1550 Ok(())
1551}
1552
1553fn verify_episode_rows(conn: &Connection, issues: &mut Vec<String>) -> Result<(), MemoryError> {
1554 let mut stmt = conn.prepare(
1555 "SELECT episode_id, cause_ids, outcome, verification_status
1556 FROM episodes",
1557 )?;
1558 let rows = stmt.query_map([], |row| {
1559 Ok((
1560 row.get::<_, String>(0)?,
1561 row.get::<_, String>(1)?,
1562 row.get::<_, String>(2)?,
1563 row.get::<_, String>(3)?,
1564 ))
1565 })?;
1566 for row in rows {
1567 let (episode_id, cause_ids, outcome, verification_status) = row?;
1568 if let Err(err) = parse_string_list_json("episodes", &episode_id, "cause_ids", &cause_ids) {
1569 issues.push(err.to_string());
1570 }
1571 if let Err(err) = parse_episode_outcome(&episode_id, &outcome) {
1572 issues.push(err.to_string());
1573 }
1574 if let Err(err) = parse_verification_status(&episode_id, &verification_status) {
1575 issues.push(err.to_string());
1576 }
1577 }
1578 Ok(())
1579}