1use crate::config::{EmbeddingConfig, MemoryLimits, PoolConfig};
4use crate::error::MemoryError;
5use crate::quantize::unpack_quantized;
6#[cfg(feature = "turbo-quant-codec")]
7use crate::types::{DerivedVectorArtifactGenerationV1, VectorArtifactBuildReceiptV1};
8use crate::types::{EpisodeOutcome, Role, VectorSearchReceiptV1, VerificationStatus};
9use chrono::{DateTime, Utc};
10use rusqlite::{params, Connection, OpenFlags, OptionalExtension};
11use serde::{Deserialize, Serialize};
12use stack_ids::ContentDigest;
13#[cfg(feature = "turbo-quant-codec")]
14use stack_ids::DigestBuilder;
15use std::path::Path;
16
/// Schema V1: the base tables.
///
/// Creates `sessions` and `messages` (conversations), `facts` (knowledge),
/// `documents` and `chunks` (chunked content), and `embedding_metadata`.
/// Facts and chunks each get a `*_rowid_map` table that assigns an integer
/// rowid to a text id, plus an FTS5 table declared with `content=''`
/// (i.e. the FTS table does not store the indexed text itself).
/// `embedding_metadata` is constrained to a single row via `CHECK (id = 1)`.
const MIGRATION_V1: &str = r#"
-- CONVERSATIONS
CREATE TABLE sessions (
    id TEXT PRIMARY KEY,
    channel TEXT NOT NULL DEFAULT 'repl',
    created_at TEXT NOT NULL DEFAULT (datetime('now')),
    updated_at TEXT NOT NULL DEFAULT (datetime('now')),
    metadata TEXT
);

CREATE INDEX idx_sessions_updated ON sessions(updated_at DESC);

CREATE TABLE messages (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
    role TEXT NOT NULL CHECK (role IN ('system', 'user', 'assistant', 'tool')),
    content TEXT NOT NULL,
    token_count INTEGER,
    created_at TEXT NOT NULL DEFAULT (datetime('now')),
    metadata TEXT
);

CREATE INDEX idx_messages_session ON messages(session_id, created_at ASC);
CREATE INDEX idx_messages_created ON messages(created_at DESC);

-- KNOWLEDGE (Facts)
CREATE TABLE facts (
    id TEXT PRIMARY KEY,
    namespace TEXT NOT NULL DEFAULT 'general',
    content TEXT NOT NULL,
    source TEXT,
    embedding BLOB,
    created_at TEXT NOT NULL DEFAULT (datetime('now')),
    updated_at TEXT NOT NULL DEFAULT (datetime('now')),
    metadata TEXT
);

CREATE INDEX idx_facts_namespace ON facts(namespace);
CREATE INDEX idx_facts_updated ON facts(updated_at DESC);

CREATE TABLE facts_rowid_map (
    rowid INTEGER PRIMARY KEY AUTOINCREMENT,
    fact_id TEXT NOT NULL UNIQUE REFERENCES facts(id) ON DELETE CASCADE
);

CREATE VIRTUAL TABLE facts_fts USING fts5(
    content,
    content='',
    content_rowid='rowid',
    tokenize='porter unicode61'
);

-- DOCUMENTS (Chunked content)
CREATE TABLE documents (
    id TEXT PRIMARY KEY,
    title TEXT NOT NULL,
    source_path TEXT,
    namespace TEXT NOT NULL DEFAULT 'general',
    created_at TEXT NOT NULL DEFAULT (datetime('now')),
    metadata TEXT
);

CREATE TABLE chunks (
    id TEXT PRIMARY KEY,
    document_id TEXT NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
    chunk_index INTEGER NOT NULL,
    content TEXT NOT NULL,
    token_count INTEGER,
    embedding BLOB,
    created_at TEXT NOT NULL DEFAULT (datetime('now'))
);

CREATE INDEX idx_chunks_document ON chunks(document_id, chunk_index ASC);

CREATE TABLE chunks_rowid_map (
    rowid INTEGER PRIMARY KEY AUTOINCREMENT,
    chunk_id TEXT NOT NULL UNIQUE REFERENCES chunks(id) ON DELETE CASCADE
);

CREATE VIRTUAL TABLE chunks_fts USING fts5(
    content,
    content='',
    content_rowid='rowid',
    tokenize='porter unicode61'
);

