1use rusqlite::{Connection, OptionalExtension};
2
3use crate::{Migration, SchemaError, SchemaVersion};
4
/// Append-only migration history. `SchemaManager::bootstrap` applies each
/// entry at most once, in order, and records it in `fathom_schema_migrations`.
/// Released entries must never be edited or reordered — add a new version
/// instead. Note: versions 4-6 and 8-13 are normally applied through the
/// idempotent `ensure_*` routines in `SchemaManager`; the SQL here is the
/// canonical record of what each version does.
static MIGRATIONS: &[Migration] = &[
    // v1: canonical graph storage (nodes, edges, chunks, FTS index), the
    // vector profile registry, and the runs/steps/actions runtime tables.
    Migration::new(
        SchemaVersion(1),
        "initial canonical schema and runtime tables",
        r"
        CREATE TABLE IF NOT EXISTS nodes (
            row_id TEXT PRIMARY KEY,
            logical_id TEXT NOT NULL,
            kind TEXT NOT NULL,
            properties BLOB NOT NULL,
            created_at INTEGER NOT NULL,
            superseded_at INTEGER,
            source_ref TEXT,
            confidence REAL
        );

        CREATE UNIQUE INDEX IF NOT EXISTS idx_nodes_active_logical_id
            ON nodes(logical_id)
            WHERE superseded_at IS NULL;
        CREATE INDEX IF NOT EXISTS idx_nodes_kind_active
            ON nodes(kind, superseded_at);
        CREATE INDEX IF NOT EXISTS idx_nodes_source_ref
            ON nodes(source_ref);

        CREATE TABLE IF NOT EXISTS edges (
            row_id TEXT PRIMARY KEY,
            logical_id TEXT NOT NULL,
            source_logical_id TEXT NOT NULL,
            target_logical_id TEXT NOT NULL,
            kind TEXT NOT NULL,
            properties BLOB NOT NULL,
            created_at INTEGER NOT NULL,
            superseded_at INTEGER,
            source_ref TEXT,
            confidence REAL
        );

        CREATE UNIQUE INDEX IF NOT EXISTS idx_edges_active_logical_id
            ON edges(logical_id)
            WHERE superseded_at IS NULL;
        CREATE INDEX IF NOT EXISTS idx_edges_source_active
            ON edges(source_logical_id, kind, superseded_at);
        CREATE INDEX IF NOT EXISTS idx_edges_target_active
            ON edges(target_logical_id, kind, superseded_at);
        CREATE INDEX IF NOT EXISTS idx_edges_source_ref
            ON edges(source_ref);

        CREATE TABLE IF NOT EXISTS chunks (
            id TEXT PRIMARY KEY,
            node_logical_id TEXT NOT NULL,
            text_content TEXT NOT NULL,
            byte_start INTEGER,
            byte_end INTEGER,
            created_at INTEGER NOT NULL
        );

        CREATE INDEX IF NOT EXISTS idx_chunks_node_logical_id
            ON chunks(node_logical_id);

        CREATE VIRTUAL TABLE IF NOT EXISTS fts_nodes USING fts5(
            chunk_id UNINDEXED,
            node_logical_id UNINDEXED,
            kind UNINDEXED,
            text_content
        );

        CREATE TABLE IF NOT EXISTS vector_profiles (
            profile TEXT PRIMARY KEY,
            table_name TEXT NOT NULL,
            dimension INTEGER NOT NULL,
            enabled INTEGER NOT NULL DEFAULT 0
        );

        CREATE TABLE IF NOT EXISTS runs (
            id TEXT PRIMARY KEY,
            kind TEXT NOT NULL,
            status TEXT NOT NULL,
            properties BLOB NOT NULL,
            created_at INTEGER NOT NULL,
            completed_at INTEGER,
            superseded_at INTEGER,
            source_ref TEXT
        );

        CREATE TABLE IF NOT EXISTS steps (
            id TEXT PRIMARY KEY,
            run_id TEXT NOT NULL,
            kind TEXT NOT NULL,
            status TEXT NOT NULL,
            properties BLOB NOT NULL,
            created_at INTEGER NOT NULL,
            completed_at INTEGER,
            superseded_at INTEGER,
            source_ref TEXT,
            FOREIGN KEY(run_id) REFERENCES runs(id)
        );

        CREATE TABLE IF NOT EXISTS actions (
            id TEXT PRIMARY KEY,
            step_id TEXT NOT NULL,
            kind TEXT NOT NULL,
            status TEXT NOT NULL,
            properties BLOB NOT NULL,
            created_at INTEGER NOT NULL,
            completed_at INTEGER,
            superseded_at INTEGER,
            source_ref TEXT,
            FOREIGN KEY(step_id) REFERENCES steps(id)
        );

        CREATE INDEX IF NOT EXISTS idx_runs_source_ref
            ON runs(source_ref);
        CREATE INDEX IF NOT EXISTS idx_steps_source_ref
            ON steps(source_ref);
        CREATE INDEX IF NOT EXISTS idx_actions_source_ref
            ON actions(source_ref);
        ",
    ),
    // v2: durable audit trail of provenance events keyed by subject.
    Migration::new(
        SchemaVersion(2),
        "durable audit trail: provenance_events table",
        r"
        CREATE TABLE IF NOT EXISTS provenance_events (
            id TEXT PRIMARY KEY,
            event_type TEXT NOT NULL,
            subject TEXT NOT NULL,
            source_ref TEXT,
            created_at INTEGER NOT NULL DEFAULT (unixepoch())
        );
        CREATE INDEX IF NOT EXISTS idx_provenance_events_subject
            ON provenance_events (subject, event_type);
        ",
    ),
    // v3: per-profile embedding regeneration contracts.
    Migration::new(
        SchemaVersion(3),
        "vector regeneration contracts",
        r"
        CREATE TABLE IF NOT EXISTS vector_embedding_contracts (
            profile TEXT PRIMARY KEY,
            table_name TEXT NOT NULL,
            model_identity TEXT NOT NULL,
            model_version TEXT NOT NULL,
            dimension INTEGER NOT NULL,
            normalization_policy TEXT NOT NULL,
            chunking_policy TEXT NOT NULL,
            preprocessing_policy TEXT NOT NULL,
            generator_command_json TEXT NOT NULL,
            updated_at INTEGER NOT NULL DEFAULT (unixepoch())
        );
        ",
    ),
    // v4: apply metadata on contracts, with a backfill for legacy rows.
    Migration::new(
        SchemaVersion(4),
        "vector regeneration apply metadata",
        r"
        ALTER TABLE vector_embedding_contracts
            ADD COLUMN applied_at INTEGER NOT NULL DEFAULT 0;
        ALTER TABLE vector_embedding_contracts
            ADD COLUMN snapshot_hash TEXT NOT NULL DEFAULT '';
        UPDATE vector_embedding_contracts
        SET
            applied_at = CASE
                WHEN applied_at = 0 THEN updated_at
                ELSE applied_at
            END,
            snapshot_hash = CASE
                WHEN snapshot_hash = '' THEN 'legacy'
                ELSE snapshot_hash
            END;
        ",
    ),
    // v5: explicit contract format version, normalized to 1.
    Migration::new(
        SchemaVersion(5),
        "vector regeneration contract format version",
        r"
        ALTER TABLE vector_embedding_contracts
            ADD COLUMN contract_format_version INTEGER NOT NULL DEFAULT 1;
        UPDATE vector_embedding_contracts
        SET contract_format_version = 1
        WHERE contract_format_version = 0;
        ",
    ),
    // v6: free-form JSON metadata payloads on provenance events.
    Migration::new(
        SchemaVersion(6),
        "provenance metadata payloads",
        r"
        ALTER TABLE provenance_events
            ADD COLUMN metadata_json TEXT NOT NULL DEFAULT '';
        ",
    ),
    // v7: operational store — collection registry, mutation log, and the
    // derived current-state projection.
    Migration::new(
        SchemaVersion(7),
        "operational store canonical and derived tables",
        r"
        CREATE TABLE IF NOT EXISTS operational_collections (
            name TEXT PRIMARY KEY,
            kind TEXT NOT NULL,
            schema_json TEXT NOT NULL,
            retention_json TEXT NOT NULL,
            format_version INTEGER NOT NULL DEFAULT 1,
            created_at INTEGER NOT NULL DEFAULT (unixepoch()),
            disabled_at INTEGER
        );

        CREATE INDEX IF NOT EXISTS idx_operational_collections_kind
            ON operational_collections(kind, disabled_at);

        CREATE TABLE IF NOT EXISTS operational_mutations (
            id TEXT PRIMARY KEY,
            collection_name TEXT NOT NULL,
            record_key TEXT NOT NULL,
            op_kind TEXT NOT NULL,
            payload_json TEXT NOT NULL,
            source_ref TEXT,
            created_at INTEGER NOT NULL DEFAULT (unixepoch()),
            FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
        );

        CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_created
            ON operational_mutations(collection_name, record_key, created_at DESC, id DESC);
        CREATE INDEX IF NOT EXISTS idx_operational_mutations_source_ref
            ON operational_mutations(source_ref);

        CREATE TABLE IF NOT EXISTS operational_current (
            collection_name TEXT NOT NULL,
            record_key TEXT NOT NULL,
            payload_json TEXT NOT NULL,
            updated_at INTEGER NOT NULL,
            last_mutation_id TEXT NOT NULL,
            PRIMARY KEY(collection_name, record_key),
            FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
            FOREIGN KEY(last_mutation_id) REFERENCES operational_mutations(id)
        );

        CREATE INDEX IF NOT EXISTS idx_operational_current_collection_updated
            ON operational_current(collection_name, updated_at DESC);
        ",
    ),
    // v8: explicit mutation ordering column backfilled from rowid.
    Migration::new(
        SchemaVersion(8),
        "operational mutation ordering hardening",
        r"
        ALTER TABLE operational_mutations
            ADD COLUMN mutation_order INTEGER NOT NULL DEFAULT 0;
        UPDATE operational_mutations
        SET mutation_order = rowid
        WHERE mutation_order = 0;
        CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_order
            ON operational_mutations(collection_name, record_key, mutation_order DESC);
        ",
    ),
    // v9: per-node last-access bookkeeping.
    Migration::new(
        SchemaVersion(9),
        "node last_accessed metadata",
        r"
        CREATE TABLE IF NOT EXISTS node_access_metadata (
            logical_id TEXT PRIMARY KEY,
            last_accessed_at INTEGER NOT NULL,
            updated_at INTEGER NOT NULL
        );

        CREATE INDEX IF NOT EXISTS idx_node_access_metadata_last_accessed
            ON node_access_metadata(last_accessed_at DESC);
        ",
    ),
    // v10: declared filter fields plus an extracted-values side table for
    // typed filtered reads over mutations.
    Migration::new(
        SchemaVersion(10),
        "operational filtered read contracts and extracted values",
        r"
        ALTER TABLE operational_collections
            ADD COLUMN filter_fields_json TEXT NOT NULL DEFAULT '[]';

        CREATE TABLE IF NOT EXISTS operational_filter_values (
            mutation_id TEXT NOT NULL,
            collection_name TEXT NOT NULL,
            field_name TEXT NOT NULL,
            string_value TEXT,
            integer_value INTEGER,
            PRIMARY KEY(mutation_id, field_name),
            FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE,
            FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
        );

        CREATE INDEX IF NOT EXISTS idx_operational_filter_values_text
            ON operational_filter_values(collection_name, field_name, string_value, mutation_id);
        CREATE INDEX IF NOT EXISTS idx_operational_filter_values_integer
            ON operational_filter_values(collection_name, field_name, integer_value, mutation_id);
        ",
    ),
    // v11: per-collection payload validation contract.
    Migration::new(
        SchemaVersion(11),
        "operational payload validation contracts",
        r"
        ALTER TABLE operational_collections
            ADD COLUMN validation_json TEXT NOT NULL DEFAULT '';
        ",
    ),
    // v12: declared secondary indexes plus a slot-based entries table
    // (three text/integer slot pairs) that backs them.
    Migration::new(
        SchemaVersion(12),
        "operational secondary indexes",
        r"
        ALTER TABLE operational_collections
            ADD COLUMN secondary_indexes_json TEXT NOT NULL DEFAULT '[]';

        CREATE TABLE IF NOT EXISTS operational_secondary_index_entries (
            collection_name TEXT NOT NULL,
            index_name TEXT NOT NULL,
            subject_kind TEXT NOT NULL,
            mutation_id TEXT NOT NULL DEFAULT '',
            record_key TEXT NOT NULL DEFAULT '',
            sort_timestamp INTEGER,
            slot1_text TEXT,
            slot1_integer INTEGER,
            slot2_text TEXT,
            slot2_integer INTEGER,
            slot3_text TEXT,
            slot3_integer INTEGER,
            PRIMARY KEY(collection_name, index_name, subject_kind, mutation_id, record_key),
            FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
            FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE
        );

        CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_text
            ON operational_secondary_index_entries(
                collection_name, index_name, subject_kind, slot1_text, sort_timestamp DESC, mutation_id, record_key
            );
        CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_integer
            ON operational_secondary_index_entries(
                collection_name, index_name, subject_kind, slot1_integer, sort_timestamp DESC, mutation_id, record_key
            );
        CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_text
            ON operational_secondary_index_entries(
                collection_name, index_name, subject_kind, slot1_text, slot2_text, slot3_text, sort_timestamp DESC, record_key
            );
        CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_integer
            ON operational_secondary_index_entries(
                collection_name, index_name, subject_kind, slot1_integer, slot2_integer, slot3_integer, sort_timestamp DESC, record_key
            );
        ",
    ),
    // v13: audit log of retention sweeps over operational collections.
    Migration::new(
        SchemaVersion(13),
        "operational retention run metadata",
        r"
        CREATE TABLE IF NOT EXISTS operational_retention_runs (
            id TEXT PRIMARY KEY,
            collection_name TEXT NOT NULL,
            executed_at INTEGER NOT NULL,
            action_kind TEXT NOT NULL,
            dry_run INTEGER NOT NULL DEFAULT 0,
            deleted_mutations INTEGER NOT NULL,
            rows_remaining INTEGER NOT NULL,
            metadata_json TEXT NOT NULL DEFAULT '',
            FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
        );

        CREATE INDEX IF NOT EXISTS idx_operational_retention_runs_collection_time
            ON operational_retention_runs(collection_name, executed_at DESC);
        ",
    ),
];
366
/// Summary returned by [`SchemaManager::bootstrap`] describing what the call
/// observed and changed.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BootstrapReport {
    /// Version string reported by SQLite's `sqlite_version()`.
    pub sqlite_version: String,
    /// Migrations applied during this bootstrap call, in order; empty when
    /// the database was already up to date.
    pub applied_versions: Vec<SchemaVersion>,
    /// True when at least one `vector_profiles` row has `enabled = 1`.
    pub vector_profile_enabled: bool,
}
373
/// Stateless entry point for connection setup and schema migration; all
/// behavior is driven by the static `MIGRATIONS` table.
#[derive(Clone, Debug, Default)]
pub struct SchemaManager;
376
377impl SchemaManager {
378 #[must_use]
379 pub fn new() -> Self {
380 Self
381 }
382
383 pub fn bootstrap(&self, conn: &Connection) -> Result<BootstrapReport, SchemaError> {
391 self.initialize_connection(conn)?;
392 Self::ensure_metadata_tables(conn)?;
393
394 let max_applied: u32 = conn.query_row(
396 "SELECT COALESCE(MAX(version), 0) FROM fathom_schema_migrations",
397 [],
398 |row| row.get(0),
399 )?;
400 let engine_version = self.current_version().0;
401 trace_info!(
402 current_version = max_applied,
403 engine_version,
404 "schema bootstrap: version check"
405 );
406 if max_applied > engine_version {
407 trace_error!(
408 database_version = max_applied,
409 engine_version,
410 "schema version mismatch: database is newer than engine"
411 );
412 return Err(SchemaError::VersionMismatch {
413 database_version: max_applied,
414 engine_version,
415 });
416 }
417
418 let mut applied_versions = Vec::new();
419 for migration in self.migrations() {
420 let already_applied = conn
421 .query_row(
422 "SELECT 1 FROM fathom_schema_migrations WHERE version = ?1",
423 [i64::from(migration.version.0)],
424 |row| row.get::<_, i64>(0),
425 )
426 .optional()?
427 .is_some();
428
429 if already_applied {
430 continue;
431 }
432
433 let tx = conn.unchecked_transaction()?;
434 match migration.version {
435 SchemaVersion(4) => Self::ensure_vector_regeneration_apply_metadata(&tx)?,
436 SchemaVersion(5) => Self::ensure_vector_contract_format_version(&tx)?,
437 SchemaVersion(6) => Self::ensure_provenance_metadata(&tx)?,
438 SchemaVersion(8) => Self::ensure_operational_mutation_order(&tx)?,
439 SchemaVersion(9) => Self::ensure_node_access_metadata(&tx)?,
440 SchemaVersion(10) => Self::ensure_operational_filter_contract(&tx)?,
441 SchemaVersion(11) => Self::ensure_operational_validation_contract(&tx)?,
442 SchemaVersion(12) => Self::ensure_operational_secondary_indexes(&tx)?,
443 SchemaVersion(13) => Self::ensure_operational_retention_runs(&tx)?,
444 _ => tx.execute_batch(migration.sql)?,
445 }
446 tx.execute(
447 "INSERT INTO fathom_schema_migrations (version, description) VALUES (?1, ?2)",
448 (i64::from(migration.version.0), migration.description),
449 )?;
450 tx.commit()?;
451 trace_info!(
452 version = migration.version.0,
453 description = migration.description,
454 "schema migration applied"
455 );
456 applied_versions.push(migration.version);
457 }
458
459 let sqlite_version = conn.query_row("SELECT sqlite_version()", [], |row| row.get(0))?;
460 let vector_profile_count: i64 = conn.query_row(
461 "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
462 [],
463 |row| row.get(0),
464 )?;
465 Ok(BootstrapReport {
466 sqlite_version,
467 applied_versions,
468 vector_profile_enabled: vector_profile_count > 0,
469 })
470 }
471
472 fn ensure_vector_regeneration_apply_metadata(conn: &Connection) -> Result<(), SchemaError> {
473 let mut stmt = conn.prepare("PRAGMA table_info(vector_embedding_contracts)")?;
474 let columns = stmt
475 .query_map([], |row| row.get::<_, String>(1))?
476 .collect::<Result<Vec<_>, _>>()?;
477 let has_applied_at = columns.iter().any(|column| column == "applied_at");
478 let has_snapshot_hash = columns.iter().any(|column| column == "snapshot_hash");
479
480 if !has_applied_at {
481 conn.execute(
482 "ALTER TABLE vector_embedding_contracts ADD COLUMN applied_at INTEGER NOT NULL DEFAULT 0",
483 [],
484 )?;
485 }
486 if !has_snapshot_hash {
487 conn.execute(
488 "ALTER TABLE vector_embedding_contracts ADD COLUMN snapshot_hash TEXT NOT NULL DEFAULT ''",
489 [],
490 )?;
491 }
492 conn.execute(
493 r"
494 UPDATE vector_embedding_contracts
495 SET
496 applied_at = CASE
497 WHEN applied_at = 0 THEN updated_at
498 ELSE applied_at
499 END,
500 snapshot_hash = CASE
501 WHEN snapshot_hash = '' THEN 'legacy'
502 ELSE snapshot_hash
503 END
504 ",
505 [],
506 )?;
507 Ok(())
508 }
509
510 fn ensure_vector_contract_format_version(conn: &Connection) -> Result<(), SchemaError> {
511 let mut stmt = conn.prepare("PRAGMA table_info(vector_embedding_contracts)")?;
512 let columns = stmt
513 .query_map([], |row| row.get::<_, String>(1))?
514 .collect::<Result<Vec<_>, _>>()?;
515 let has_contract_format_version = columns
516 .iter()
517 .any(|column| column == "contract_format_version");
518
519 if !has_contract_format_version {
520 conn.execute(
521 "ALTER TABLE vector_embedding_contracts ADD COLUMN contract_format_version INTEGER NOT NULL DEFAULT 1",
522 [],
523 )?;
524 }
525 conn.execute(
526 r"
527 UPDATE vector_embedding_contracts
528 SET contract_format_version = 1
529 WHERE contract_format_version = 0
530 ",
531 [],
532 )?;
533 Ok(())
534 }
535
536 fn ensure_provenance_metadata(conn: &Connection) -> Result<(), SchemaError> {
537 let mut stmt = conn.prepare("PRAGMA table_info(provenance_events)")?;
538 let columns = stmt
539 .query_map([], |row| row.get::<_, String>(1))?
540 .collect::<Result<Vec<_>, _>>()?;
541 let has_metadata_json = columns.iter().any(|column| column == "metadata_json");
542
543 if !has_metadata_json {
544 conn.execute(
545 "ALTER TABLE provenance_events ADD COLUMN metadata_json TEXT NOT NULL DEFAULT ''",
546 [],
547 )?;
548 }
549 Ok(())
550 }
551
552 fn ensure_operational_mutation_order(conn: &Connection) -> Result<(), SchemaError> {
553 let mut stmt = conn.prepare("PRAGMA table_info(operational_mutations)")?;
554 let columns = stmt
555 .query_map([], |row| row.get::<_, String>(1))?
556 .collect::<Result<Vec<_>, _>>()?;
557 let has_mutation_order = columns.iter().any(|column| column == "mutation_order");
558
559 if !has_mutation_order {
560 conn.execute(
561 "ALTER TABLE operational_mutations ADD COLUMN mutation_order INTEGER NOT NULL DEFAULT 0",
562 [],
563 )?;
564 }
565 conn.execute(
566 r"
567 UPDATE operational_mutations
568 SET mutation_order = rowid
569 WHERE mutation_order = 0
570 ",
571 [],
572 )?;
573 conn.execute(
574 r"
575 CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_order
576 ON operational_mutations(collection_name, record_key, mutation_order DESC)
577 ",
578 [],
579 )?;
580 Ok(())
581 }
582
583 fn ensure_node_access_metadata(conn: &Connection) -> Result<(), SchemaError> {
584 conn.execute_batch(
585 r"
586 CREATE TABLE IF NOT EXISTS node_access_metadata (
587 logical_id TEXT PRIMARY KEY,
588 last_accessed_at INTEGER NOT NULL,
589 updated_at INTEGER NOT NULL
590 );
591
592 CREATE INDEX IF NOT EXISTS idx_node_access_metadata_last_accessed
593 ON node_access_metadata(last_accessed_at DESC);
594 ",
595 )?;
596 Ok(())
597 }
598
599 fn ensure_operational_filter_contract(conn: &Connection) -> Result<(), SchemaError> {
600 let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
601 let columns = stmt
602 .query_map([], |row| row.get::<_, String>(1))?
603 .collect::<Result<Vec<_>, _>>()?;
604 let has_filter_fields_json = columns.iter().any(|column| column == "filter_fields_json");
605
606 if !has_filter_fields_json {
607 conn.execute(
608 "ALTER TABLE operational_collections ADD COLUMN filter_fields_json TEXT NOT NULL DEFAULT '[]'",
609 [],
610 )?;
611 }
612
613 conn.execute_batch(
614 r"
615 CREATE TABLE IF NOT EXISTS operational_filter_values (
616 mutation_id TEXT NOT NULL,
617 collection_name TEXT NOT NULL,
618 field_name TEXT NOT NULL,
619 string_value TEXT,
620 integer_value INTEGER,
621 PRIMARY KEY(mutation_id, field_name),
622 FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE,
623 FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
624 );
625
626 CREATE INDEX IF NOT EXISTS idx_operational_filter_values_text
627 ON operational_filter_values(collection_name, field_name, string_value, mutation_id);
628 CREATE INDEX IF NOT EXISTS idx_operational_filter_values_integer
629 ON operational_filter_values(collection_name, field_name, integer_value, mutation_id);
630 ",
631 )?;
632 Ok(())
633 }
634
635 fn ensure_operational_validation_contract(conn: &Connection) -> Result<(), SchemaError> {
636 let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
637 let columns = stmt
638 .query_map([], |row| row.get::<_, String>(1))?
639 .collect::<Result<Vec<_>, _>>()?;
640 let has_validation_json = columns.iter().any(|column| column == "validation_json");
641
642 if !has_validation_json {
643 conn.execute(
644 "ALTER TABLE operational_collections ADD COLUMN validation_json TEXT NOT NULL DEFAULT ''",
645 [],
646 )?;
647 }
648
649 Ok(())
650 }
651
652 fn ensure_operational_secondary_indexes(conn: &Connection) -> Result<(), SchemaError> {
653 let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
654 let columns = stmt
655 .query_map([], |row| row.get::<_, String>(1))?
656 .collect::<Result<Vec<_>, _>>()?;
657 let has_secondary_indexes_json = columns
658 .iter()
659 .any(|column| column == "secondary_indexes_json");
660
661 if !has_secondary_indexes_json {
662 conn.execute(
663 "ALTER TABLE operational_collections ADD COLUMN secondary_indexes_json TEXT NOT NULL DEFAULT '[]'",
664 [],
665 )?;
666 }
667
668 conn.execute_batch(
669 r"
670 CREATE TABLE IF NOT EXISTS operational_secondary_index_entries (
671 collection_name TEXT NOT NULL,
672 index_name TEXT NOT NULL,
673 subject_kind TEXT NOT NULL,
674 mutation_id TEXT NOT NULL DEFAULT '',
675 record_key TEXT NOT NULL DEFAULT '',
676 sort_timestamp INTEGER,
677 slot1_text TEXT,
678 slot1_integer INTEGER,
679 slot2_text TEXT,
680 slot2_integer INTEGER,
681 slot3_text TEXT,
682 slot3_integer INTEGER,
683 PRIMARY KEY(collection_name, index_name, subject_kind, mutation_id, record_key),
684 FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
685 FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE
686 );
687
688 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_text
689 ON operational_secondary_index_entries(
690 collection_name, index_name, subject_kind, slot1_text, sort_timestamp DESC, mutation_id, record_key
691 );
692 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_integer
693 ON operational_secondary_index_entries(
694 collection_name, index_name, subject_kind, slot1_integer, sort_timestamp DESC, mutation_id, record_key
695 );
696 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_text
697 ON operational_secondary_index_entries(
698 collection_name, index_name, subject_kind, slot1_text, slot2_text, slot3_text, sort_timestamp DESC, record_key
699 );
700 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_integer
701 ON operational_secondary_index_entries(
702 collection_name, index_name, subject_kind, slot1_integer, slot2_integer, slot3_integer, sort_timestamp DESC, record_key
703 );
704 ",
705 )?;
706
707 Ok(())
708 }
709
710 fn ensure_operational_retention_runs(conn: &Connection) -> Result<(), SchemaError> {
711 conn.execute_batch(
712 r"
713 CREATE TABLE IF NOT EXISTS operational_retention_runs (
714 id TEXT PRIMARY KEY,
715 collection_name TEXT NOT NULL,
716 executed_at INTEGER NOT NULL,
717 action_kind TEXT NOT NULL,
718 dry_run INTEGER NOT NULL DEFAULT 0,
719 deleted_mutations INTEGER NOT NULL,
720 rows_remaining INTEGER NOT NULL,
721 metadata_json TEXT NOT NULL DEFAULT '',
722 FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
723 );
724
725 CREATE INDEX IF NOT EXISTS idx_operational_retention_runs_collection_time
726 ON operational_retention_runs(collection_name, executed_at DESC);
727 ",
728 )?;
729 Ok(())
730 }
731
732 #[must_use]
733 pub fn current_version(&self) -> SchemaVersion {
734 self.migrations()
735 .last()
736 .map_or(SchemaVersion(0), |migration| migration.version)
737 }
738
739 #[must_use]
740 pub fn migrations(&self) -> &'static [Migration] {
741 MIGRATIONS
742 }
743
744 pub fn initialize_connection(&self, conn: &Connection) -> Result<(), SchemaError> {
750 conn.execute_batch(
751 r"
752 PRAGMA foreign_keys = ON;
753 PRAGMA journal_mode = WAL;
754 PRAGMA synchronous = NORMAL;
755 PRAGMA busy_timeout = 5000;
756 PRAGMA temp_store = MEMORY;
757 PRAGMA mmap_size = 3000000000;
758 PRAGMA journal_size_limit = 536870912;
759 ",
760 )?;
761 Ok(())
762 }
763
764 pub fn initialize_reader_connection(&self, conn: &Connection) -> Result<(), SchemaError> {
775 conn.execute_batch(
776 r"
777 PRAGMA foreign_keys = ON;
778 PRAGMA busy_timeout = 5000;
779 PRAGMA temp_store = MEMORY;
780 PRAGMA mmap_size = 3000000000;
781 ",
782 )?;
783 Ok(())
784 }
785
786 #[cfg(feature = "sqlite-vec")]
798 pub fn ensure_vector_profile(
799 &self,
800 conn: &Connection,
801 profile: &str,
802 table_name: &str,
803 dimension: usize,
804 ) -> Result<(), SchemaError> {
805 conn.execute_batch(&format!(
806 "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING vec0(\
807 chunk_id TEXT PRIMARY KEY,\
808 embedding float[{dimension}]\
809 )"
810 ))?;
811 conn.execute(
812 "INSERT OR REPLACE INTO vector_profiles \
813 (profile, table_name, dimension, enabled) VALUES (?1, ?2, ?3, 1)",
814 rusqlite::params![profile, table_name, dimension as i64],
815 )?;
816 Ok(())
817 }
818
819 #[cfg(not(feature = "sqlite-vec"))]
824 pub fn ensure_vector_profile(
825 &self,
826 _conn: &Connection,
827 _profile: &str,
828 _table_name: &str,
829 _dimension: usize,
830 ) -> Result<(), SchemaError> {
831 Err(SchemaError::MissingCapability("sqlite-vec"))
832 }
833
834 fn ensure_metadata_tables(conn: &Connection) -> Result<(), SchemaError> {
840 conn.execute_batch(
841 r"
842 CREATE TABLE IF NOT EXISTS fathom_schema_migrations (
843 version INTEGER PRIMARY KEY,
844 description TEXT NOT NULL,
845 applied_at INTEGER NOT NULL DEFAULT (unixepoch())
846 );
847 ",
848 )?;
849 Ok(())
850 }
851}
852
853#[cfg(test)]
854#[allow(clippy::expect_used)]
855mod tests {
856 use rusqlite::Connection;
857
858 use super::SchemaManager;
859
860 #[test]
861 fn bootstrap_applies_initial_schema() {
862 let conn = Connection::open_in_memory().expect("in-memory sqlite");
863 let manager = SchemaManager::new();
864
865 let report = manager.bootstrap(&conn).expect("bootstrap report");
866
867 assert_eq!(
868 report.applied_versions.len(),
869 manager.current_version().0 as usize
870 );
871 assert!(report.sqlite_version.starts_with('3'));
872 let table_count: i64 = conn
873 .query_row(
874 "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'nodes'",
875 [],
876 |row| row.get(0),
877 )
878 .expect("nodes table exists");
879 assert_eq!(table_count, 1);
880 }
881
882 #[test]
885 fn vector_profile_not_enabled_without_feature() {
886 let conn = Connection::open_in_memory().expect("in-memory sqlite");
887 let manager = SchemaManager::new();
888 let report = manager.bootstrap(&conn).expect("bootstrap");
889 assert!(
890 !report.vector_profile_enabled,
891 "vector_profile_enabled must be false on a fresh bootstrap"
892 );
893 }
894
895 #[test]
896 fn vector_profile_skipped_when_dimension_absent() {
897 let conn = Connection::open_in_memory().expect("in-memory sqlite");
899 let manager = SchemaManager::new();
900 manager.bootstrap(&conn).expect("bootstrap");
901
902 let count: i64 = conn
903 .query_row(
904 "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
905 [],
906 |row| row.get(0),
907 )
908 .expect("count");
909 assert_eq!(
910 count, 0,
911 "no enabled profile without calling ensure_vector_profile"
912 );
913 }
914
915 #[test]
916 fn bootstrap_report_reflects_actual_vector_state() {
917 let conn = Connection::open_in_memory().expect("in-memory sqlite");
919 let manager = SchemaManager::new();
920 let report = manager.bootstrap(&conn).expect("bootstrap");
921
922 let db_count: i64 = conn
923 .query_row(
924 "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
925 [],
926 |row| row.get(0),
927 )
928 .expect("count");
929 assert_eq!(
930 report.vector_profile_enabled,
931 db_count > 0,
932 "BootstrapReport.vector_profile_enabled must match actual DB state"
933 );
934 }
935
936 #[test]
937 fn bootstrap_backfills_vector_contract_format_and_provenance_metadata_columns() {
938 let conn = Connection::open_in_memory().expect("in-memory sqlite");
939 conn.execute_batch(
940 r#"
941 CREATE TABLE provenance_events (
942 id TEXT PRIMARY KEY,
943 event_type TEXT NOT NULL,
944 subject TEXT NOT NULL,
945 source_ref TEXT,
946 created_at INTEGER NOT NULL DEFAULT (unixepoch())
947 );
948 CREATE TABLE vector_embedding_contracts (
949 profile TEXT PRIMARY KEY,
950 table_name TEXT NOT NULL,
951 model_identity TEXT NOT NULL,
952 model_version TEXT NOT NULL,
953 dimension INTEGER NOT NULL,
954 normalization_policy TEXT NOT NULL,
955 chunking_policy TEXT NOT NULL,
956 preprocessing_policy TEXT NOT NULL,
957 generator_command_json TEXT NOT NULL,
958 updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
959 applied_at INTEGER NOT NULL DEFAULT 0,
960 snapshot_hash TEXT NOT NULL DEFAULT ''
961 );
962 INSERT INTO vector_embedding_contracts (
963 profile,
964 table_name,
965 model_identity,
966 model_version,
967 dimension,
968 normalization_policy,
969 chunking_policy,
970 preprocessing_policy,
971 generator_command_json,
972 updated_at,
973 applied_at,
974 snapshot_hash
975 ) VALUES (
976 'default',
977 'vec_nodes_active',
978 'legacy-model',
979 '0.9.0',
980 4,
981 'l2',
982 'per_chunk',
983 'trim',
984 '["/bin/echo"]',
985 100,
986 100,
987 'legacy'
988 );
989 "#,
990 )
991 .expect("seed legacy schema");
992 let manager = SchemaManager::new();
993
994 let report = manager.bootstrap(&conn).expect("bootstrap");
995
996 assert!(
997 report.applied_versions.iter().any(|version| version.0 >= 5),
998 "bootstrap should apply hardening migrations"
999 );
1000 let format_version: i64 = conn
1001 .query_row(
1002 "SELECT contract_format_version FROM vector_embedding_contracts WHERE profile = 'default'",
1003 [],
1004 |row| row.get(0),
1005 )
1006 .expect("contract_format_version");
1007 assert_eq!(format_version, 1);
1008 let metadata_column_count: i64 = conn
1009 .query_row(
1010 "SELECT count(*) FROM pragma_table_info('provenance_events') WHERE name = 'metadata_json'",
1011 [],
1012 |row| row.get(0),
1013 )
1014 .expect("metadata_json column count");
1015 assert_eq!(metadata_column_count, 1);
1016 }
1017
1018 #[test]
1019 fn bootstrap_creates_operational_store_tables() {
1020 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1021 let manager = SchemaManager::new();
1022
1023 manager.bootstrap(&conn).expect("bootstrap");
1024
1025 for table in [
1026 "operational_collections",
1027 "operational_mutations",
1028 "operational_current",
1029 ] {
1030 let count: i64 = conn
1031 .query_row(
1032 "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = ?1",
1033 [table],
1034 |row| row.get(0),
1035 )
1036 .expect("table existence");
1037 assert_eq!(count, 1, "{table} should exist after bootstrap");
1038 }
1039 let mutation_order_columns: i64 = conn
1040 .query_row(
1041 "SELECT count(*) FROM pragma_table_info('operational_mutations') WHERE name = 'mutation_order'",
1042 [],
1043 |row| row.get(0),
1044 )
1045 .expect("mutation_order column exists");
1046 assert_eq!(mutation_order_columns, 1);
1047 }
1048
1049 #[test]
1050 fn bootstrap_is_idempotent_with_operational_store_tables() {
1051 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1052 let manager = SchemaManager::new();
1053
1054 manager.bootstrap(&conn).expect("first bootstrap");
1055 let report = manager.bootstrap(&conn).expect("second bootstrap");
1056
1057 assert!(
1058 report.applied_versions.is_empty(),
1059 "second bootstrap should apply no new migrations"
1060 );
1061 let count: i64 = conn
1062 .query_row(
1063 "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'operational_collections'",
1064 [],
1065 |row| row.get(0),
1066 )
1067 .expect("operational_collections table exists");
1068 assert_eq!(count, 1);
1069 }
1070
    #[test]
    fn bootstrap_is_idempotent_for_recovered_operational_tables_without_migration_history() {
        // Simulates a database recovered from a backup/copy: the operational
        // tables and their data already exist, but the migration-history table
        // does not. Bootstrap must tolerate the pre-existing tables, re-record
        // its migrations, and backfill derived data.
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        conn.execute_batch(
            r#"
            CREATE TABLE operational_collections (
                name TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                schema_json TEXT NOT NULL,
                retention_json TEXT NOT NULL,
                format_version INTEGER NOT NULL DEFAULT 1,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                disabled_at INTEGER
            );

            CREATE TABLE operational_mutations (
                id TEXT PRIMARY KEY,
                collection_name TEXT NOT NULL,
                record_key TEXT NOT NULL,
                op_kind TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                source_ref TEXT,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                mutation_order INTEGER NOT NULL DEFAULT 0,
                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
            );

            CREATE TABLE operational_current (
                collection_name TEXT NOT NULL,
                record_key TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                updated_at INTEGER NOT NULL,
                last_mutation_id TEXT NOT NULL,
                PRIMARY KEY(collection_name, record_key),
                FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
                FOREIGN KEY(last_mutation_id) REFERENCES operational_mutations(id)
            );

            INSERT INTO operational_collections (name, kind, schema_json, retention_json)
            VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}');
            INSERT INTO operational_mutations (
                id, collection_name, record_key, op_kind, payload_json, created_at, mutation_order
            ) VALUES (
                'mut-1', 'audit_log', 'entry-1', 'put', '{"ok":true}', 10, 0
            );
            "#,
        )
        .expect("seed recovered operational tables");

        let manager = SchemaManager::new();
        let report = manager
            .bootstrap(&conn)
            .expect("bootstrap recovered schema");

        // Version 8 is the mutation-ordering hardening migration; it must be
        // recorded even though the tables it targets already existed.
        assert!(
            report.applied_versions.iter().any(|version| version.0 == 8),
            "bootstrap should record operational mutation ordering hardening"
        );
        // The seeded row was inserted with mutation_order = 0; bootstrap is
        // expected to backfill a real (non-zero) ordering value.
        let mutation_order: i64 = conn
            .query_row(
                "SELECT mutation_order FROM operational_mutations WHERE id = 'mut-1'",
                [],
                |row| row.get(0),
            )
            .expect("mutation_order");
        assert_ne!(
            mutation_order, 0,
            "bootstrap should backfill recovered operational rows"
        );
        // The ordering index introduced by the same migration must also exist.
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type = 'index' AND name = 'idx_operational_mutations_collection_key_order'",
                [],
                |row| row.get(0),
            )
            .expect("ordering index exists");
        assert_eq!(count, 1);
    }