-- EMBEDDING METADATA
CREATE TABLE embedding_metadata (
    id INTEGER PRIMARY KEY CHECK (id = 1),
    model_name TEXT NOT NULL,
    dimensions INTEGER NOT NULL,
    updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
"#;
112
/// Schema V2: gives `messages` the same search plumbing V1 gave facts and chunks.
///
/// Adds an `embedding` BLOB column to `messages`, a `messages_rowid_map`
/// table keyed by the integer message id, and a `messages_fts` FTS5 table
/// declared with `content=''`.
const MIGRATION_V2: &str = r#"
ALTER TABLE messages ADD COLUMN embedding BLOB;

CREATE TABLE messages_rowid_map (
    rowid INTEGER PRIMARY KEY AUTOINCREMENT,
    message_id INTEGER NOT NULL UNIQUE REFERENCES messages(id) ON DELETE CASCADE
);

CREATE VIRTUAL TABLE messages_fts USING fts5(
    content,
    content='',
    content_rowid='rowid',
    tokenize='porter unicode61'
);
"#;
129
/// Schema V3: adds the `embeddings_dirty` flag (integer, default 0) to
/// `embedding_metadata`.
const MIGRATION_V3: &str = r#"
ALTER TABLE embedding_metadata ADD COLUMN embeddings_dirty INTEGER NOT NULL DEFAULT 0;
"#;
134
/// Schema V4: creates `hnsw_metadata`, a text key/value table.
/// Idempotent (`IF NOT EXISTS`).
const MIGRATION_V4: &str = r#"
CREATE TABLE IF NOT EXISTS hnsw_metadata (
    key TEXT PRIMARY KEY,
    value TEXT NOT NULL
);
"#;
142
/// Schema V5: adds an `embedding_q8` BLOB column to `facts`, `chunks` and
/// `messages`, and creates `hnsw_keymap`, which maps an integer `node_id`
/// to a unique `item_key` with a `deleted` flag.
///
/// Note that `idx_hnsw_keymap_key` is created without `IF NOT EXISTS`,
/// unlike the table it indexes, so this script is not re-runnable as a whole.
const MIGRATION_V5: &str = r#"
ALTER TABLE facts ADD COLUMN embedding_q8 BLOB;
ALTER TABLE chunks ADD COLUMN embedding_q8 BLOB;
ALTER TABLE messages ADD COLUMN embedding_q8 BLOB;

CREATE TABLE IF NOT EXISTS hnsw_keymap (
    node_id INTEGER PRIMARY KEY,
    item_key TEXT NOT NULL UNIQUE,
    deleted INTEGER NOT NULL DEFAULT 0
);

CREATE INDEX idx_hnsw_keymap_key ON hnsw_keymap(item_key);
"#;
157
/// Schema V6: creates the `episodes` table, keyed by `document_id`
/// (one episode row per document, cascading on document delete), plus
/// indexes on `effect_type`, `outcome` and `experiment_id`.
///
/// `verification_status` holds a JSON document; its default is
/// `{"status":"unverified"}`.
const MIGRATION_V6: &str = r#"
CREATE TABLE IF NOT EXISTS episodes (
    document_id TEXT PRIMARY KEY REFERENCES documents(id) ON DELETE CASCADE,
    cause_ids TEXT NOT NULL,
    effect_type TEXT NOT NULL,
    outcome TEXT NOT NULL DEFAULT 'pending',
    confidence REAL NOT NULL DEFAULT 0.0,
    verification_status TEXT NOT NULL DEFAULT '{"status":"unverified"}',
    experiment_id TEXT,
    created_at TEXT NOT NULL DEFAULT (datetime('now'))
);

CREATE INDEX IF NOT EXISTS idx_episodes_effect_type ON episodes(effect_type);
CREATE INDEX IF NOT EXISTS idx_episodes_outcome ON episodes(outcome);
CREATE INDEX IF NOT EXISTS idx_episodes_experiment_id ON episodes(experiment_id);
"#;
175
/// Schema V7: makes episodes searchable and introduces the pending index queue.
///
/// Steps, in order:
/// 1. Adds `updated_at`, `search_text`, `embedding` and `embedding_q8` to `episodes`.
/// 2. Creates `episodes_rowid_map` and the `episodes_fts` FTS5 table.
/// 3. Creates `pending_index_ops` (one row per `item_key`, `op_kind` limited
///    to `'upsert'` / `'delete'`).
/// 4. Seeds `hnsw_metadata.sidecar_dirty = '0'` if absent.
/// 5. Backfills `search_text` for rows where it is empty by concatenating
///    `effect_type`, `outcome`, `experiment_id` and `cause_ids`.
/// 6. Populates the rowid map and the FTS table from the existing episodes.
///
/// NOTE(review): SQLite documents that `ALTER TABLE ... ADD COLUMN` may not
/// use a parenthesised expression such as `(datetime('now'))` as the default;
/// this is expected to be rejected when `episodes` already contains rows.
/// Confirm against the SQLite versions this crate supports.
const MIGRATION_V7: &str = r#"
ALTER TABLE episodes ADD COLUMN updated_at TEXT NOT NULL DEFAULT (datetime('now'));
ALTER TABLE episodes ADD COLUMN search_text TEXT NOT NULL DEFAULT '';
ALTER TABLE episodes ADD COLUMN embedding BLOB;
ALTER TABLE episodes ADD COLUMN embedding_q8 BLOB;

CREATE TABLE IF NOT EXISTS episodes_rowid_map (
    rowid INTEGER PRIMARY KEY AUTOINCREMENT,
    document_id TEXT NOT NULL UNIQUE REFERENCES episodes(document_id) ON DELETE CASCADE
);

CREATE VIRTUAL TABLE episodes_fts USING fts5(
    content,
    content='',
    content_rowid='rowid',
    tokenize='porter unicode61'
);

CREATE TABLE IF NOT EXISTS pending_index_ops (
    item_key TEXT PRIMARY KEY,
    entity_type TEXT NOT NULL,
    op_kind TEXT NOT NULL CHECK (op_kind IN ('upsert', 'delete')),
    attempt_count INTEGER NOT NULL DEFAULT 0,
    last_error TEXT,
    updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);

INSERT OR IGNORE INTO hnsw_metadata (key, value) VALUES ('sidecar_dirty', '0');

UPDATE episodes
SET search_text = TRIM(
    COALESCE(effect_type, '') || ' ' ||
    COALESCE(outcome, '') || ' ' ||
    COALESCE(experiment_id, '') || ' ' ||
    COALESCE(cause_ids, '')
)
WHERE search_text = '';

INSERT OR IGNORE INTO episodes_rowid_map (document_id)
SELECT document_id FROM episodes;

INSERT INTO episodes_fts (rowid, content)
SELECT rm.rowid, e.search_text
FROM episodes_rowid_map rm
JOIN episodes e ON e.document_id = rm.document_id;
"#;
223
/// Schema V8: adds the nullable `trace_id` text column to `episodes`.
const MIGRATION_V8: &str = r#"
ALTER TABLE episodes ADD COLUMN trace_id TEXT;
"#;
228
/// Schema V9 has no SQL script: `run_migrations` dispatches version 9 to
/// `run_migration_v9`, so this string is only a placeholder in `MIGRATIONS`.
const MIGRATION_V9: &str = "";
237
/// Schema V18: creates `search_receipts`, one row per `receipt_id`.
///
/// The boolean-like columns `approximate` and `exact_rerank` are constrained
/// to 0/1 and the three candidate counters to non-negative values. The full
/// receipt is kept as JSON in `receipt_json` alongside `receipt_digest`.
/// Indexed by `created_at DESC` and by `candidate_backend`.
const MIGRATION_V18: &str = r#"
CREATE TABLE IF NOT EXISTS search_receipts (
    receipt_id TEXT PRIMARY KEY,
    schema_version TEXT NOT NULL,
    evaluation_time TEXT NOT NULL,
    search_profile TEXT NOT NULL,
    candidate_backend TEXT NOT NULL,
    approximate INTEGER NOT NULL CHECK (approximate IN (0, 1)),
    exact_rerank INTEGER NOT NULL CHECK (exact_rerank IN (0, 1)),
    fallback TEXT,
    requested_candidates INTEGER NOT NULL CHECK (requested_candidates >= 0),
    returned_candidates INTEGER NOT NULL CHECK (returned_candidates >= 0),
    post_filter_candidates INTEGER NOT NULL CHECK (post_filter_candidates >= 0),
    result_ids_json TEXT NOT NULL,
    receipt_json TEXT NOT NULL,
    receipt_digest TEXT NOT NULL,
    created_at TEXT NOT NULL DEFAULT (datetime('now'))
);

CREATE INDEX IF NOT EXISTS idx_search_receipts_created
ON search_receipts(created_at DESC);

CREATE INDEX IF NOT EXISTS idx_search_receipts_backend
ON search_receipts(candidate_backend);
"#;
264
/// Schema V19: creates `derived_vector_artifacts`.
///
/// The primary key is `(item_key, codec_family, codec_profile_digest)`, so
/// each item holds at most one artifact per codec profile. Indexed by
/// `(codec_family, codec_profile_digest, status)` and by
/// `source_embedding_digest`.
const MIGRATION_V19: &str = r#"
CREATE TABLE IF NOT EXISTS derived_vector_artifacts (
    item_key TEXT NOT NULL,
    codec_family TEXT NOT NULL,
    codec_profile_digest TEXT NOT NULL,
    source_embedding_digest TEXT NOT NULL,
    encoded_digest TEXT NOT NULL,
    artifact_digest TEXT NOT NULL,
    encoding TEXT NOT NULL,
    dim INTEGER NOT NULL,
    encoded BLOB NOT NULL,
    created_at TEXT NOT NULL DEFAULT (datetime('now')),
    status TEXT NOT NULL DEFAULT 'active',
    PRIMARY KEY (item_key, codec_family, codec_profile_digest)
);

CREATE INDEX IF NOT EXISTS idx_derived_vector_artifacts_profile
ON derived_vector_artifacts(codec_family, codec_profile_digest, status);

CREATE INDEX IF NOT EXISTS idx_derived_vector_artifacts_source_digest
ON derived_vector_artifacts(source_embedding_digest);
"#;
288
/// Schema V20 placeholder: the string contains only an SQL comment.
/// `run_migrations` dispatches version 20 to `run_migration_v20` instead of
/// executing this text.
const MIGRATION_V20: &str = r#"
-- Procedural migration; see run_migration_v20.
"#;
293
/// Schema V21: creates `derived_vector_artifact_generations`, one row per
/// `generation_id`, with `status` restricted to
/// `'active' | 'superseded' | 'invalidated' | 'failed'`.
///
/// This script is executed by `run_migration_v21`, which also adds the
/// `generation_id` column to `derived_vector_artifacts`.
const MIGRATION_V21: &str = r#"
CREATE TABLE IF NOT EXISTS derived_vector_artifact_generations (
    generation_id TEXT PRIMARY KEY,
    schema_version TEXT NOT NULL,
    codec_family TEXT NOT NULL,
    codec_profile_digest TEXT NOT NULL,
    source_snapshot_digest TEXT NOT NULL,
    source_row_count INTEGER NOT NULL,
    artifact_count INTEGER NOT NULL,
    source_tables_json TEXT NOT NULL,
    dim INTEGER NOT NULL,
    encoding TEXT NOT NULL,
    created_at TEXT NOT NULL,
    build_receipt_id TEXT,
    artifact_manifest_digest TEXT NOT NULL,
    status TEXT NOT NULL CHECK (status IN ('active', 'superseded', 'invalidated', 'failed')),
    degradations_json TEXT NOT NULL DEFAULT '[]'
);

CREATE INDEX IF NOT EXISTS idx_derived_vector_generations_profile
ON derived_vector_artifact_generations(codec_family, codec_profile_digest, status, created_at DESC);
"#;
317
/// Schema V23: adds four nullable columns to `derived_vector_artifacts`:
/// `codec_governance_receipt_id`, `codec_profile`, `degradation_budget`
/// and `raw_source_artifact_id`.
///
/// Declared before `MIGRATION_V22` in this file; execution order is decided
/// by the `MIGRATIONS` table, not by declaration order.
const MIGRATION_V23: &str = r#"
ALTER TABLE derived_vector_artifacts ADD COLUMN codec_governance_receipt_id TEXT;
ALTER TABLE derived_vector_artifacts ADD COLUMN codec_profile TEXT;
ALTER TABLE derived_vector_artifacts ADD COLUMN degradation_budget REAL;
ALTER TABLE derived_vector_artifacts ADD COLUMN raw_source_artifact_id TEXT;
"#;
326
/// Schema V22: adds `valid_time`, `recorded_time`, `superseded_by` and
/// `fact_digest` to `episodes`, indexes the first three (the
/// `superseded_by` index is partial: only non-NULL values), and backfills
/// `recorded_time` from `updated_at` where it is NULL or empty.
///
/// NOTE(review): `recorded_time` is added as `NOT NULL DEFAULT
/// (datetime('now'))`. SQLite documents that `ADD COLUMN` may not use a
/// parenthesised expression as the default; this is expected to be rejected
/// when `episodes` already contains rows. Confirm against the SQLite
/// versions this crate supports.
const MIGRATION_V22: &str = r#"
ALTER TABLE episodes ADD COLUMN valid_time TEXT;
ALTER TABLE episodes ADD COLUMN recorded_time TEXT NOT NULL DEFAULT (datetime('now'));
ALTER TABLE episodes ADD COLUMN superseded_by TEXT;
ALTER TABLE episodes ADD COLUMN fact_digest TEXT;
CREATE INDEX IF NOT EXISTS idx_episodes_recorded ON episodes(recorded_time ASC);
CREATE INDEX IF NOT EXISTS idx_episodes_valid ON episodes(valid_time);
CREATE INDEX IF NOT EXISTS idx_episodes_superseded ON episodes(superseded_by) WHERE superseded_by IS NOT NULL;
UPDATE episodes SET recorded_time = updated_at WHERE recorded_time IS NULL OR recorded_time = '';
"#;
339
/// Ordered `(version, sql)` table consumed by `run_migrations`.
///
/// Entries are applied in the order listed here. For versions 9, 16, 17, 20
/// and 21 `run_migrations` calls a dedicated `run_migration_vN` function and
/// ignores the SQL text in this table. Versions 10-17 live in the
/// `projection_import` / `projection_storage` modules.
// NOTE(review): `allow(deprecated)` presumably silences a deprecation on one
// of the referenced migration constants — confirm which one.
#[allow(deprecated)]
const MIGRATIONS: &[(u32, &str)] = &[
    (1, MIGRATION_V1),
    (2, MIGRATION_V2),
    (3, MIGRATION_V3),
    (4, MIGRATION_V4),
    (5, MIGRATION_V5),
    (6, MIGRATION_V6),
    (7, MIGRATION_V7),
    (8, MIGRATION_V8),
    (9, MIGRATION_V9),
    (10, crate::projection_import::MIGRATION_V10),
    (11, crate::projection_storage::MIGRATION_V11),
    (12, crate::projection_storage::MIGRATION_V12),
    (13, crate::projection_storage::MIGRATION_V13),
    (14, crate::projection_storage::MIGRATION_V14),
    (15, crate::projection_storage::MIGRATION_V15),
    (16, crate::projection_storage::MIGRATION_V16),
    (17, crate::projection_storage::MIGRATION_V17),
    (18, MIGRATION_V18),
    (19, MIGRATION_V19),
    (20, MIGRATION_V20),
    (21, MIGRATION_V21),
    (22, MIGRATION_V22),
    (23, MIGRATION_V23),
];
367
/// Highest schema version this build understands. `run_migrations` refuses
/// to touch a database whose `PRAGMA user_version` is greater than this.
/// Must be kept equal to the last version listed in `MIGRATIONS`.
pub const MAX_SCHEMA_VERSION: u32 = 23;
370
371fn run_migration_v9(conn: &Connection) -> Result<(), MemoryError> {
373 let episodes_exist: bool = conn
375 .query_row(
376 "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='episodes'",
377 [],
378 |row| row.get(0),
379 )
380 .map_err(|e| MemoryError::MigrationFailed {
381 version: 9,
382 reason: format!("existence check failed: {e}"),
383 })?;
384
385 if !episodes_exist {
386 conn.execute_batch(
388 "CREATE TABLE IF NOT EXISTS episode_causes (
389 episode_id TEXT NOT NULL,
390 cause_node_id TEXT NOT NULL,
391 ordinal INTEGER NOT NULL DEFAULT 0,
392 PRIMARY KEY (episode_id, cause_node_id)
393 );
394 CREATE INDEX IF NOT EXISTS idx_episode_causes_cause ON episode_causes(cause_node_id);",
395 )?;
396 return Ok(());
397 }
398
399 conn.execute_batch("PRAGMA foreign_keys = OFF;")?;
401
402 conn.execute_batch(
403 "CREATE TABLE episodes_new (
404 episode_id TEXT PRIMARY KEY,
405 document_id TEXT NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
406 cause_ids TEXT NOT NULL,
407 effect_type TEXT NOT NULL,
408 outcome TEXT NOT NULL DEFAULT 'pending',
409 confidence REAL NOT NULL DEFAULT 0.0,
410 verification_status TEXT NOT NULL DEFAULT '{\"status\":\"unverified\"}',
411 experiment_id TEXT,
412 created_at TEXT NOT NULL DEFAULT (datetime('now')),
413 updated_at TEXT NOT NULL DEFAULT (datetime('now')),
414 search_text TEXT NOT NULL DEFAULT '',
415 embedding BLOB,
416 embedding_q8 BLOB,
417 trace_id TEXT
418 )",
419 )?;
420
421 conn.execute_batch(
423 "INSERT INTO episodes_new
424 (episode_id, document_id, cause_ids, effect_type, outcome, confidence,
425 verification_status, experiment_id, created_at, updated_at,
426 search_text, embedding, embedding_q8, trace_id)
427 SELECT
428 document_id || '-ep0',
429 document_id, cause_ids, effect_type, outcome, confidence,
430 verification_status, experiment_id, created_at, updated_at,
431 search_text, embedding, embedding_q8, trace_id
432 FROM episodes",
433 )?;
434
435 conn.execute_batch("DROP TABLE episodes")?;
436 conn.execute_batch("ALTER TABLE episodes_new RENAME TO episodes")?;
437
438 conn.execute_batch(
439 "CREATE INDEX idx_episodes_document_id ON episodes(document_id);
440 CREATE INDEX idx_episodes_effect_type ON episodes(effect_type);
441 CREATE INDEX idx_episodes_outcome ON episodes(outcome);
442 CREATE INDEX idx_episodes_experiment_id ON episodes(experiment_id);",
443 )?;
444
445 conn.execute_batch(
447 "DROP TABLE IF EXISTS episodes_rowid_map;
448 CREATE TABLE episodes_rowid_map (
449 rowid INTEGER PRIMARY KEY AUTOINCREMENT,
450 episode_id TEXT NOT NULL UNIQUE,
451 document_id TEXT
452 );
453 INSERT INTO episodes_rowid_map (episode_id, document_id)
454 SELECT episode_id, document_id FROM episodes;",
455 )?;
456
457 conn.execute_batch(
459 "DROP TABLE IF EXISTS episodes_fts;
460 CREATE VIRTUAL TABLE episodes_fts USING fts5(
461 content,
462 content='',
463 content_rowid='rowid',
464 tokenize='porter unicode61'
465 );
466 INSERT INTO episodes_fts (rowid, content)
467 SELECT rm.rowid, e.search_text
468 FROM episodes_rowid_map rm
469 JOIN episodes e ON e.episode_id = rm.episode_id;",
470 )?;
471
472 conn.execute_batch(
474 "CREATE TABLE IF NOT EXISTS episode_causes (
475 episode_id TEXT NOT NULL,
476 cause_node_id TEXT NOT NULL,
477 ordinal INTEGER NOT NULL DEFAULT 0,
478 PRIMARY KEY (episode_id, cause_node_id)
479 );
480 CREATE INDEX IF NOT EXISTS idx_episode_causes_cause ON episode_causes(cause_node_id);",
481 )?;
482
483 conn.execute_batch(
485 "INSERT OR IGNORE INTO episode_causes (episode_id, cause_node_id, ordinal)
486 SELECT e.episode_id, je.value, CAST(je.key AS INTEGER)
487 FROM episodes e, json_each(e.cause_ids) je;",
488 )?;
489
490 conn.execute_batch("PRAGMA foreign_keys = ON;")?;
491
492 Ok(())
493}
494
/// Selects how thorough an integrity verification should be.
// NOTE(review): the consumer of this enum is outside this view; `Quick` is
// presumably the cheaper check and `Full` the exhaustive one — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VerifyMode {
    Quick,
    Full,
}
503
/// Result of an integrity verification: row counts, missing-embedding
/// counts, and a list of human-readable issues.
// NOTE(review): the producer is outside this view; `ok` is presumably true
// exactly when `issues` is empty — confirm.
#[derive(Debug, Clone)]
pub struct IntegrityReport {
    pub ok: bool,
    pub schema_version: u32,
    pub fact_count: usize,
    pub chunk_count: usize,
    pub message_count: usize,
    pub facts_missing_embeddings: usize,
    pub chunks_missing_embeddings: usize,
    pub issues: Vec<String>,
}
516
/// Action to take when reconciling detected inconsistencies.
// NOTE(review): semantics are inferred from the variant names only (report,
// rebuild the FTS tables, recompute embeddings) — confirm against the caller.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReconcileAction {
    ReportOnly,
    RebuildFts,
    ReEmbed,
}
524
/// Kind of a queued index operation. The textual forms (`"upsert"`,
/// `"delete"`) match the values allowed by the `op_kind` CHECK constraint
/// on the `pending_index_ops` table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum IndexOpKind {
    Upsert,
    Delete,
}
531
532impl IndexOpKind {
533 pub(crate) fn as_str(self) -> &'static str {
534 match self {
535 Self::Upsert => "upsert",
536 Self::Delete => "delete",
537 }
538 }
539
540 fn parse(raw: &str, item_key: &str) -> Result<Self, MemoryError> {
541 match raw {
542 "upsert" => Ok(Self::Upsert),
543 "delete" => Ok(Self::Delete),
544 other => Err(MemoryError::CorruptData {
545 table: "pending_index_ops",
546 row_id: item_key.to_string(),
547 detail: format!("invalid op_kind '{other}'"),
548 }),
549 }
550 }
551}
552
/// In-memory form of a row in the `pending_index_ops` table (the fields
/// mirror that table's columns except `updated_at`).
#[derive(Debug, Clone)]
pub(crate) struct PendingIndexOp {
    pub item_key: String,
    pub entity_type: String,
    pub op_kind: IndexOpKind,
    pub attempt_count: u32,
    pub last_error: Option<String>,
}
562
563pub fn with_transaction<F, T>(conn: &Connection, f: F) -> Result<T, MemoryError>
565where
566 F: FnOnce(&rusqlite::Transaction<'_>) -> Result<T, MemoryError>,
567{
568 let tx = conn.unchecked_transaction()?;
569 let result = f(&tx)?;
570 tx.commit()?;
571 Ok(result)
572}
573
574pub fn open_database(
576 path: &Path,
577 pool: &PoolConfig,
578 limits: &MemoryLimits,
579) -> Result<Connection, MemoryError> {
580 open_database_internal(path, pool, limits.max_db_size_bytes, true)
581}
582
583pub fn open_database_connection(
585 path: &Path,
586 pool: &PoolConfig,
587 limits: &MemoryLimits,
588) -> Result<Connection, MemoryError> {
589 open_database_internal(path, pool, limits.max_db_size_bytes, false)
590}
591
592pub(crate) fn open_database_internal(
593 path: &Path,
594 pool: &PoolConfig,
595 max_db_size_bytes: u64,
596 run_schema_migrations: bool,
597) -> Result<Connection, MemoryError> {
598 create_parent_dirs(path)?;
599 let conn = Connection::open(path)?;
600 configure_connection(&conn, path, pool, max_db_size_bytes, false)?;
601 if run_schema_migrations {
602 run_migrations(&conn)?;
603 }
604 Ok(conn)
605}
606
607pub(crate) fn open_pool_member_connection(
608 path: &Path,
609 pool: &PoolConfig,
610 limits: &MemoryLimits,
611 query_only: bool,
612) -> Result<Connection, MemoryError> {
613 create_parent_dirs(path)?;
614 let flags = OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE;
615 let conn = Connection::open_with_flags(path, flags)?;
616 configure_connection(&conn, path, pool, limits.max_db_size_bytes, query_only)?;
617 Ok(conn)
618}
619
620fn create_parent_dirs(path: &Path) -> Result<(), MemoryError> {
621 if let Some(parent) = path.parent() {
622 if !parent.as_os_str().is_empty() {
623 std::fs::create_dir_all(parent).map_err(|e| {
624 MemoryError::StorageError(format!(
625 "failed to create database directory {}: {}",
626 parent.display(),
627 e
628 ))
629 })?;
630 }
631 }
632 Ok(())
633}
634
635fn configure_connection(
636 conn: &Connection,
637 path: &Path,
638 pool: &PoolConfig,
639 max_db_size_bytes: u64,
640 query_only: bool,
641) -> Result<(), MemoryError> {
642 let journal_mode = if pool.enable_wal { "WAL" } else { "DELETE" };
643 conn.execute_batch(&format!(
644 "PRAGMA journal_mode = {};
645 PRAGMA foreign_keys = ON;
646 PRAGMA busy_timeout = {};
647 PRAGMA synchronous = NORMAL;
648 PRAGMA temp_store = MEMORY;
649 PRAGMA wal_autocheckpoint = {};",
650 journal_mode, pool.busy_timeout_ms, pool.wal_autocheckpoint,
651 ))?;
652
653 if query_only {
654 conn.execute_batch("PRAGMA query_only = ON;")?;
655 }
656
657 let actual_journal_mode: String =
658 conn.query_row("PRAGMA journal_mode", [], |row| row.get(0))?;
659 let expected_journal_mode = if pool.enable_wal { "wal" } else { "delete" };
660 if actual_journal_mode.to_lowercase() != expected_journal_mode {
661 return Err(MemoryError::StorageError(format!(
662 "SQLite journal mode mismatch for {}: requested {}, got {}",
663 path.display(),
664 expected_journal_mode,
665 actual_journal_mode
666 )));
667 }
668
669 if max_db_size_bytes > 0 {
670 let page_size: u64 = conn.query_row("PRAGMA page_size", [], |row| row.get(0))?;
671 let max_page_count = max_db_size_bytes.div_ceil(page_size);
672
673 const MAX_SQLITE_PAGE_COUNT: u64 = 1_073_741_823; const MIN_SQLITE_PAGE_COUNT: u64 = 1;
676 if !(MIN_SQLITE_PAGE_COUNT..=MAX_SQLITE_PAGE_COUNT).contains(&max_page_count) {
677 return Err(MemoryError::StorageError(format!(
678 "Invalid max_page_count {}: must be between {} and {}",
679 max_page_count, MIN_SQLITE_PAGE_COUNT, MAX_SQLITE_PAGE_COUNT
680 )));
681 }
682
683 let actual_max_page_count: u64 = conn.query_row(
684 &format!("PRAGMA max_page_count = {}", max_page_count),
685 [],
686 |row| row.get(0),
687 )?;
688 let page_count: u64 = conn.query_row("PRAGMA page_count", [], |row| row.get(0))?;
689
690 if page_count > actual_max_page_count {
691 return Err(MemoryError::DatabaseSizeLimitExceeded {
692 current: page_count.saturating_mul(page_size),
693 limit: max_db_size_bytes,
694 });
695 }
696 }
697
698 let foreign_keys_enabled: bool = conn.query_row("PRAGMA foreign_keys", [], |row| row.get(0))?;
700 if !foreign_keys_enabled {
701 return Err(MemoryError::StorageError(
702 "PRAGMA foreign_keys failed to enable after configuration".to_string(),
703 ));
704 }
705
706 Ok(())
707}
708
709pub fn run_migrations(conn: &Connection) -> Result<(), MemoryError> {
711 let user_version: u32 = conn
712 .query_row("PRAGMA user_version", [], |row| row.get(0))
713 .map_err(|e| MemoryError::MigrationFailed {
714 version: 0,
715 reason: format!("failed to read PRAGMA user_version: {e}"),
716 })?;
717
718 if user_version > MAX_SCHEMA_VERSION {
719 return Err(MemoryError::SchemaAhead {
720 found: user_version,
721 supported: MAX_SCHEMA_VERSION,
722 });
723 }
724
725 conn.execute_batch(
726 "CREATE TABLE IF NOT EXISTS _schema_version (
727 version INTEGER PRIMARY KEY,
728 applied_at TEXT NOT NULL DEFAULT (datetime('now'))
729 );",
730 )?;
731
732 for &(version, sql) in MIGRATIONS {
733 let current_version: u32 = conn
734 .query_row(
735 "SELECT COALESCE(MAX(version), 0) FROM _schema_version",
736 [],
737 |row| row.get(0),
738 )
739 .unwrap_or(0);
740
741 if current_version >= version {
742 continue;
743 }
744
745 with_transaction(conn, |tx| {
746 match version {
747 9 => run_migration_v9(tx).map_err(|e| MemoryError::MigrationFailed {
748 version,
749 reason: e.to_string(),
750 })?,
751 16 => run_migration_v16(tx).map_err(|e| MemoryError::MigrationFailed {
752 version,
753 reason: e.to_string(),
754 })?,
755 17 => run_migration_v17(tx).map_err(|e| MemoryError::MigrationFailed {
756 version,
757 reason: e.to_string(),
758 })?,
759 20 => run_migration_v20(tx).map_err(|e| MemoryError::MigrationFailed {
760 version,
761 reason: e.to_string(),
762 })?,
763 21 => run_migration_v21(tx).map_err(|e| MemoryError::MigrationFailed {
764 version,
765 reason: e.to_string(),
766 })?,
767 _ => tx
768 .execute_batch(sql)
769 .map_err(|e| MemoryError::MigrationFailed {
770 version,
771 reason: e.to_string(),
772 })?,
773 }
774 tx.execute(
775 "INSERT INTO _schema_version (version) VALUES (?1)",
776 params![version],
777 )
778 .map_err(|e| MemoryError::MigrationFailed {
779 version,
780 reason: e.to_string(),
781 })?;
782 Ok(())
783 })?;
784
785 tracing::info!("Applied migration V{}", version);
786 }
787
788 let final_version: u32 = conn
789 .query_row(
790 "SELECT COALESCE(MAX(version), 0) FROM _schema_version",
791 [],
792 |row| row.get(0),
793 )
794 .unwrap_or(0);
795 conn.execute_batch(&format!("PRAGMA user_version = {};", final_version))?;
796
797 Ok(())
798}
799
800fn run_migration_v16(conn: &Connection) -> Result<(), rusqlite::Error> {
801 add_column_if_missing(conn, "projection_import_log", "kernel_payload_json", "TEXT")?;
802 add_column_if_missing(
803 conn,
804 "projection_import_failures",
805 "kernel_payload_json",
806 "TEXT",
807 )?;
808 Ok(())
809}
810
811fn run_migration_v17(conn: &Connection) -> Result<(), rusqlite::Error> {
812 add_column_if_missing(conn, "projection_import_log", "episode_bundle_id", "TEXT")?;
813 add_column_if_missing(conn, "projection_import_log", "episode_bundle_json", "TEXT")?;
814 add_column_if_missing(
815 conn,
816 "projection_import_log",
817 "execution_context_json",
818 "TEXT",
819 )?;
820 add_column_if_missing(
821 conn,
822 "projection_import_failures",
823 "episode_bundle_id",
824 "TEXT",
825 )?;
826 add_column_if_missing(
827 conn,
828 "projection_import_failures",
829 "episode_bundle_json",
830 "TEXT",
831 )?;
832 add_column_if_missing(
833 conn,
834 "projection_import_failures",
835 "execution_context_json",
836 "TEXT",
837 )?;
838 Ok(())
839}
840
841fn run_migration_v20(conn: &Connection) -> Result<(), rusqlite::Error> {
842 add_column_if_missing(conn, "derived_vector_artifacts", "encoded_digest", "TEXT")?;
843 conn.execute(
844 "UPDATE derived_vector_artifacts
845 SET encoded_digest = artifact_digest
846 WHERE encoded_digest IS NULL OR encoded_digest = ''",
847 [],
848 )?;
849 add_column_if_missing(
850 conn,
851 "derived_vector_artifacts",
852 "encoding",
853 "TEXT NOT NULL DEFAULT 'turbo_code_wire_v1'",
854 )?;
855 add_column_if_missing(
856 conn,
857 "derived_vector_artifacts",
858 "dim",
859 "INTEGER NOT NULL DEFAULT 0",
860 )?;
861 add_column_if_missing(
862 conn,
863 "derived_vector_artifacts",
864 "status",
865 "TEXT NOT NULL DEFAULT 'active'",
866 )?;
867 conn.execute_batch(
868 "CREATE INDEX IF NOT EXISTS idx_derived_vector_artifacts_profile
869 ON derived_vector_artifacts(codec_family, codec_profile_digest, status);
870 CREATE INDEX IF NOT EXISTS idx_derived_vector_artifacts_source_digest
871 ON derived_vector_artifacts(source_embedding_digest);",
872 )?;
873 Ok(())
874}
875
876fn run_migration_v21(conn: &Connection) -> Result<(), rusqlite::Error> {
877 conn.execute_batch(MIGRATION_V21)?;
878 add_column_if_missing(conn, "derived_vector_artifacts", "generation_id", "TEXT")?;
879 conn.execute_batch(
880 "CREATE INDEX IF NOT EXISTS idx_derived_vector_artifacts_generation
881 ON derived_vector_artifacts(generation_id, status);",
882 )?;
883 Ok(())
884}
885
/// Schema tag for stored search receipts; also the value
/// `StoredVectorSearchReceiptV1::schema_version` falls back to when the
/// field is absent from the serialized form.
const SEARCH_RECEIPT_SCHEMA_VERSION: &str = "vector_search_receipt_v1";
887
/// Serialized (serde) form of a vector search receipt.
///
/// Fields annotated with `#[serde(default)]` may be absent from the input
/// and then take their `Default` value (`None` for the optional fields).
/// `schema_version` falls back to `SEARCH_RECEIPT_SCHEMA_VERSION`.
// NOTE(review): presumably this is the payload stored in
// `search_receipts.receipt_json` — confirm against the read/write code.
#[derive(Debug, Serialize, Deserialize)]
struct StoredVectorSearchReceiptV1 {
    #[serde(default = "default_search_receipt_schema_version")]
    schema_version: String,
    receipt_id: String,
    evaluation_time: DateTime<Utc>,
    #[serde(default)]
    receipt_digest: Option<String>,
    // Correlation / replay identifiers.
    #[serde(default)]
    trace_id: Option<String>,
    #[serde(default)]
    attempt_family_id: Option<String>,
    #[serde(default)]
    attempt_id: Option<String>,
    #[serde(default)]
    replay_of: Option<String>,
    // Digests describing the query inputs.
    query_embedding_digest: Option<String>,
    #[serde(default)]
    query_text_digest: Option<String>,
    #[serde(default)]
    query_input_digest: Option<String>,
    #[serde(default)]
    filter_digest: Option<String>,
    #[serde(default)]
    redaction_state: Option<String>,
    #[serde(default)]
    budget_id: Option<String>,
    #[serde(default)]
    deadline_at: Option<DateTime<Utc>>,
    // Search configuration.
    search_profile: String,
    candidate_backend: String,
    codec_family: Option<String>,
    codec_profile_digest: Option<String>,
    // Artifact statistics.
    #[serde(default)]
    artifact_profile_digest: Option<String>,
    #[serde(default)]
    artifact_count: Option<u64>,
    #[serde(default)]
    artifact_corruption_count: Option<u64>,
    #[serde(default)]
    artifact_missing_count: Option<u64>,
    #[serde(default)]
    vector_artifact_manifest_digest: Option<String>,
    #[serde(default)]
    artifact_generation_id: Option<String>,
    #[serde(default)]
    approximate_scanned_count: Option<u64>,
    #[serde(default)]
    approximate_returned_count: Option<u64>,
    #[serde(default)]
    raw_rows_loaded_count: Option<u64>,
    #[serde(default)]
    filter_strategy: Option<String>,
    #[serde(default)]
    vector_artifact_count: Option<u64>,
    #[serde(default)]
    vector_artifact_missing_count: Option<u64>,
    #[serde(default)]
    vector_artifact_stale_count: Option<u64>,
    #[serde(default)]
    exact_rerank_count: Option<u64>,
    #[serde(default)]
    approximate_candidate_count: Option<u64>,
    #[serde(default)]
    fallback_reason: Option<String>,
    // Outcome summary.
    approximate: bool,
    requested_candidates: u64,
    returned_candidates: u64,
    post_filter_candidates: u64,
    fallback: Option<String>,
    exact_rerank: bool,
    result_ids: Vec<String>,
    degradations: Vec<String>,
}
962
963fn default_search_receipt_schema_version() -> String {
964 SEARCH_RECEIPT_SCHEMA_VERSION.to_string()
965}
966
967fn b3_digest(bytes: &[u8]) -> String {
968 format!("blake3:{}", ContentDigest::compute(bytes).hex())
969}
970
/// In-memory form of a row in `derived_vector_artifacts`.
///
/// Only compiled with the `turbo-quant-codec` feature. The optional fields
/// correspond to the nullable columns added by later migrations
/// (`generation_id` in V21, the four codec governance columns in V23).
#[cfg(feature = "turbo-quant-codec")]
#[derive(Debug, Clone)]
pub(crate) struct DerivedVectorArtifactRow {
    pub item_key: String,
    pub generation_id: Option<String>,
    pub codec_family: String,
    pub codec_profile_digest: String,
    pub source_embedding_digest: String,
    pub encoded_digest: String,
    pub encoding: String,
    pub dim: usize,
    pub status: String,
    pub encoded: Vec<u8>,
    pub codec_governance_receipt_id: Option<String>,
    pub codec_profile: Option<String>,
    pub degradation_budget: Option<f64>,
    pub raw_source_artifact_id: Option<String>,
}
991
/// In-memory subset of a row in `derived_vector_artifact_generations`.
///
/// Only compiled with the `turbo-quant-codec` feature. `dead_code` is
/// allowed on the type, so not every field is necessarily read.
#[cfg(feature = "turbo-quant-codec")]
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub(crate) struct DerivedVectorArtifactGenerationRow {
    pub generation_id: String,
    pub codec_family: String,
    pub codec_profile_digest: String,
    pub source_snapshot_digest: String,
    pub source_row_count: usize,
    pub artifact_count: usize,
    pub dim: usize,
    pub encoding: String,
    pub artifact_manifest_digest: String,
    pub status: String,
}
1008
/// Computes the digest of one raw embedding blob, formatted as `blake3:<hex>`.
///
/// The blob length is validated against `expected_dim` first. The digest
/// covers a fixed domain tag, the dimension as a little-endian `u64`, and
/// the blob bytes, each separated by the builder's separator.
///
/// # Errors
/// Propagates the error from `validate_vector_blob_len` when the blob does
/// not match `expected_dim`.
#[cfg(feature = "turbo-quant-codec")]
pub(crate) fn source_embedding_digest(
    blob: &[u8],
    expected_dim: usize,
) -> Result<String, MemoryError> {
    validate_vector_blob_len(blob, expected_dim)?;

    let dim_le = (expected_dim as u64).to_le_bytes();
    let mut builder = DigestBuilder::new();
    builder
        .update_str("semantic-memory.source_embedding.v1")
        .separator()
        .update(&dim_le)
        .separator()
        .update(blob);

    let digest = builder.finalize();
    Ok(format!("blake3:{}", digest.hex()))
}
1025
/// Computes an order-independent digest over the `(item_key,
/// source_embedding_digest)` pairs of `rows`, formatted as `blake3:<hex>`.
///
/// Pairs are sorted before hashing, so the row order of the input does not
/// affect the result. The digest covers a fixed domain tag, `dim` as a
/// little-endian `u64`, and then every pair.
#[cfg(feature = "turbo-quant-codec")]
fn source_snapshot_digest(rows: &[DerivedVectorArtifactRow], dim: usize) -> String {
    let mut pairs: Vec<(&str, &str)> = Vec::with_capacity(rows.len());
    for row in rows {
        pairs.push((row.item_key.as_str(), row.source_embedding_digest.as_str()));
    }
    pairs.sort_unstable();

    let dim_le = (dim as u64).to_le_bytes();
    let mut builder = DigestBuilder::new();
    builder
        .update_str("semantic-memory.vector_source_snapshot.v1")
        .separator()
        .update(&dim_le)
        .separator();
    for &(key, embedding_digest) in &pairs {
        builder
            .update_str(key)
            .separator()
            .update_str(embedding_digest)
            .separator();
    }

    let digest = builder.finalize();
    format!("blake3:{}", digest.hex())
}
1049
/// Computes the snapshot digest of every embedding currently stored in
/// `facts`, `chunks`, `messages` and `episodes`.
///
/// Each row with a non-NULL embedding contributes an item key
/// (`fact:`, `chunk:`, `msg:` or `episode:` followed by its id) and the
/// `source_embedding_digest` of its blob. Pairs are sorted before hashing.
///
/// Returns the digest (formatted as `blake3:<hex>`) together with the
/// number of embeddings that were hashed.
///
/// # Errors
/// Propagates SQLite errors and any blob that fails validation against `dim`.
#[cfg(feature = "turbo-quant-codec")]
pub(crate) fn current_source_snapshot_digest(
    conn: &Connection,
    dim: usize,
) -> Result<(String, usize), MemoryError> {
    let mut stmt = conn.prepare(
        "SELECT 'fact:' || id AS item_key, embedding FROM facts WHERE embedding IS NOT NULL
         UNION ALL
         SELECT 'chunk:' || id AS item_key, embedding FROM chunks WHERE embedding IS NOT NULL
         UNION ALL
         SELECT 'msg:' || id AS item_key, embedding FROM messages WHERE embedding IS NOT NULL
         UNION ALL
         SELECT 'episode:' || episode_id AS item_key, embedding FROM episodes WHERE embedding IS NOT NULL",
    )?;
    let rows = stmt.query_map([], |row| {
        let item_key: String = row.get(0)?;
        let blob: Vec<u8> = row.get(1)?;
        Ok((item_key, blob))
    })?;

    // Stops at the first row that fails to load or validate.
    let mut entries = rows
        .map(|row| -> Result<(String, String), MemoryError> {
            let (item_key, blob) = row?;
            let embedding_digest = source_embedding_digest(&blob, dim)?;
            Ok((item_key, embedding_digest))
        })
        .collect::<Result<Vec<_>, MemoryError>>()?;
    entries.sort_unstable();

    let dim_le = (dim as u64).to_le_bytes();
    let mut builder = DigestBuilder::new();
    builder
        .update_str("semantic-memory.vector_source_snapshot.v1")
        .separator()
        .update(&dim_le)
        .separator();
    for (item_key, embedding_digest) in &entries {
        builder
            .update_str(item_key)
            .separator()
            .update_str(embedding_digest)
            .separator();
    }

    let snapshot = format!("blake3:{}", builder.finalize().hex());
    Ok((snapshot, entries.len()))
}
1093
/// Computes the `blake3:`-prefixed manifest digest over a set of derived
/// artifacts.
///
/// Each row contributes the triple `(item_key, source_embedding_digest,
/// encoded_digest)`. The triples are sorted before hashing, so the result does
/// not depend on the order of `rows`.
#[cfg(feature = "turbo-quant-codec")]
fn derived_artifact_manifest_digest(rows: &[DerivedVectorArtifactRow]) -> String {
    let mut triples: Vec<(&str, &str, &str)> = Vec::with_capacity(rows.len());
    for row in rows {
        triples.push((
            row.item_key.as_str(),
            row.source_embedding_digest.as_str(),
            row.encoded_digest.as_str(),
        ));
    }
    triples.sort_unstable();

    let mut hasher = DigestBuilder::new();
    hasher
        .update_str("semantic-memory.vector_artifact_manifest.v1")
        .separator();
    for &(key, source_digest, encoded) in &triples {
        hasher
            .update_str(key)
            .separator()
            .update_str(source_digest)
            .separator()
            .update_str(encoded)
            .separator();
    }
    format!("blake3:{}", hasher.finalize().hex())
}
1123
/// Inserts `row` into `derived_vector_artifacts`, replacing any existing row
/// that conflicts (`INSERT OR REPLACE`).
///
/// The `artifact_digest` column is filled with the same value as
/// `encoded_digest` (both bind `?6`), and `created_at` is set by SQLite to the
/// current time.
///
/// # Errors
///
/// Fails if `row.dim` does not fit in an `i64` or if the statement fails.
#[cfg(feature = "turbo-quant-codec")]
pub(crate) fn upsert_derived_vector_artifact(
    conn: &Connection,
    row: &DerivedVectorArtifactRow,
) -> Result<(), MemoryError> {
    const UPSERT_SQL: &str = "INSERT OR REPLACE INTO derived_vector_artifacts
         (item_key, generation_id, codec_family, codec_profile_digest, source_embedding_digest,
          encoded_digest, artifact_digest, encoding, dim, encoded, created_at, status,
          codec_governance_receipt_id, codec_profile, degradation_budget, raw_source_artifact_id)
         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?6, ?7, ?8, ?9, datetime('now'), ?10, ?11, ?12, ?13, ?14)";

    // SQLite integers are signed 64-bit, so the dimension is converted explicitly.
    let dim = i64::try_from(row.dim)
        .map_err(|err| MemoryError::Other(format!("artifact dim overflow: {err}")))?;

    conn.execute(
        UPSERT_SQL,
        params![
            row.item_key,
            row.generation_id.as_deref(),
            row.codec_family,
            row.codec_profile_digest,
            row.source_embedding_digest,
            row.encoded_digest,
            row.encoding,
            dim,
            row.encoded,
            row.status,
            row.codec_governance_receipt_id.as_deref(),
            row.codec_profile.as_deref(),
            row.degradation_budget,
            row.raw_source_artifact_id.as_deref(),
        ],
    )?;
    Ok(())
}
1155
1156pub fn delete_derived_vector_artifact(
1157 conn: &Connection,
1158 item_key: &str,
1159) -> Result<(), MemoryError> {
1160 conn.execute(
1161 "DELETE FROM derived_vector_artifacts WHERE item_key = ?1",
1162 params![item_key],
1163 )?;
1164 Ok(())
1165}
1166
1167pub fn invalidate_derived_vector_artifact(
1168 conn: &Connection,
1169 item_key: &str,
1170) -> Result<(), MemoryError> {
1171 conn.execute(
1172 "UPDATE derived_vector_artifacts
1173 SET status = 'invalidated'
1174 WHERE item_key = ?1 AND status = 'active'",
1175 params![item_key],
1176 )?;
1177 conn.execute(
1178 "UPDATE derived_vector_artifact_generations
1179 SET status = 'invalidated'
1180 WHERE status = 'active'",
1181 [],
1182 )?;
1183 Ok(())
1184}
1185
/// Loads all `active` derived artifacts stored for the given codec family and
/// codec profile digest.
///
/// # Errors
///
/// Fails if the query fails, if a column cannot be read, or if a stored `dim`
/// value does not fit in `usize`.
#[cfg(feature = "turbo-quant-codec")]
#[allow(dead_code)]
pub(crate) fn load_derived_vector_artifacts_by_profile(
    conn: &Connection,
    codec_family: &str,
    codec_profile_digest: &str,
) -> Result<Vec<DerivedVectorArtifactRow>, MemoryError> {
    // Position of `dim` in the SELECT list below; also reported on conversion failure.
    const DIM_COLUMN: usize = 7;

    let mut stmt = conn.prepare(
        "SELECT item_key, generation_id, codec_family, codec_profile_digest, source_embedding_digest,
                encoded_digest, encoding, dim, status, encoded,
                codec_governance_receipt_id, codec_profile, degradation_budget, raw_source_artifact_id
         FROM derived_vector_artifacts
         WHERE codec_family = ?1 AND codec_profile_digest = ?2 AND status = 'active'",
    )?;

    let artifacts = stmt
        .query_map(params![codec_family, codec_profile_digest], |row| {
            let raw_dim = row.get::<_, i64>(DIM_COLUMN)?;
            Ok(DerivedVectorArtifactRow {
                item_key: row.get(0)?,
                generation_id: row.get(1)?,
                codec_family: row.get(2)?,
                codec_profile_digest: row.get(3)?,
                source_embedding_digest: row.get(4)?,
                encoded_digest: row.get(5)?,
                encoding: row.get(6)?,
                dim: usize::try_from(raw_dim).map_err(|err| {
                    rusqlite::Error::FromSqlConversionFailure(
                        DIM_COLUMN,
                        rusqlite::types::Type::Integer,
                        Box::new(err),
                    )
                })?,
                status: row.get(8)?,
                encoded: row.get(9)?,
                codec_governance_receipt_id: row.get(10)?,
                codec_profile: row.get(11)?,
                degradation_budget: row.get(12)?,
                raw_source_artifact_id: row.get(13)?,
            })
        })?
        .collect::<Result<Vec<_>, rusqlite::Error>>()?;
    Ok(artifacts)
}
1232
/// Loads all `active` derived artifacts that belong to the generation
/// identified by `generation_id`.
///
/// # Errors
///
/// Fails if the query fails, if a column cannot be read, or if a stored `dim`
/// value does not fit in `usize`.
#[cfg(feature = "turbo-quant-codec")]
pub(crate) fn load_derived_vector_artifacts_by_generation(
    conn: &Connection,
    generation_id: &str,
) -> Result<Vec<DerivedVectorArtifactRow>, MemoryError> {
    // Position of `dim` in the SELECT list below; also reported on conversion failure.
    const DIM_COLUMN: usize = 7;

    let mut stmt = conn.prepare(
        "SELECT item_key, generation_id, codec_family, codec_profile_digest, source_embedding_digest,
                encoded_digest, encoding, dim, status, encoded,
                codec_governance_receipt_id, codec_profile, degradation_budget, raw_source_artifact_id
         FROM derived_vector_artifacts
         WHERE generation_id = ?1 AND status = 'active'",
    )?;

    let artifacts = stmt
        .query_map(params![generation_id], |row| {
            let raw_dim = row.get::<_, i64>(DIM_COLUMN)?;
            Ok(DerivedVectorArtifactRow {
                item_key: row.get(0)?,
                generation_id: row.get(1)?,
                codec_family: row.get(2)?,
                codec_profile_digest: row.get(3)?,
                source_embedding_digest: row.get(4)?,
                encoded_digest: row.get(5)?,
                encoding: row.get(6)?,
                dim: usize::try_from(raw_dim).map_err(|err| {
                    rusqlite::Error::FromSqlConversionFailure(
                        DIM_COLUMN,
                        rusqlite::types::Type::Integer,
                        Box::new(err),
                    )
                })?,
                status: row.get(8)?,
                encoded: row.get(9)?,
                codec_governance_receipt_id: row.get(10)?,
                codec_profile: row.get(11)?,
                degradation_budget: row.get(12)?,
                raw_source_artifact_id: row.get(13)?,
            })
        })?
        .collect::<Result<Vec<_>, rusqlite::Error>>()?;
    Ok(artifacts)
}
1277
/// Returns the most recently created `active` generation for the given codec
/// family and profile digest, or `None` when no such generation exists.
///
/// # Errors
///
/// Fails if the query fails, if a column cannot be read, or if one of the
/// stored counters (`source_row_count`, `artifact_count`, `dim`) does not fit
/// in `usize`.
#[cfg(feature = "turbo-quant-codec")]
pub(crate) fn current_derived_vector_generation(
    conn: &Connection,
    codec_family: &str,
    codec_profile_digest: &str,
) -> Result<Option<DerivedVectorArtifactGenerationRow>, MemoryError> {
    let generation = conn
        .query_row(
            "SELECT generation_id, codec_family, codec_profile_digest, source_snapshot_digest,
                    source_row_count, artifact_count, dim, encoding, artifact_manifest_digest, status
             FROM derived_vector_artifact_generations
             WHERE codec_family = ?1 AND codec_profile_digest = ?2 AND status = 'active'
             ORDER BY created_at DESC
             LIMIT 1",
            params![codec_family, codec_profile_digest],
            |row| {
                // Converts a stored integer to usize, tagging failures with the column index.
                let as_usize = |column: usize, raw: i64| {
                    usize::try_from(raw).map_err(|err| {
                        rusqlite::Error::FromSqlConversionFailure(
                            column,
                            rusqlite::types::Type::Integer,
                            Box::new(err),
                        )
                    })
                };

                let raw_source_rows: i64 = row.get(4)?;
                let raw_artifacts: i64 = row.get(5)?;
                let raw_dim: i64 = row.get(6)?;
                Ok(DerivedVectorArtifactGenerationRow {
                    generation_id: row.get(0)?,
                    codec_family: row.get(1)?,
                    codec_profile_digest: row.get(2)?,
                    source_snapshot_digest: row.get(3)?,
                    source_row_count: as_usize(4, raw_source_rows)?,
                    artifact_count: as_usize(5, raw_artifacts)?,
                    dim: as_usize(6, raw_dim)?,
                    encoding: row.get(7)?,
                    artifact_manifest_digest: row.get(8)?,
                    status: row.get(9)?,
                })
            },
        )
        .optional()?;
    Ok(generation)
}
1331
1332pub fn count_derived_vector_artifacts(
1333 conn: &Connection,
1334 codec_family: &str,
1335 codec_profile_digest: &str,
1336) -> Result<usize, MemoryError> {
1337 let count: i64 = conn.query_row(
1338 "SELECT COUNT(*) FROM derived_vector_artifacts
1339 WHERE codec_family = ?1 AND codec_profile_digest = ?2 AND status = 'active'",
1340 params![codec_family, codec_profile_digest],
1341 |row| row.get(0),
1342 )?;
1343 usize::try_from(count)
1344 .map_err(|err| MemoryError::Other(format!("derived artifact count overflow: {err}")))
1345}
1346
/// Rebuilds all `turbo_quant` derived vector artifacts from the embeddings
/// stored in `facts`, `chunks`, `messages` and `episodes`.
///
/// Steps:
/// 1. Build a `TurboQuantCodec` from `dim`, `bits`, `projections` and `seed`.
/// 2. Read every non-NULL embedding blob, decode it and encode it with the
///    codec. Rows that fail to decode or encode are skipped and described in
///    the returned `degradations`.
/// 3. Compute the source snapshot digest and the artifact manifest digest over
///    the rows that were encoded successfully.
/// 4. Via `with_transaction`: mark the currently active generations for this
///    codec profile as `superseded`, delete all existing artifacts for the
///    profile, insert the new generation row and upsert every encoded artifact.
///
/// The new generation row is stored with status `active` only when no row was
/// skipped; otherwise it is stored as `failed`.
///
/// # Errors
///
/// Fails if the codec cannot be constructed, if a query or statement fails, if
/// `source_embedding_digest` fails, or if a counter does not fit in `i64`.
#[cfg(feature = "turbo-quant-codec")]
pub(crate) fn rebuild_turbo_quant_artifacts(
    conn: &Connection,
    dim: usize,
    bits: u8,
    projections: usize,
    seed: u64,
) -> Result<VectorArtifactBuildReceiptV1, MemoryError> {
    use crate::vector_codec::{TurboQuantCodec, VectorCodec};

    let started = std::time::Instant::now();
    let codec = TurboQuantCodec::new(dim, bits, projections, seed)?;
    let codec_profile_digest = codec.profile().digest();
    let generation_id = uuid::Uuid::new_v4().to_string();
    // `source_row_count` counts every row read; `artifact_count` counts rows written
    // inside the transaction; `skipped_row_count` counts decode/encode failures.
    let mut source_row_count = 0usize;
    let mut artifact_count = 0usize;
    let mut skipped_row_count = 0usize;
    let mut degradations = Vec::new();

    let mut stmt = conn.prepare(
        "SELECT 'fact:' || id AS item_key, embedding FROM facts WHERE embedding IS NOT NULL
         UNION ALL
         SELECT 'chunk:' || id AS item_key, embedding FROM chunks WHERE embedding IS NOT NULL
         UNION ALL
         SELECT 'msg:' || id AS item_key, embedding FROM messages WHERE embedding IS NOT NULL
         UNION ALL
         SELECT 'episode:' || episode_id AS item_key, embedding FROM episodes WHERE embedding IS NOT NULL",
    )?;
    let rows = stmt.query_map([], |row| {
        Ok((row.get::<_, String>(0)?, row.get::<_, Vec<u8>>(1)?))
    })?;

    // Encode everything up front; nothing is written until the transaction below.
    let mut pending = Vec::new();
    for row in rows {
        let (item_key, blob) = row?;
        source_row_count += 1;
        let embedding = match decode_f32_le(&blob, dim) {
            Ok(embedding) => embedding,
            Err(err) => {
                // A malformed stored embedding is recorded and skipped, not fatal.
                skipped_row_count += 1;
                degradations.push(format!(
                    "skipped {item_key}: invalid authoritative embedding: {err}"
                ));
                continue;
            }
        };
        let artifact = match codec.encode(&embedding) {
            Ok(artifact) => artifact,
            Err(err) => {
                skipped_row_count += 1;
                degradations.push(format!("skipped {item_key}: encode failed: {err}"));
                continue;
            }
        };
        // NOTE(review): artifact rows are always written with status "active", even
        // when the generation itself ends up "failed" — confirm this is intended.
        pending.push(DerivedVectorArtifactRow {
            item_key,
            generation_id: Some(generation_id.clone()),
            codec_family: "turbo_quant".to_string(),
            codec_profile_digest: codec_profile_digest.clone(),
            source_embedding_digest: source_embedding_digest(&blob, dim)?,
            encoded_digest: artifact.artifact_digest,
            encoding: "turbo_code_wire_v1".to_string(),
            dim,
            status: "active".to_string(),
            encoded: artifact.encoded,
            codec_governance_receipt_id: None,
            codec_profile: None,
            degradation_budget: None,
            raw_source_artifact_id: None,
        });
    }
    // Release the prepared statement before the write transaction starts.
    drop(stmt);

    let build_receipt_id = uuid::Uuid::new_v4().to_string();
    // Both digests cover only the successfully encoded rows in `pending`.
    let source_snapshot_digest = source_snapshot_digest(&pending, dim);
    let artifact_manifest_digest = derived_artifact_manifest_digest(&pending);
    let source_tables = vec![
        "facts".to_string(),
        "chunks".to_string(),
        "messages".to_string(),
        "episodes".to_string(),
    ];
    let generation_manifest = DerivedVectorArtifactGenerationV1 {
        schema_version: "derived_vector_artifact_generation_v1".to_string(),
        generation_id: generation_id.clone(),
        codec_family: "turbo_quant".to_string(),
        codec_profile_digest: codec_profile_digest.clone(),
        source_snapshot_digest: source_snapshot_digest.clone(),
        source_row_count,
        artifact_count: pending.len(),
        source_tables,
        dim,
        encoding: "turbo_code_wire_v1".to_string(),
        created_at: Utc::now(),
        build_receipt_id: Some(build_receipt_id.clone()),
        artifact_manifest_digest: artifact_manifest_digest.clone(),
        // Any skipped row marks the whole generation as failed.
        status: if skipped_row_count == 0 {
            "active".to_string()
        } else {
            "failed".to_string()
        },
        degradations: degradations.clone(),
    };

    with_transaction(conn, |tx| {
        // Retire the previous active generation(s) for this codec profile.
        tx.execute(
            "UPDATE derived_vector_artifact_generations
             SET status = 'superseded'
             WHERE codec_family = ?1 AND codec_profile_digest = ?2 AND status = 'active'",
            params!["turbo_quant", &codec_profile_digest],
        )?;
        // Remove every existing artifact for this profile, regardless of status.
        tx.execute(
            "DELETE FROM derived_vector_artifacts
             WHERE codec_family = ?1 AND codec_profile_digest = ?2",
            params!["turbo_quant", &codec_profile_digest],
        )?;
        tx.execute(
            "INSERT INTO derived_vector_artifact_generations
             (generation_id, schema_version, codec_family, codec_profile_digest,
              source_snapshot_digest, source_row_count, artifact_count, source_tables_json,
              dim, encoding, created_at, build_receipt_id, artifact_manifest_digest,
              status, degradations_json)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)",
            params![
                generation_manifest.generation_id,
                generation_manifest.schema_version,
                generation_manifest.codec_family,
                generation_manifest.codec_profile_digest,
                generation_manifest.source_snapshot_digest,
                i64::try_from(generation_manifest.source_row_count).map_err(|err| {
                    MemoryError::Other(format!("source row count overflow: {err}"))
                })?,
                i64::try_from(generation_manifest.artifact_count).map_err(|err| {
                    MemoryError::Other(format!("artifact count overflow: {err}"))
                })?,
                serde_json::to_string(&generation_manifest.source_tables)
                    .map_err(|err| MemoryError::Other(err.to_string()))?,
                i64::try_from(generation_manifest.dim)
                    .map_err(|err| MemoryError::Other(format!("artifact dim overflow: {err}")))?,
                generation_manifest.encoding,
                generation_manifest.created_at.to_rfc3339(),
                generation_manifest.build_receipt_id,
                generation_manifest.artifact_manifest_digest,
                generation_manifest.status,
                serde_json::to_string(&generation_manifest.degradations)
                    .map_err(|err| MemoryError::Other(err.to_string()))?,
            ],
        )?;
        for row in &pending {
            upsert_derived_vector_artifact(tx, row)?;
            // Counted per successful write, so the receipt reflects rows actually stored.
            artifact_count += 1;
        }
        Ok(())
    })?;

    Ok(VectorArtifactBuildReceiptV1 {
        schema_version: "vector_artifact_build_receipt_v1".to_string(),
        codec_family: "turbo_quant".to_string(),
        codec_profile_digest,
        source_row_count,
        artifact_count,
        generation_id: Some(generation_id),
        source_snapshot_digest: Some(source_snapshot_digest),
        artifact_manifest_digest: Some(artifact_manifest_digest),
        build_receipt_id: Some(build_receipt_id),
        skipped_row_count,
        elapsed_ms: started.elapsed().as_millis(),
        created_at: Utc::now(),
        degradations,
    })
}
1520
1521fn receipt_count_to_u64(value: usize, field: &'static str) -> Result<u64, MemoryError> {
1522 u64::try_from(value).map_err(|err| MemoryError::Other(format!("{field} is too large: {err}")))
1523}
1524
1525fn receipt_count_to_i64(value: u64, field: &'static str) -> Result<i64, MemoryError> {
1526 i64::try_from(value).map_err(|err| MemoryError::Other(format!("{field} is too large: {err}")))
1527}
1528
1529fn receipt_count_to_usize(
1530 value: u64,
1531 receipt_id: &str,
1532 field: &'static str,
1533) -> Result<usize, MemoryError> {
1534 usize::try_from(value).map_err(|err| MemoryError::CorruptData {
1535 table: "search_receipts",
1536 row_id: receipt_id.to_string(),
1537 detail: format!("{field} does not fit this platform: {err}"),
1538 })
1539}
1540
1541fn stored_search_receipt(
1542 receipt: &VectorSearchReceiptV1,
1543) -> Result<StoredVectorSearchReceiptV1, MemoryError> {
1544 Ok(StoredVectorSearchReceiptV1 {
1545 schema_version: SEARCH_RECEIPT_SCHEMA_VERSION.to_string(),
1546 receipt_id: receipt.receipt_id.clone(),
1547 evaluation_time: receipt.evaluation_time,
1548 receipt_digest: receipt.receipt_digest.clone(),
1549 trace_id: receipt.trace_id.clone(),
1550 attempt_family_id: receipt.attempt_family_id.clone(),
1551 attempt_id: receipt.attempt_id.clone(),
1552 replay_of: receipt.replay_of.clone(),
1553 query_embedding_digest: receipt.query_embedding_digest.clone(),
1554 query_text_digest: receipt.query_text_digest.clone(),
1555 query_input_digest: receipt.query_input_digest.clone(),
1556 filter_digest: receipt.filter_digest.clone(),
1557 redaction_state: receipt.redaction_state.clone(),
1558 budget_id: receipt.budget_id.clone(),
1559 deadline_at: receipt.deadline_at,
1560 search_profile: receipt.search_profile.clone(),
1561 candidate_backend: receipt.candidate_backend.clone(),
1562 codec_family: receipt.codec_family.clone(),
1563 codec_profile_digest: receipt.codec_profile_digest.clone(),
1564 artifact_profile_digest: receipt.artifact_profile_digest.clone(),
1565 artifact_count: receipt
1566 .artifact_count
1567 .map(|value| receipt_count_to_u64(value, "artifact_count"))
1568 .transpose()?,
1569 artifact_corruption_count: receipt
1570 .artifact_corruption_count
1571 .map(|value| receipt_count_to_u64(value, "artifact_corruption_count"))
1572 .transpose()?,
1573 artifact_missing_count: receipt
1574 .artifact_missing_count
1575 .map(|value| receipt_count_to_u64(value, "artifact_missing_count"))
1576 .transpose()?,
1577 vector_artifact_manifest_digest: receipt.vector_artifact_manifest_digest.clone(),
1578 artifact_generation_id: receipt.artifact_generation_id.clone(),
1579 approximate_scanned_count: receipt
1580 .approximate_scanned_count
1581 .map(|value| receipt_count_to_u64(value, "approximate_scanned_count"))
1582 .transpose()?,
1583 approximate_returned_count: receipt
1584 .approximate_returned_count
1585 .map(|value| receipt_count_to_u64(value, "approximate_returned_count"))
1586 .transpose()?,
1587 raw_rows_loaded_count: receipt
1588 .raw_rows_loaded_count
1589 .map(|value| receipt_count_to_u64(value, "raw_rows_loaded_count"))
1590 .transpose()?,
1591 filter_strategy: receipt.filter_strategy.clone(),
1592 vector_artifact_count: receipt
1593 .vector_artifact_count
1594 .map(|value| receipt_count_to_u64(value, "vector_artifact_count"))
1595 .transpose()?,
1596 vector_artifact_missing_count: receipt
1597 .vector_artifact_missing_count
1598 .map(|value| receipt_count_to_u64(value, "vector_artifact_missing_count"))
1599 .transpose()?,
1600 vector_artifact_stale_count: receipt
1601 .vector_artifact_stale_count
1602 .map(|value| receipt_count_to_u64(value, "vector_artifact_stale_count"))
1603 .transpose()?,
1604 exact_rerank_count: receipt
1605 .exact_rerank_count
1606 .map(|value| receipt_count_to_u64(value, "exact_rerank_count"))
1607 .transpose()?,
1608 approximate_candidate_count: receipt
1609 .approximate_candidate_count
1610 .map(|value| receipt_count_to_u64(value, "approximate_candidate_count"))
1611 .transpose()?,
1612 fallback_reason: receipt.fallback_reason.clone(),
1613 approximate: receipt.approximate,
1614 requested_candidates: receipt_count_to_u64(
1615 receipt.requested_candidates,
1616 "requested_candidates",
1617 )?,
1618 returned_candidates: receipt_count_to_u64(
1619 receipt.returned_candidates,
1620 "returned_candidates",
1621 )?,
1622 post_filter_candidates: receipt_count_to_u64(
1623 receipt.post_filter_candidates,
1624 "post_filter_candidates",
1625 )?,
1626 fallback: receipt.fallback.clone(),
1627 exact_rerank: receipt.exact_rerank,
1628 result_ids: receipt.result_ids.clone(),
1629 degradations: receipt.degradations.clone(),
1630 })
1631}
1632
1633fn search_receipt_from_stored(
1634 stored: StoredVectorSearchReceiptV1,
1635) -> Result<VectorSearchReceiptV1, MemoryError> {
1636 if stored.schema_version != SEARCH_RECEIPT_SCHEMA_VERSION {
1637 return Err(MemoryError::CorruptData {
1638 table: "search_receipts",
1639 row_id: stored.receipt_id,
1640 detail: format!(
1641 "unsupported receipt schema version '{}'",
1642 stored.schema_version
1643 ),
1644 });
1645 }
1646
1647 Ok(VectorSearchReceiptV1 {
1648 schema_version: stored.schema_version.clone(),
1649 receipt_digest: stored.receipt_digest,
1650 receipt_id: stored.receipt_id.clone(),
1651 evaluation_time: stored.evaluation_time,
1652 trace_id: stored.trace_id,
1653 attempt_family_id: stored.attempt_family_id,
1654 attempt_id: stored.attempt_id,
1655 replay_of: stored.replay_of,
1656 query_embedding_digest: stored.query_embedding_digest,
1657 query_text_digest: stored.query_text_digest,
1658 query_input_digest: stored.query_input_digest,
1659 filter_digest: stored.filter_digest,
1660 redaction_state: stored.redaction_state,
1661 budget_id: stored.budget_id,
1662 deadline_at: stored.deadline_at,
1663 search_profile: stored.search_profile,
1664 candidate_backend: stored.candidate_backend,
1665 codec_family: stored.codec_family,
1666 codec_profile_digest: stored.codec_profile_digest,
1667 artifact_profile_digest: stored.artifact_profile_digest,
1668 artifact_count: stored
1669 .artifact_count
1670 .map(|value| receipt_count_to_usize(value, &stored.receipt_id, "artifact_count"))
1671 .transpose()?,
1672 artifact_corruption_count: stored
1673 .artifact_corruption_count
1674 .map(|value| {
1675 receipt_count_to_usize(value, &stored.receipt_id, "artifact_corruption_count")
1676 })
1677 .transpose()?,
1678 artifact_missing_count: stored
1679 .artifact_missing_count
1680 .map(|value| {
1681 receipt_count_to_usize(value, &stored.receipt_id, "artifact_missing_count")
1682 })
1683 .transpose()?,
1684 vector_artifact_manifest_digest: stored.vector_artifact_manifest_digest,
1685 artifact_generation_id: stored.artifact_generation_id,
1686 approximate_scanned_count: stored
1687 .approximate_scanned_count
1688 .map(|value| {
1689 receipt_count_to_usize(value, &stored.receipt_id, "approximate_scanned_count")
1690 })
1691 .transpose()?,
1692 approximate_returned_count: stored
1693 .approximate_returned_count
1694 .map(|value| {
1695 receipt_count_to_usize(value, &stored.receipt_id, "approximate_returned_count")
1696 })
1697 .transpose()?,
1698 raw_rows_loaded_count: stored
1699 .raw_rows_loaded_count
1700 .map(|value| receipt_count_to_usize(value, &stored.receipt_id, "raw_rows_loaded_count"))
1701 .transpose()?,
1702 filter_strategy: stored.filter_strategy,
1703 vector_artifact_count: stored
1704 .vector_artifact_count
1705 .map(|value| receipt_count_to_usize(value, &stored.receipt_id, "vector_artifact_count"))
1706 .transpose()?,
1707 vector_artifact_missing_count: stored
1708 .vector_artifact_missing_count
1709 .map(|value| {
1710 receipt_count_to_usize(value, &stored.receipt_id, "vector_artifact_missing_count")
1711 })
1712 .transpose()?,
1713 vector_artifact_stale_count: stored
1714 .vector_artifact_stale_count
1715 .map(|value| {
1716 receipt_count_to_usize(value, &stored.receipt_id, "vector_artifact_stale_count")
1717 })
1718 .transpose()?,
1719 exact_rerank_count: stored
1720 .exact_rerank_count
1721 .map(|value| receipt_count_to_usize(value, &stored.receipt_id, "exact_rerank_count"))
1722 .transpose()?,
1723 approximate_candidate_count: stored
1724 .approximate_candidate_count
1725 .map(|value| {
1726 receipt_count_to_usize(value, &stored.receipt_id, "approximate_candidate_count")
1727 })
1728 .transpose()?,
1729 fallback_reason: stored.fallback_reason,
1730 approximate: stored.approximate,
1731 requested_candidates: receipt_count_to_usize(
1732 stored.requested_candidates,
1733 &stored.receipt_id,
1734 "requested_candidates",
1735 )?,
1736 returned_candidates: receipt_count_to_usize(
1737 stored.returned_candidates,
1738 &stored.receipt_id,
1739 "returned_candidates",
1740 )?,
1741 post_filter_candidates: receipt_count_to_usize(
1742 stored.post_filter_candidates,
1743 &stored.receipt_id,
1744 "post_filter_candidates",
1745 )?,
1746 fallback: stored.fallback,
1747 exact_rerank: stored.exact_rerank,
1748 result_ids: stored.result_ids,
1749 degradations: stored.degradations,
1750 })
1751}
1752
1753pub fn store_search_receipt(
1758 conn: &Connection,
1759 receipt: &VectorSearchReceiptV1,
1760) -> Result<(), MemoryError> {
1761 let stored = stored_search_receipt(receipt)?;
1762 let receipt_json = serde_json::to_string(&stored)
1763 .map_err(|err| MemoryError::Other(format!("failed to serialize search receipt: {err}")))?;
1764 let receipt_digest = b3_digest(receipt_json.as_bytes());
1765
1766 let existing_digest: Option<String> = conn
1767 .query_row(
1768 "SELECT receipt_digest FROM search_receipts WHERE receipt_id = ?1",
1769 params![&stored.receipt_id],
1770 |row| row.get(0),
1771 )
1772 .optional()?;
1773 if let Some(existing_digest) = existing_digest {
1774 if existing_digest == receipt_digest {
1775 return Ok(());
1776 }
1777 return Err(MemoryError::SearchReceiptConflict {
1778 receipt_id: stored.receipt_id,
1779 });
1780 }
1781
1782 let result_ids_json = serde_json::to_string(&stored.result_ids).map_err(|err| {
1783 MemoryError::Other(format!(
1784 "failed to serialize search receipt result IDs: {err}"
1785 ))
1786 })?;
1787 conn.execute(
1788 "INSERT INTO search_receipts (
1789 receipt_id,
1790 schema_version,
1791 evaluation_time,
1792 search_profile,
1793 candidate_backend,
1794 approximate,
1795 exact_rerank,
1796 fallback,
1797 requested_candidates,
1798 returned_candidates,
1799 post_filter_candidates,
1800 result_ids_json,
1801 receipt_json,
1802 receipt_digest
1803 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)",
1804 params![
1805 &stored.receipt_id,
1806 SEARCH_RECEIPT_SCHEMA_VERSION,
1807 stored.evaluation_time.to_rfc3339(),
1808 &stored.search_profile,
1809 &stored.candidate_backend,
1810 if stored.approximate { 1_i64 } else { 0_i64 },
1811 if stored.exact_rerank { 1_i64 } else { 0_i64 },
1812 &stored.fallback,
1813 receipt_count_to_i64(stored.requested_candidates, "requested_candidates")?,
1814 receipt_count_to_i64(stored.returned_candidates, "returned_candidates")?,
1815 receipt_count_to_i64(stored.post_filter_candidates, "post_filter_candidates")?,
1816 &result_ids_json,
1817 &receipt_json,
1818 &receipt_digest,
1819 ],
1820 )?;
1821 Ok(())
1822}
1823
1824pub fn get_search_receipt(
1826 conn: &Connection,
1827 receipt_id: &str,
1828) -> Result<Option<VectorSearchReceiptV1>, MemoryError> {
1829 let row: Option<(String, String, String)> = conn
1830 .query_row(
1831 "SELECT schema_version, receipt_json, receipt_digest
1832 FROM search_receipts
1833 WHERE receipt_id = ?1",
1834 params![receipt_id],
1835 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1836 )
1837 .optional()?;
1838
1839 let Some((schema_version, receipt_json, receipt_digest)) = row else {
1840 return Ok(None);
1841 };
1842 if schema_version != SEARCH_RECEIPT_SCHEMA_VERSION {
1843 return Err(MemoryError::CorruptData {
1844 table: "search_receipts",
1845 row_id: receipt_id.to_string(),
1846 detail: format!("unsupported receipt schema version '{schema_version}'"),
1847 });
1848 }
1849
1850 let stored: StoredVectorSearchReceiptV1 =
1851 serde_json::from_str(&receipt_json).map_err(|err| MemoryError::CorruptData {
1852 table: "search_receipts",
1853 row_id: receipt_id.to_string(),
1854 detail: format!("invalid receipt JSON: {err}"),
1855 })?;
1856 let mut receipt = search_receipt_from_stored(stored)?;
1857 receipt.receipt_digest = Some(receipt_digest);
1858 Ok(Some(receipt))
1859}
1860
1861fn add_column_if_missing(
1862 conn: &Connection,
1863 table: &str,
1864 column: &str,
1865 column_sql: &str,
1866) -> Result<(), rusqlite::Error> {
1867 let pragma = format!("PRAGMA table_info({table})");
1868 let mut stmt = conn.prepare(&pragma)?;
1869 let exists = stmt
1870 .query_map([], |row| row.get::<_, String>(1))?
1871 .collect::<Result<Vec<_>, _>>()?
1872 .into_iter()
1873 .any(|name| name == column);
1874
1875 if !exists {
1876 conn.execute(
1877 &format!("ALTER TABLE {table} ADD COLUMN {column} {column_sql}"),
1878 [],
1879 )?;
1880 }
1881
1882 Ok(())
1883}
1884
1885pub fn check_embedding_metadata(
1887 conn: &Connection,
1888 config: &EmbeddingConfig,
1889) -> Result<(), MemoryError> {
1890 let existing: Option<(String, usize)> = conn
1892 .query_row(
1893 "SELECT model_name, dimensions FROM embedding_metadata WHERE id = 1",
1894 [],
1895 |row| Ok((row.get(0)?, row.get(1)?)),
1896 )
1897 .ok();
1898
1899 match existing {
1900 Some((model, dims)) => {
1901 if model != config.model || dims != config.dimensions {
1902 tracing::warn!(
1903 stored_model = %model,
1904 stored_dims = dims,
1905 configured_model = %config.model,
1906 configured_dims = config.dimensions,
1907 "Embedding model changed. Existing embeddings are stale."
1908 );
1909 conn.execute(
1910 "UPDATE embedding_metadata
1911 SET model_name = ?1,
1912 dimensions = ?2,
1913 embeddings_dirty = 1,
1914 updated_at = datetime('now')
1915 WHERE id = 1",
1916 params![config.model, config.dimensions],
1917 )?;
1918 }
1919 }
1920 None => {
1921 conn.execute(
1922 "INSERT INTO embedding_metadata (id, model_name, dimensions) VALUES (1, ?1, ?2)",
1923 params![config.model, config.dimensions],
1924 )?;
1925 }
1926 }
1927
1928 Ok(())
1929}
1930
/// Serializes an embedding into a byte vector by delegating to
/// [`encode_f32_le`].
pub fn embedding_to_bytes(embedding: &[f32]) -> Vec<u8> {
    encode_f32_le(embedding)
}
1935
/// Encodes `values` as a contiguous byte vector, four little-endian bytes per
/// `f32`, in input order.
pub fn encode_f32_le(values: &[f32]) -> Vec<u8> {
    let mut encoded = Vec::with_capacity(values.len() * 4);
    encoded.extend(values.iter().flat_map(|value| value.to_le_bytes()));
    encoded
}
1944
1945pub(crate) fn validate_embedding(values: &[f32], expected_dim: usize) -> Result<(), MemoryError> {
1947 if values.len() != expected_dim {
1948 return Err(MemoryError::EmbeddingDimensionMismatch {
1949 expected: expected_dim,
1950 actual: values.len(),
1951 });
1952 }
1953 if let Some((index, _)) = values
1954 .iter()
1955 .enumerate()
1956 .find(|(_, value)| !value.is_finite())
1957 {
1958 return Err(MemoryError::NonFiniteEmbeddingValue { index });
1959 }
1960 Ok(())
1961}
1962
1963pub(crate) fn validate_embedding_batch(
1965 values: &[Vec<f32>],
1966 requested: usize,
1967 expected_dim: usize,
1968) -> Result<(), MemoryError> {
1969 if values.len() != requested {
1970 return Err(MemoryError::EmbeddingBatchCountMismatch {
1971 requested,
1972 returned: values.len(),
1973 });
1974 }
1975 for embedding in values {
1976 validate_embedding(embedding, expected_dim)?;
1977 }
1978 Ok(())
1979}
1980
1981pub(crate) fn validate_vector_blob_len(
1983 bytes: &[u8],
1984 expected_dim: usize,
1985) -> Result<(), MemoryError> {
1986 let expected_bytes = expected_dim
1987 .checked_mul(4)
1988 .ok_or_else(|| MemoryError::InvalidConfig {
1989 field: "embedding.dimensions",
1990 reason: "dimension byte length overflow".to_string(),
1991 })?;
1992 if bytes.len() != expected_bytes {
1993 return Err(MemoryError::VectorBlobLengthMismatch {
1994 expected_bytes,
1995 actual_bytes: bytes.len(),
1996 });
1997 }
1998 Ok(())
1999}
2000
2001#[allow(clippy::manual_is_multiple_of)]
2003pub fn decode_f32_le(bytes: &[u8], expected_dim: usize) -> Result<Vec<f32>, MemoryError> {
2004 validate_vector_blob_len(bytes, expected_dim)?;
2005 decode_f32_le_unchecked_dim(bytes)
2006}
2007
2008#[allow(clippy::manual_is_multiple_of)]
2010pub fn bytes_to_embedding(bytes: &[u8]) -> Result<Vec<f32>, MemoryError> {
2011 if bytes.len() % 4 != 0 {
2012 return Err(MemoryError::InvalidEmbedding {
2013 expected_bytes: bytes.len() - (bytes.len() % 4),
2014 actual_bytes: bytes.len(),
2015 });
2016 }
2017
2018 decode_f32_le_unchecked_dim(bytes)
2019}
2020
2021fn decode_f32_le_unchecked_dim(bytes: &[u8]) -> Result<Vec<f32>, MemoryError> {
2022 let mut embedding = Vec::with_capacity(bytes.len() / 4);
2023 for (index, chunk) in bytes.chunks_exact(4).enumerate() {
2024 let value = f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]);
2025 if !value.is_finite() {
2026 return Err(MemoryError::NonFiniteEmbeddingValue { index });
2027 }
2028 embedding.push(value);
2029 }
2030 Ok(embedding)
2031}
2032
2033pub fn is_embeddings_dirty(conn: &Connection) -> Result<bool, MemoryError> {
2034 let dirty: i32 = conn
2035 .query_row(
2036 "SELECT COALESCE(embeddings_dirty, 0) FROM embedding_metadata WHERE id = 1",
2037 [],
2038 |row| row.get(0),
2039 )
2040 .unwrap_or(0);
2041 Ok(dirty != 0)
2042}
2043
2044pub fn clear_embeddings_dirty(conn: &Connection) -> Result<(), MemoryError> {
2045 conn.execute(
2046 "UPDATE embedding_metadata SET embeddings_dirty = 0 WHERE id = 1",
2047 [],
2048 )?;
2049 Ok(())
2050}
2051
/// Upserts a row into `pending_index_ops` for `item_key` and then marks the
/// sidecar dirty, all on the caller's transaction.
///
/// When a row for `item_key` already exists, its entity type and op kind are
/// overwritten and its attempt counter and last error are reset.
///
/// # Errors
/// Propagates any error from the upsert or from `mark_sidecar_dirty`.
#[cfg(feature = "hnsw")]
pub(crate) fn queue_pending_index_op(
    tx: &rusqlite::Transaction<'_>,
    item_key: &str,
    entity_type: &str,
    op_kind: IndexOpKind,
) -> Result<(), MemoryError> {
    const UPSERT_SQL: &str =
        "INSERT INTO pending_index_ops (item_key, entity_type, op_kind, attempt_count, last_error, updated_at)
         VALUES (?1, ?2, ?3, 0, NULL, datetime('now'))
         ON CONFLICT(item_key) DO UPDATE SET
            entity_type = excluded.entity_type,
            op_kind = excluded.op_kind,
            attempt_count = 0,
            last_error = NULL,
            updated_at = datetime('now')";

    let op_label = op_kind.as_str();
    tx.execute(UPSERT_SQL, params![item_key, entity_type, op_label])?;
    mark_sidecar_dirty(tx)
}
2073
2074#[cfg(feature = "hnsw")]
2075pub(crate) use IndexOpKind as PendingIndexOpKind;
2076
/// Forwards all arguments unchanged to [`queue_pending_index_op`] and returns
/// its result.
#[cfg(feature = "hnsw")]
pub(crate) fn enqueue_pending_index_op(
    tx: &rusqlite::Transaction<'_>,
    item_key: &str,
    entity_type: &str,
    op_kind: PendingIndexOpKind,
) -> Result<(), MemoryError> {
    queue_pending_index_op(tx, item_key, entity_type, op_kind)
}
2086
2087pub(crate) fn list_pending_index_ops(
2088 conn: &Connection,
2089) -> Result<Vec<PendingIndexOp>, MemoryError> {
2090 let table_exists: bool = conn
2091 .query_row(
2092 "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='pending_index_ops'",
2093 [],
2094 |row| row.get(0),
2095 )
2096 .unwrap_or(false);
2097 if !table_exists {
2098 return Ok(Vec::new());
2099 }
2100
2101 let mut stmt = conn.prepare(
2102 "SELECT item_key, entity_type, op_kind, attempt_count, last_error
2103 FROM pending_index_ops
2104 ORDER BY updated_at ASC, item_key ASC",
2105 )?;
2106 let rows = stmt
2107 .query_map([], |row| {
2108 let item_key: String = row.get(0)?;
2109 let op_kind: String = row.get(2)?;
2110 Ok(PendingIndexOp {
2111 item_key: item_key.clone(),
2112 entity_type: row.get(1)?,
2113 op_kind: IndexOpKind::parse(&op_kind, &item_key).map_err(|e| {
2114 rusqlite::Error::FromSqlConversionFailure(
2115 2,
2116 rusqlite::types::Type::Text,
2117 Box::new(e),
2118 )
2119 })?,
2120 attempt_count: row.get::<_, i64>(3)? as u32,
2121 last_error: row.get(4)?,
2122 })
2123 })?
2124 .collect::<Result<Vec<_>, _>>()?;
2125 Ok(rows)
2126}
2127
/// Counts the rows in `pending_index_ops`.
///
/// Returns `0` when the table does not exist (or when its existence cannot be
/// determined).
///
/// # Errors
/// Propagates any error from the `COUNT(*)` query itself.
#[cfg(feature = "hnsw")]
pub(crate) fn pending_index_op_count(conn: &Connection) -> Result<usize, MemoryError> {
    let has_table = conn
        .query_row(
            "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='pending_index_ops'",
            [],
            |row| row.get::<_, bool>(0),
        )
        .unwrap_or(false);

    if has_table {
        let total: i64 =
            conn.query_row("SELECT COUNT(*) FROM pending_index_ops", [], |row| row.get(0))?;
        Ok(total as usize)
    } else {
        Ok(0)
    }
}
2146
/// For each key in `item_keys`, bumps `attempt_count`, stores `error` as
/// `last_error` and refreshes `updated_at` on the matching
/// `pending_index_ops` row. All updates run inside one `with_transaction` call.
///
/// # Errors
/// Stops at, and propagates, the first failing update.
#[cfg(feature = "hnsw")]
pub(crate) fn mark_pending_index_ops_failed(
    conn: &Connection,
    item_keys: &[String],
    error: &str,
) -> Result<(), MemoryError> {
    const RECORD_FAILURE_SQL: &str = "UPDATE pending_index_ops
             SET attempt_count = attempt_count + 1,
                 last_error = ?1,
                 updated_at = datetime('now')
             WHERE item_key = ?2";

    with_transaction(conn, |tx| {
        item_keys.iter().try_for_each(|item_key| {
            tx.execute(RECORD_FAILURE_SQL, params![error, item_key])
                .map(drop)
        })?;
        Ok(())
    })
}
2167
/// Deletes the `pending_index_ops` rows whose key appears in `item_keys`,
/// inside one `with_transaction` call.
///
/// # Errors
/// Stops at, and propagates, the first failing delete.
#[cfg(feature = "hnsw")]
pub(crate) fn clear_pending_index_ops(
    conn: &Connection,
    item_keys: &[String],
) -> Result<(), MemoryError> {
    const DELETE_ONE_SQL: &str = "DELETE FROM pending_index_ops WHERE item_key = ?1";

    with_transaction(conn, |tx| {
        item_keys
            .iter()
            .try_for_each(|item_key| tx.execute(DELETE_ONE_SQL, params![item_key]).map(drop))?;
        Ok(())
    })
}
2183
/// Deletes every row from `pending_index_ops`.
///
/// # Errors
/// Propagates any error from the `DELETE` (no table-existence probe is done
/// here).
#[cfg(feature = "hnsw")]
pub(crate) fn clear_all_pending_index_ops(conn: &Connection) -> Result<(), MemoryError> {
    conn.execute("DELETE FROM pending_index_ops", [])
        .map(|_deleted| ())
        .map_err(MemoryError::from)
}
2189
/// Loads the stored embedding for an index key of the form `<domain>:<id>`.
///
/// Recognised domains are `fact`, `chunk`, `msg` (the id must parse as `i64`)
/// and `episode`. Returns `Ok(None)` when no matching row exists or the
/// stored embedding is NULL.
///
/// # Errors
/// - [`MemoryError::InvalidKey`] when the key has no `:`, names an unknown
///   domain, or carries a non-integer `msg` id.
/// - Any database error other than "no rows", and any decoding error from
///   `bytes_to_embedding`.
#[cfg(feature = "hnsw")]
pub(crate) fn load_embedding_for_index_key(
    conn: &Connection,
    item_key: &str,
) -> Result<Option<Vec<f32>>, MemoryError> {
    let (domain, raw_id) = item_key
        .split_once(':')
        .ok_or_else(|| MemoryError::InvalidKey(item_key.to_string()))?;

    // Shared lookup for the domains whose primary key is the raw text id.
    let fetch_by_text_id = |sql: &str| {
        conn.query_row(sql, params![raw_id], |row| {
            row.get::<_, Option<Vec<u8>>>(0)
        })
    };

    let lookup = match domain {
        "fact" => fetch_by_text_id("SELECT embedding FROM facts WHERE id = ?1"),
        "chunk" => fetch_by_text_id("SELECT embedding FROM chunks WHERE id = ?1"),
        "episode" => fetch_by_text_id("SELECT embedding FROM episodes WHERE episode_id = ?1"),
        "msg" => {
            let message_id: i64 = raw_id
                .parse()
                .map_err(|e| MemoryError::InvalidKey(format!("{item_key}: {e}")))?;
            conn.query_row(
                "SELECT embedding FROM messages WHERE id = ?1",
                params![message_id],
                |row| row.get::<_, Option<Vec<u8>>>(0),
            )
        }
        _ => return Err(MemoryError::InvalidKey(item_key.to_string())),
    };

    // `optional()` turns "no rows" into `None`; the inner `Option` is a NULL blob.
    match lookup.optional()? {
        Some(Some(bytes)) => bytes_to_embedding(&bytes).map(Some),
        _ => Ok(None),
    }
}
2236
/// Sets the `sidecar_dirty` entry in `hnsw_metadata` to `'1'` on the given
/// transaction, inserting the entry if it does not exist yet.
#[cfg(feature = "hnsw")]
fn mark_sidecar_dirty(tx: &rusqlite::Transaction<'_>) -> Result<(), MemoryError> {
    const SET_DIRTY_SQL: &str =
        "INSERT INTO hnsw_metadata (key, value) VALUES ('sidecar_dirty', '1')
         ON CONFLICT(key) DO UPDATE SET value = '1'";

    let _rows_changed = tx.execute(SET_DIRTY_SQL, [])?;
    Ok(())
}
2246
/// Reports whether the `sidecar_dirty` entry in `hnsw_metadata` is `"1"`.
///
/// This stays best-effort: if the entry cannot be read the function still
/// answers `Ok(false)`. A missing entry is treated as "not dirty" silently;
/// any other database error is logged so that a failed read is no longer
/// indistinguishable from a clean sidecar.
#[cfg(feature = "hnsw")]
pub(crate) fn is_sidecar_dirty(conn: &Connection) -> Result<bool, MemoryError> {
    let lookup = conn.query_row(
        "SELECT value FROM hnsw_metadata WHERE key = 'sidecar_dirty'",
        [],
        |row| row.get::<_, String>(0),
    );

    match lookup {
        Ok(value) => Ok(value == "1"),
        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(false),
        Err(err) => {
            tracing::warn!(
                error = %err,
                "failed to read sidecar_dirty flag; treating sidecar as clean"
            );
            Ok(false)
        }
    }
}
2259
/// Writes the `sidecar_dirty` entry in `hnsw_metadata` as `"1"` (dirty) or
/// `"0"` (clean), inserting the entry if it does not exist yet.
///
/// # Errors
/// Propagates any error from executing the upsert.
#[cfg(feature = "hnsw")]
pub(crate) fn set_sidecar_dirty(conn: &Connection, dirty: bool) -> Result<(), MemoryError> {
    const UPSERT_FLAG_SQL: &str =
        "INSERT INTO hnsw_metadata (key, value) VALUES ('sidecar_dirty', ?1)
         ON CONFLICT(key) DO UPDATE SET value = excluded.value";

    let flag = if dirty { "1" } else { "0" };
    conn.execute(UPSERT_FLAG_SQL, params![flag])
        .map(drop)
        .map_err(Into::into)
}
2269
2270pub(crate) fn parse_optional_json(
2271 table: &'static str,
2272 row_id: &str,
2273 field: &'static str,
2274 raw: Option<&str>,
2275) -> Result<Option<serde_json::Value>, MemoryError> {
2276 match raw {
2277 Some(raw) => serde_json::from_str(raw)
2278 .map(Some)
2279 .map_err(|e| MemoryError::CorruptData {
2280 table,
2281 row_id: row_id.to_string(),
2282 detail: format!("invalid {field}: {e}"),
2283 }),
2284 None => Ok(None),
2285 }
2286}
2287
2288pub(crate) fn parse_string_list_json(
2289 table: &'static str,
2290 row_id: &str,
2291 field: &'static str,
2292 raw: &str,
2293) -> Result<Vec<String>, MemoryError> {
2294 serde_json::from_str(raw).map_err(|e| MemoryError::CorruptData {
2295 table,
2296 row_id: row_id.to_string(),
2297 detail: format!("invalid {field}: {e}"),
2298 })
2299}
2300
2301pub(crate) fn parse_role(
2302 table: &'static str,
2303 row_id: &str,
2304 raw: &str,
2305) -> Result<Role, MemoryError> {
2306 Role::from_str_value(raw).ok_or_else(|| MemoryError::CorruptData {
2307 table,
2308 row_id: row_id.to_string(),
2309 detail: format!("invalid role '{raw}'"),
2310 })
2311}
2312
2313pub(crate) fn parse_episode_outcome(
2314 row_id: &str,
2315 raw: &str,
2316) -> Result<EpisodeOutcome, MemoryError> {
2317 EpisodeOutcome::from_str_value(raw).ok_or_else(|| MemoryError::CorruptData {
2318 table: "episodes",
2319 row_id: row_id.to_string(),
2320 detail: format!("invalid outcome '{raw}'"),
2321 })
2322}
2323
2324pub(crate) fn parse_verification_status(
2325 row_id: &str,
2326 raw: &str,
2327) -> Result<VerificationStatus, MemoryError> {
2328 serde_json::from_str(raw).map_err(|e| MemoryError::CorruptData {
2329 table: "episodes",
2330 row_id: row_id.to_string(),
2331 detail: format!("invalid verification_status: {e}"),
2332 })
2333}
2334
2335pub fn verify_integrity_sync(
2337 conn: &Connection,
2338 mode: VerifyMode,
2339) -> Result<IntegrityReport, MemoryError> {
2340 let mut issues = Vec::new();
2341
2342 let schema_version: u32 = conn
2343 .query_row("PRAGMA user_version", [], |row| row.get(0))
2344 .unwrap_or_else(|e| {
2345 issues.push(format!("failed to read schema version: {e}"));
2346 0
2347 });
2348 if schema_version > MAX_SCHEMA_VERSION {
2349 issues.push(format!(
2350 "schema version {} is ahead of supported {}",
2351 schema_version, MAX_SCHEMA_VERSION
2352 ));
2353 }
2354
2355 let fact_count: usize = conn
2356 .query_row("SELECT COUNT(*) FROM facts", [], |row| row.get(0))
2357 .unwrap_or_else(|e| {
2358 issues.push(format!("failed to count facts: {e}"));
2359 0
2360 });
2361 let chunk_count: usize = conn
2362 .query_row("SELECT COUNT(*) FROM chunks", [], |row| row.get(0))
2363 .unwrap_or_else(|e| {
2364 issues.push(format!("failed to count chunks: {e}"));
2365 0
2366 });
2367 let message_count: usize = conn
2368 .query_row("SELECT COUNT(*) FROM messages", [], |row| row.get(0))
2369 .unwrap_or_else(|e| {
2370 issues.push(format!("failed to count messages: {e}"));
2371 0
2372 });
2373 let episode_count: usize = conn
2374 .query_row("SELECT COUNT(*) FROM episodes", [], |row| row.get(0))
2375 .unwrap_or_else(|e| {
2376 issues.push(format!("failed to count episodes: {e}"));
2377 0
2378 });
2379
2380 let facts_missing_embeddings: usize = conn
2381 .query_row(
2382 "SELECT COUNT(*) FROM facts WHERE embedding IS NULL",
2383 [],
2384 |row| row.get(0),
2385 )
2386 .unwrap_or_else(|e| {
2387 issues.push(format!("failed to count facts missing embeddings: {e}"));
2388 0
2389 });
2390 let chunks_missing_embeddings: usize = conn
2391 .query_row(
2392 "SELECT COUNT(*) FROM chunks WHERE embedding IS NULL",
2393 [],
2394 |row| row.get(0),
2395 )
2396 .unwrap_or_else(|e| {
2397 issues.push(format!("failed to count chunks missing embeddings: {e}"));
2398 0
2399 });
2400 let episodes_missing_embeddings: usize = conn
2401 .query_row(
2402 "SELECT COUNT(*) FROM episodes WHERE embedding IS NULL",
2403 [],
2404 |row| row.get(0),
2405 )
2406 .unwrap_or_else(|e| {
2407 issues.push(format!("failed to count episodes missing embeddings: {e}"));
2408 0
2409 });
2410
2411 if facts_missing_embeddings > 0 {
2412 issues.push(format!(
2413 "{} facts missing embeddings",
2414 facts_missing_embeddings
2415 ));
2416 }
2417 if chunks_missing_embeddings > 0 {
2418 issues.push(format!(
2419 "{} chunks missing embeddings",
2420 chunks_missing_embeddings
2421 ));
2422 }
2423 if episodes_missing_embeddings > 0 {
2424 issues.push(format!(
2425 "{} episodes missing embeddings",
2426 episodes_missing_embeddings
2427 ));
2428 }
2429
2430 let pending_ops = list_pending_index_ops(conn).unwrap_or_default();
2431 if !pending_ops.is_empty() {
2432 issues.push(format!(
2433 "{} pending HNSW sidecar ops queued in SQLite",
2434 pending_ops.len()
2435 ));
2436 for op in pending_ops.iter().take(5) {
2437 let op_kind = op.op_kind.as_str();
2438 let detail = match &op.last_error {
2439 Some(last_error) => format!(
2440 "{} {} {} (attempts: {}, last_error: {})",
2441 op.entity_type,
2442 op.op_kind.as_str(),
2443 op.item_key,
2444 op.attempt_count,
2445 last_error
2446 ),
2447 None => format!(
2448 "{} {} {} (attempts: {})",
2449 op.entity_type, op_kind, op.item_key, op.attempt_count
2450 ),
2451 };
2452 issues.push(format!("pending sidecar op: {detail}"));
2453 }
2454 }
2455
2456 if mode == VerifyMode::Full {
2457 let dims: usize = conn
2458 .query_row(
2459 "SELECT dimensions FROM embedding_metadata WHERE id = 1",
2460 [],
2461 |row| row.get(0),
2462 )
2463 .unwrap_or_else(|e| {
2464 issues.push(format!("failed to read embedding dimensions: {e}"));
2465 0
2466 });
2467
2468 verify_fts_drift(conn, "facts", "facts_rowid_map", fact_count, &mut issues);
2469 verify_fts_drift(conn, "chunks", "chunks_rowid_map", chunk_count, &mut issues);
2470 verify_fts_drift(
2471 conn,
2472 "messages",
2473 "messages_rowid_map",
2474 message_count,
2475 &mut issues,
2476 );
2477 verify_fts_drift(
2478 conn,
2479 "episodes",
2480 "episodes_rowid_map",
2481 episode_count,
2482 &mut issues,
2483 );
2484
2485 verify_blob_table(conn, "facts", "id", "embedding", dims, &mut issues)?;
2486 verify_blob_table(conn, "chunks", "id", "embedding", dims, &mut issues)?;
2487 verify_blob_table(conn, "messages", "id", "embedding", dims, &mut issues)?;
2488 verify_blob_table(
2489 conn,
2490 "episodes",
2491 "episode_id",
2492 "embedding",
2493 dims,
2494 &mut issues,
2495 )?;
2496
2497 verify_quantized_table(conn, "facts", "id", dims, &mut issues)?;
2498 verify_quantized_table(conn, "chunks", "id", dims, &mut issues)?;
2499 verify_quantized_table(conn, "messages", "id", dims, &mut issues)?;
2500 verify_quantized_table(conn, "episodes", "episode_id", dims, &mut issues)?;
2501
2502 verify_session_rows(conn, &mut issues)?;
2503 verify_message_rows(conn, &mut issues)?;
2504 verify_fact_rows(conn, &mut issues)?;
2505 verify_document_rows(conn, &mut issues)?;
2506 verify_episode_rows(conn, &mut issues)?;
2507
2508 let integrity_check: String = conn
2509 .query_row("PRAGMA integrity_check", [], |row| row.get(0))
2510 .unwrap_or_else(|_| "error".to_string());
2511 if integrity_check != "ok" {
2512 issues.push(format!("SQLite integrity_check: {}", integrity_check));
2513 }
2514 }
2515
2516 Ok(IntegrityReport {
2517 ok: issues.is_empty(),
2518 schema_version,
2519 fact_count,
2520 chunk_count,
2521 message_count,
2522 facts_missing_embeddings,
2523 chunks_missing_embeddings,
2524 issues,
2525 })
2526}
2527
2528pub fn reconcile_fts(conn: &Connection) -> Result<(), MemoryError> {
2530 with_transaction(conn, |tx| {
2531 tx.execute_batch("DROP TABLE IF EXISTS facts_fts")?;
2532 tx.execute_batch("DELETE FROM facts_rowid_map")?;
2533 tx.execute_batch(
2534 "CREATE VIRTUAL TABLE facts_fts USING fts5(
2535 content,
2536 content='',
2537 content_rowid='rowid',
2538 tokenize='porter unicode61'
2539 )",
2540 )?;
2541 tx.execute_batch("INSERT INTO facts_rowid_map (fact_id) SELECT id FROM facts")?;
2542 tx.execute_batch(
2543 "INSERT INTO facts_fts (rowid, content)
2544 SELECT rm.rowid, f.content
2545 FROM facts_rowid_map rm
2546 JOIN facts f ON f.id = rm.fact_id",
2547 )?;
2548
2549 tx.execute_batch("DROP TABLE IF EXISTS chunks_fts")?;
2550 tx.execute_batch("DELETE FROM chunks_rowid_map")?;
2551 tx.execute_batch(
2552 "CREATE VIRTUAL TABLE chunks_fts USING fts5(
2553 content,
2554 content='',
2555 content_rowid='rowid',
2556 tokenize='porter unicode61'
2557 )",
2558 )?;
2559 tx.execute_batch("INSERT INTO chunks_rowid_map (chunk_id) SELECT id FROM chunks")?;
2560 tx.execute_batch(
2561 "INSERT INTO chunks_fts (rowid, content)
2562 SELECT rm.rowid, c.content
2563 FROM chunks_rowid_map rm
2564 JOIN chunks c ON c.id = rm.chunk_id",
2565 )?;
2566
2567 tx.execute_batch("DROP TABLE IF EXISTS messages_fts")?;
2568 tx.execute_batch("DELETE FROM messages_rowid_map")?;
2569 tx.execute_batch(
2570 "CREATE VIRTUAL TABLE messages_fts USING fts5(
2571 content,
2572 content='',
2573 content_rowid='rowid',
2574 tokenize='porter unicode61'
2575 )",
2576 )?;
2577 tx.execute_batch("INSERT INTO messages_rowid_map (message_id) SELECT id FROM messages")?;
2578 tx.execute_batch(
2579 "INSERT INTO messages_fts (rowid, content)
2580 SELECT rm.rowid, m.content
2581 FROM messages_rowid_map rm
2582 JOIN messages m ON m.id = rm.message_id",
2583 )?;
2584
2585 tx.execute_batch("DROP TABLE IF EXISTS episodes_fts")?;
2586 tx.execute_batch("DELETE FROM episodes_rowid_map")?;
2587 tx.execute_batch(
2588 "CREATE VIRTUAL TABLE episodes_fts USING fts5(
2589 content,
2590 content='',
2591 content_rowid='rowid',
2592 tokenize='porter unicode61'
2593 )",
2594 )?;
2595 tx.execute_batch(
2596 "INSERT INTO episodes_rowid_map (episode_id, document_id) SELECT episode_id, document_id FROM episodes",
2597 )?;
2598 tx.execute_batch(
2599 "INSERT INTO episodes_fts (rowid, content)
2600 SELECT rm.rowid, e.search_text
2601 FROM episodes_rowid_map rm
2602 JOIN episodes e ON e.episode_id = rm.episode_id",
2603 )?;
2604
2605 Ok(())
2606 })?;
2607
2608 tracing::info!("FTS indexes reconciled");
2609 Ok(())
2610}
2611
2612fn verify_fts_drift(
2613 conn: &Connection,
2614 label: &str,
2615 map_table: &str,
2616 source_count: usize,
2617 issues: &mut Vec<String>,
2618) {
2619 let table_exists: bool = conn
2620 .query_row(
2621 "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name = ?1",
2622 params![map_table],
2623 |row| row.get(0),
2624 )
2625 .unwrap_or(false);
2626 if !table_exists {
2627 if source_count > 0 {
2628 issues.push(format!("{} rows exist but {} is missing", label, map_table));
2629 }
2630 return;
2631 }
2632
2633 let sql = format!("SELECT COUNT(*) FROM {}", map_table);
2634 let indexed_count: usize = conn.query_row(&sql, [], |row| row.get(0)).unwrap_or(0);
2635 if indexed_count != source_count {
2636 issues.push(format!(
2637 "FTS {} index drift: {} rows in map vs {} source rows",
2638 label, indexed_count, source_count
2639 ));
2640 }
2641}
2642
2643fn verify_blob_table(
2644 conn: &Connection,
2645 table: &'static str,
2646 id_column: &'static str,
2647 blob_column: &'static str,
2648 expected_dims: usize,
2649 issues: &mut Vec<String>,
2650) -> Result<(), MemoryError> {
2651 if expected_dims == 0 {
2652 return Ok(());
2653 }
2654
2655 let sql = format!(
2656 "SELECT CAST({id_column} AS TEXT), {blob_column} FROM {table} WHERE {blob_column} IS NOT NULL"
2657 );
2658 let mut stmt = conn.prepare(&sql)?;
2659 let rows = stmt.query_map([], |row| {
2660 Ok((row.get::<_, String>(0)?, row.get::<_, Vec<u8>>(1)?))
2661 })?;
2662
2663 for row in rows {
2664 let (row_id, blob) = row?;
2665 match bytes_to_embedding(&blob) {
2666 Ok(embedding) if embedding.len() != expected_dims => issues.push(format!(
2667 "{}({}) has embedding dimension {} but expected {}",
2668 table,
2669 row_id,
2670 embedding.len(),
2671 expected_dims
2672 )),
2673 Ok(_) => {}
2674 Err(err) => issues.push(format!(
2675 "{}({}) invalid embedding blob: {}",
2676 table, row_id, err
2677 )),
2678 }
2679 }
2680
2681 Ok(())
2682}
2683
2684fn verify_quantized_table(
2685 conn: &Connection,
2686 table: &'static str,
2687 id_column: &'static str,
2688 expected_dims: usize,
2689 issues: &mut Vec<String>,
2690) -> Result<(), MemoryError> {
2691 if expected_dims == 0 {
2692 return Ok(());
2693 }
2694
2695 let sql = format!(
2696 "SELECT CAST({id_column} AS TEXT), embedding_q8 FROM {table} WHERE embedding IS NOT NULL"
2697 );
2698 let mut stmt = conn.prepare(&sql)?;
2699 let rows = stmt.query_map([], |row| {
2700 Ok((row.get::<_, String>(0)?, row.get::<_, Option<Vec<u8>>>(1)?))
2701 })?;
2702
2703 for row in rows {
2704 let (row_id, blob) = row?;
2705 match blob {
2706 Some(blob) => {
2707 if let Err(err) = unpack_quantized(&blob, expected_dims) {
2708 issues.push(format!(
2709 "{}({}) invalid quantized embedding: {}",
2710 table, row_id, err
2711 ));
2712 }
2713 }
2714 None => issues.push(format!("{}({}) missing quantized embedding", table, row_id)),
2715 }
2716 }
2717
2718 Ok(())
2719}
2720
2721fn verify_session_rows(conn: &Connection, issues: &mut Vec<String>) -> Result<(), MemoryError> {
2722 let mut stmt = conn.prepare("SELECT id, metadata FROM sessions WHERE metadata IS NOT NULL")?;
2723 let rows = stmt.query_map([], |row| {
2724 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
2725 })?;
2726 for row in rows {
2727 let (id, metadata) = row?;
2728 if let Err(err) = parse_optional_json("sessions", &id, "metadata", Some(&metadata)) {
2729 issues.push(err.to_string());
2730 }
2731 }
2732 Ok(())
2733}
2734
2735fn verify_message_rows(conn: &Connection, issues: &mut Vec<String>) -> Result<(), MemoryError> {
2736 let mut stmt = conn.prepare("SELECT id, role, metadata FROM messages")?;
2737 let rows = stmt.query_map([], |row| {
2738 Ok((
2739 row.get::<_, i64>(0)?,
2740 row.get::<_, String>(1)?,
2741 row.get::<_, Option<String>>(2)?,
2742 ))
2743 })?;
2744 for row in rows {
2745 let (id, role, metadata) = row?;
2746 let row_id = id.to_string();
2747 if let Err(err) = parse_role("messages", &row_id, &role) {
2748 issues.push(err.to_string());
2749 }
2750 if let Err(err) = parse_optional_json("messages", &row_id, "metadata", metadata.as_deref())
2751 {
2752 issues.push(err.to_string());
2753 }
2754 }
2755 Ok(())
2756}
2757
2758fn verify_fact_rows(conn: &Connection, issues: &mut Vec<String>) -> Result<(), MemoryError> {
2759 let mut stmt = conn.prepare("SELECT id, metadata FROM facts WHERE metadata IS NOT NULL")?;
2760 let rows = stmt.query_map([], |row| {
2761 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
2762 })?;
2763 for row in rows {
2764 let (id, metadata) = row?;
2765 if let Err(err) = parse_optional_json("facts", &id, "metadata", Some(&metadata)) {
2766 issues.push(err.to_string());
2767 }
2768 }
2769 Ok(())
2770}
2771
2772fn verify_document_rows(conn: &Connection, issues: &mut Vec<String>) -> Result<(), MemoryError> {
2773 let mut stmt = conn.prepare("SELECT id, metadata FROM documents WHERE metadata IS NOT NULL")?;
2774 let rows = stmt.query_map([], |row| {
2775 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
2776 })?;
2777 for row in rows {
2778 let (id, metadata) = row?;
2779 if let Err(err) = parse_optional_json("documents", &id, "metadata", Some(&metadata)) {
2780 issues.push(err.to_string());
2781 }
2782 }
2783 Ok(())
2784}
2785
2786fn verify_episode_rows(conn: &Connection, issues: &mut Vec<String>) -> Result<(), MemoryError> {
2787 let mut stmt = conn.prepare(
2788 "SELECT episode_id, cause_ids, outcome, verification_status
2789 FROM episodes",
2790 )?;
2791 let rows = stmt.query_map([], |row| {
2792 Ok((
2793 row.get::<_, String>(0)?,
2794 row.get::<_, String>(1)?,
2795 row.get::<_, String>(2)?,
2796 row.get::<_, String>(3)?,
2797 ))
2798 })?;
2799 for row in rows {
2800 let (episode_id, cause_ids, outcome, verification_status) = row?;
2801 if let Err(err) = parse_string_list_json("episodes", &episode_id, "cause_ids", &cause_ids) {
2802 issues.push(err.to_string());
2803 }
2804 if let Err(err) = parse_episode_outcome(&episode_id, &outcome) {
2805 issues.push(err.to_string());
2806 }
2807 if let Err(err) = parse_verification_status(&episode_id, &verification_status) {
2808 issues.push(err.to_string());
2809 }
2810 }
2811 Ok(())
2812}