1149
    #[test]
    fn bootstrap_adds_operational_filter_contract_and_index_table() {
        // Seeds an older-generation operational schema (no filter/validation
        // columns, no filter-values table) and verifies that bootstrap upgrades
        // it in place via migrations 10 and 11.
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        conn.execute_batch(
            r#"
            CREATE TABLE operational_collections (
                name TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                schema_json TEXT NOT NULL,
                retention_json TEXT NOT NULL,
                format_version INTEGER NOT NULL DEFAULT 1,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                disabled_at INTEGER
            );

            CREATE TABLE operational_mutations (
                id TEXT PRIMARY KEY,
                collection_name TEXT NOT NULL,
                record_key TEXT NOT NULL,
                op_kind TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                source_ref TEXT,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                mutation_order INTEGER NOT NULL DEFAULT 1,
                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
            );

            INSERT INTO operational_collections (name, kind, schema_json, retention_json)
            VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}');
            "#,
        )
        .expect("seed recovered operational schema");

        let manager = SchemaManager::new();
        let report = manager
            .bootstrap(&conn)
            .expect("bootstrap recovered schema");

        // Both upgrade migrations must be recorded in the report.
        assert!(
            report
                .applied_versions
                .iter()
                .any(|version| version.0 == 10),
            "bootstrap should record operational filtered read migration"
        );
        assert!(
            report
                .applied_versions
                .iter()
                .any(|version| version.0 == 11),
            "bootstrap should record operational validation migration"
        );
        // The new columns must exist and carry their expected defaults on the
        // pre-existing 'audit_log' row.
        let filter_fields_json: String = conn
            .query_row(
                "SELECT filter_fields_json FROM operational_collections WHERE name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("filter_fields_json added");
        assert_eq!(filter_fields_json, "[]");
        let validation_json: String = conn
            .query_row(
                "SELECT validation_json FROM operational_collections WHERE name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("validation_json added");
        assert_eq!(validation_json, "");
        // Migration 10 also introduces the filter-values side table.
        let table_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'operational_filter_values'",
                [],
                |row| row.get(0),
            )
            .expect("filter table exists");
        assert_eq!(table_count, 1);
    }
1227
1228 #[test]
1229 fn bootstrap_reapplies_migration_history_without_readding_filter_contract_columns() {
1230 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1231 let manager = SchemaManager::new();
1232 manager.bootstrap(&conn).expect("initial bootstrap");
1233
1234 conn.execute("DROP TABLE fathom_schema_migrations", [])
1235 .expect("drop migration history");
1236 SchemaManager::ensure_metadata_tables(&conn).expect("recreate migration metadata");
1237
1238 let report = manager
1239 .bootstrap(&conn)
1240 .expect("rebootstrap existing schema");
1241
1242 assert!(
1243 report
1244 .applied_versions
1245 .iter()
1246 .any(|version| version.0 == 10),
1247 "rebootstrap should re-record migration 10"
1248 );
1249 assert!(
1250 report
1251 .applied_versions
1252 .iter()
1253 .any(|version| version.0 == 11),
1254 "rebootstrap should re-record migration 11"
1255 );
1256 let filter_fields_json: String = conn
1257 .query_row(
1258 "SELECT filter_fields_json FROM operational_collections LIMIT 1",
1259 [],
1260 |row| row.get(0),
1261 )
1262 .unwrap_or_else(|_| "[]".to_string());
1263 assert_eq!(filter_fields_json, "[]");
1264 let validation_json: String = conn
1265 .query_row(
1266 "SELECT validation_json FROM operational_collections LIMIT 1",
1267 [],
1268 |row| row.get(0),
1269 )
1270 .unwrap_or_default();
1271 assert_eq!(validation_json, "");
1272 }
1273
1274 #[test]
1275 fn downgrade_detected_returns_version_mismatch() {
1276 use crate::SchemaError;
1277
1278 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1279 let manager = SchemaManager::new();
1280 manager.bootstrap(&conn).expect("initial bootstrap");
1281
1282 conn.execute(
1283 "INSERT INTO fathom_schema_migrations (version, description) VALUES (?1, ?2)",
1284 (999_i64, "future migration"),
1285 )
1286 .expect("insert future version");
1287
1288 let err = manager
1289 .bootstrap(&conn)
1290 .expect_err("should fail on downgrade");
1291 assert!(
1292 matches!(
1293 err,
1294 SchemaError::VersionMismatch {
1295 database_version: 999,
1296 ..
1297 }
1298 ),
1299 "expected VersionMismatch with database_version 999, got: {err}"
1300 );
1301 }
1302
1303 #[test]
1304 fn journal_size_limit_is_set() {
1305 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1306 let manager = SchemaManager::new();
1307 manager
1308 .initialize_connection(&conn)
1309 .expect("initialize_connection");
1310
1311 let limit: i64 = conn
1312 .query_row("PRAGMA journal_size_limit", [], |row| row.get(0))
1313 .expect("journal_size_limit pragma");
1314 assert_eq!(limit, 536_870_912);
1315 }
1316
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn vector_profile_created_when_feature_enabled() {
        // Register sqlite-vec as an auto-extension so every subsequently opened
        // connection gains the vec virtual-table module.
        // SAFETY: sqlite3_auto_extension takes a C extension entry point; the
        // transmute only re-types the fn pointer. This assumes
        // sqlite_vec::sqlite3_vec_init matches the expected extension-init
        // signature (the standard sqlite-vec registration pattern) — TODO confirm
        // against the sqlite-vec crate docs. Registration happens before the
        // connection is opened, as sqlite3_auto_extension requires.
        unsafe {
            rusqlite::ffi::sqlite3_auto_extension(Some(std::mem::transmute(
                sqlite_vec::sqlite3_vec_init as *const (),
            )));
        }
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager.bootstrap(&conn).expect("bootstrap");

        manager
            .ensure_vector_profile(&conn, "default", "vec_nodes_active", 128)
            .expect("ensure_vector_profile");

        // The profile row must be recorded as enabled ...
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            count, 1,
            "vector profile must be enabled after ensure_vector_profile"
        );

        // ... and the backing virtual table must be queryable (count discarded;
        // only existence matters here).
        let _: i64 = conn
            .query_row("SELECT count(*) FROM vec_nodes_active", [], |row| {
                row.get(0)
            })
            .expect("vec_nodes_active table must exist after ensure_vector_profile");
    }
1354}