1use rusqlite::{Connection, OptionalExtension};
2
3use crate::{Migration, SchemaError, SchemaVersion};
4
/// Ordered, append-only schema migration list.
///
/// `SchemaManager::bootstrap` walks this slice in order and applies every
/// entry whose version is not yet recorded in `fathom_schema_migrations`.
/// Note that versions 4-16 are dispatched to dedicated idempotent `ensure_*`
/// helpers inside `bootstrap` instead of executing the SQL stored here, so
/// those bodies describe the fresh-database shape only (and v16's body is
/// intentionally empty — its work lives entirely in
/// `ensure_unicode_porter_fts_tokenizers`).
static MIGRATIONS: &[Migration] = &[
    // v1: canonical graph tables (nodes, edges, chunks), chunk FTS index,
    // vector profile registry, and run/step/action runtime tables. Rows are
    // versioned via `superseded_at`; the partial unique indexes enforce one
    // active row per logical_id.
    Migration::new(
        SchemaVersion(1),
        "initial canonical schema and runtime tables",
        r"
        CREATE TABLE IF NOT EXISTS nodes (
            row_id TEXT PRIMARY KEY,
            logical_id TEXT NOT NULL,
            kind TEXT NOT NULL,
            properties BLOB NOT NULL,
            created_at INTEGER NOT NULL,
            superseded_at INTEGER,
            source_ref TEXT,
            confidence REAL
        );

        CREATE UNIQUE INDEX IF NOT EXISTS idx_nodes_active_logical_id
            ON nodes(logical_id)
            WHERE superseded_at IS NULL;
        CREATE INDEX IF NOT EXISTS idx_nodes_kind_active
            ON nodes(kind, superseded_at);
        CREATE INDEX IF NOT EXISTS idx_nodes_source_ref
            ON nodes(source_ref);

        CREATE TABLE IF NOT EXISTS edges (
            row_id TEXT PRIMARY KEY,
            logical_id TEXT NOT NULL,
            source_logical_id TEXT NOT NULL,
            target_logical_id TEXT NOT NULL,
            kind TEXT NOT NULL,
            properties BLOB NOT NULL,
            created_at INTEGER NOT NULL,
            superseded_at INTEGER,
            source_ref TEXT,
            confidence REAL
        );

        CREATE UNIQUE INDEX IF NOT EXISTS idx_edges_active_logical_id
            ON edges(logical_id)
            WHERE superseded_at IS NULL;
        CREATE INDEX IF NOT EXISTS idx_edges_source_active
            ON edges(source_logical_id, kind, superseded_at);
        CREATE INDEX IF NOT EXISTS idx_edges_target_active
            ON edges(target_logical_id, kind, superseded_at);
        CREATE INDEX IF NOT EXISTS idx_edges_source_ref
            ON edges(source_ref);

        CREATE TABLE IF NOT EXISTS chunks (
            id TEXT PRIMARY KEY,
            node_logical_id TEXT NOT NULL,
            text_content TEXT NOT NULL,
            byte_start INTEGER,
            byte_end INTEGER,
            created_at INTEGER NOT NULL
        );

        CREATE INDEX IF NOT EXISTS idx_chunks_node_logical_id
            ON chunks(node_logical_id);

        CREATE VIRTUAL TABLE IF NOT EXISTS fts_nodes USING fts5(
            chunk_id UNINDEXED,
            node_logical_id UNINDEXED,
            kind UNINDEXED,
            text_content
        );

        CREATE TABLE IF NOT EXISTS vector_profiles (
            profile TEXT PRIMARY KEY,
            table_name TEXT NOT NULL,
            dimension INTEGER NOT NULL,
            enabled INTEGER NOT NULL DEFAULT 0
        );

        CREATE TABLE IF NOT EXISTS runs (
            id TEXT PRIMARY KEY,
            kind TEXT NOT NULL,
            status TEXT NOT NULL,
            properties BLOB NOT NULL,
            created_at INTEGER NOT NULL,
            completed_at INTEGER,
            superseded_at INTEGER,
            source_ref TEXT
        );

        CREATE TABLE IF NOT EXISTS steps (
            id TEXT PRIMARY KEY,
            run_id TEXT NOT NULL,
            kind TEXT NOT NULL,
            status TEXT NOT NULL,
            properties BLOB NOT NULL,
            created_at INTEGER NOT NULL,
            completed_at INTEGER,
            superseded_at INTEGER,
            source_ref TEXT,
            FOREIGN KEY(run_id) REFERENCES runs(id)
        );

        CREATE TABLE IF NOT EXISTS actions (
            id TEXT PRIMARY KEY,
            step_id TEXT NOT NULL,
            kind TEXT NOT NULL,
            status TEXT NOT NULL,
            properties BLOB NOT NULL,
            created_at INTEGER NOT NULL,
            completed_at INTEGER,
            superseded_at INTEGER,
            source_ref TEXT,
            FOREIGN KEY(step_id) REFERENCES steps(id)
        );

        CREATE INDEX IF NOT EXISTS idx_runs_source_ref
            ON runs(source_ref);
        CREATE INDEX IF NOT EXISTS idx_steps_source_ref
            ON steps(source_ref);
        CREATE INDEX IF NOT EXISTS idx_actions_source_ref
            ON actions(source_ref);
        ",
    ),
    // v2: append-only audit events keyed by subject.
    Migration::new(
        SchemaVersion(2),
        "durable audit trail: provenance_events table",
        r"
        CREATE TABLE IF NOT EXISTS provenance_events (
            id TEXT PRIMARY KEY,
            event_type TEXT NOT NULL,
            subject TEXT NOT NULL,
            source_ref TEXT,
            created_at INTEGER NOT NULL DEFAULT (unixepoch())
        );
        CREATE INDEX IF NOT EXISTS idx_provenance_events_subject
            ON provenance_events (subject, event_type);
        ",
    ),
    // v3: per-profile embedding regeneration contracts.
    Migration::new(
        SchemaVersion(3),
        "vector regeneration contracts",
        r"
        CREATE TABLE IF NOT EXISTS vector_embedding_contracts (
            profile TEXT PRIMARY KEY,
            table_name TEXT NOT NULL,
            model_identity TEXT NOT NULL,
            model_version TEXT NOT NULL,
            dimension INTEGER NOT NULL,
            normalization_policy TEXT NOT NULL,
            chunking_policy TEXT NOT NULL,
            preprocessing_policy TEXT NOT NULL,
            generator_command_json TEXT NOT NULL,
            updated_at INTEGER NOT NULL DEFAULT (unixepoch())
        );
        ",
    ),
    // v4: applied in code via ensure_vector_regeneration_apply_metadata.
    Migration::new(
        SchemaVersion(4),
        "vector regeneration apply metadata",
        r"
        ALTER TABLE vector_embedding_contracts
            ADD COLUMN applied_at INTEGER NOT NULL DEFAULT 0;
        ALTER TABLE vector_embedding_contracts
            ADD COLUMN snapshot_hash TEXT NOT NULL DEFAULT '';
        UPDATE vector_embedding_contracts
        SET
            applied_at = CASE
                WHEN applied_at = 0 THEN updated_at
                ELSE applied_at
            END,
            snapshot_hash = CASE
                WHEN snapshot_hash = '' THEN 'legacy'
                ELSE snapshot_hash
            END;
        ",
    ),
    // v5: applied in code via ensure_vector_contract_format_version.
    Migration::new(
        SchemaVersion(5),
        "vector regeneration contract format version",
        r"
        ALTER TABLE vector_embedding_contracts
            ADD COLUMN contract_format_version INTEGER NOT NULL DEFAULT 1;
        UPDATE vector_embedding_contracts
        SET contract_format_version = 1
        WHERE contract_format_version = 0;
        ",
    ),
    // v6: applied in code via ensure_provenance_metadata.
    Migration::new(
        SchemaVersion(6),
        "provenance metadata payloads",
        r"
        ALTER TABLE provenance_events
            ADD COLUMN metadata_json TEXT NOT NULL DEFAULT '';
        ",
    ),
    // v7: operational store — collection registry, mutation log, and the
    // derived current-state projection keyed by (collection, record).
    Migration::new(
        SchemaVersion(7),
        "operational store canonical and derived tables",
        r"
        CREATE TABLE IF NOT EXISTS operational_collections (
            name TEXT PRIMARY KEY,
            kind TEXT NOT NULL,
            schema_json TEXT NOT NULL,
            retention_json TEXT NOT NULL,
            format_version INTEGER NOT NULL DEFAULT 1,
            created_at INTEGER NOT NULL DEFAULT (unixepoch()),
            disabled_at INTEGER
        );

        CREATE INDEX IF NOT EXISTS idx_operational_collections_kind
            ON operational_collections(kind, disabled_at);

        CREATE TABLE IF NOT EXISTS operational_mutations (
            id TEXT PRIMARY KEY,
            collection_name TEXT NOT NULL,
            record_key TEXT NOT NULL,
            op_kind TEXT NOT NULL,
            payload_json TEXT NOT NULL,
            source_ref TEXT,
            created_at INTEGER NOT NULL DEFAULT (unixepoch()),
            FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
        );

        CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_created
            ON operational_mutations(collection_name, record_key, created_at DESC, id DESC);
        CREATE INDEX IF NOT EXISTS idx_operational_mutations_source_ref
            ON operational_mutations(source_ref);

        CREATE TABLE IF NOT EXISTS operational_current (
            collection_name TEXT NOT NULL,
            record_key TEXT NOT NULL,
            payload_json TEXT NOT NULL,
            updated_at INTEGER NOT NULL,
            last_mutation_id TEXT NOT NULL,
            PRIMARY KEY(collection_name, record_key),
            FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
            FOREIGN KEY(last_mutation_id) REFERENCES operational_mutations(id)
        );

        CREATE INDEX IF NOT EXISTS idx_operational_current_collection_updated
            ON operational_current(collection_name, updated_at DESC);
        ",
    ),
    // v8: applied in code via ensure_operational_mutation_order. Existing
    // rows borrow their rowid as the ordering key.
    Migration::new(
        SchemaVersion(8),
        "operational mutation ordering hardening",
        r"
        ALTER TABLE operational_mutations
            ADD COLUMN mutation_order INTEGER NOT NULL DEFAULT 0;
        UPDATE operational_mutations
        SET mutation_order = rowid
        WHERE mutation_order = 0;
        CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_order
            ON operational_mutations(collection_name, record_key, mutation_order DESC);
        ",
    ),
    // v9: applied in code via ensure_node_access_metadata.
    Migration::new(
        SchemaVersion(9),
        "node last_accessed metadata",
        r"
        CREATE TABLE IF NOT EXISTS node_access_metadata (
            logical_id TEXT PRIMARY KEY,
            last_accessed_at INTEGER NOT NULL,
            updated_at INTEGER NOT NULL
        );

        CREATE INDEX IF NOT EXISTS idx_node_access_metadata_last_accessed
            ON node_access_metadata(last_accessed_at DESC);
        ",
    ),
    // v10: applied in code via ensure_operational_filter_contract. Typed
    // filter values extracted per mutation, indexed for text and integer
    // lookups.
    Migration::new(
        SchemaVersion(10),
        "operational filtered read contracts and extracted values",
        r"
        ALTER TABLE operational_collections
            ADD COLUMN filter_fields_json TEXT NOT NULL DEFAULT '[]';

        CREATE TABLE IF NOT EXISTS operational_filter_values (
            mutation_id TEXT NOT NULL,
            collection_name TEXT NOT NULL,
            field_name TEXT NOT NULL,
            string_value TEXT,
            integer_value INTEGER,
            PRIMARY KEY(mutation_id, field_name),
            FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE,
            FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
        );

        CREATE INDEX IF NOT EXISTS idx_operational_filter_values_text
            ON operational_filter_values(collection_name, field_name, string_value, mutation_id);
        CREATE INDEX IF NOT EXISTS idx_operational_filter_values_integer
            ON operational_filter_values(collection_name, field_name, integer_value, mutation_id);
        ",
    ),
    // v11: applied in code via ensure_operational_validation_contract.
    Migration::new(
        SchemaVersion(11),
        "operational payload validation contracts",
        r"
        ALTER TABLE operational_collections
            ADD COLUMN validation_json TEXT NOT NULL DEFAULT '';
        ",
    ),
    // v12: applied in code via ensure_operational_secondary_indexes. Up to
    // three typed slots per entry, covered by single-slot and composite
    // indexes in both text and integer forms.
    Migration::new(
        SchemaVersion(12),
        "operational secondary indexes",
        r"
        ALTER TABLE operational_collections
            ADD COLUMN secondary_indexes_json TEXT NOT NULL DEFAULT '[]';

        CREATE TABLE IF NOT EXISTS operational_secondary_index_entries (
            collection_name TEXT NOT NULL,
            index_name TEXT NOT NULL,
            subject_kind TEXT NOT NULL,
            mutation_id TEXT NOT NULL DEFAULT '',
            record_key TEXT NOT NULL DEFAULT '',
            sort_timestamp INTEGER,
            slot1_text TEXT,
            slot1_integer INTEGER,
            slot2_text TEXT,
            slot2_integer INTEGER,
            slot3_text TEXT,
            slot3_integer INTEGER,
            PRIMARY KEY(collection_name, index_name, subject_kind, mutation_id, record_key),
            FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
            FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE
        );

        CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_text
            ON operational_secondary_index_entries(
                collection_name, index_name, subject_kind, slot1_text, sort_timestamp DESC, mutation_id, record_key
            );
        CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_integer
            ON operational_secondary_index_entries(
                collection_name, index_name, subject_kind, slot1_integer, sort_timestamp DESC, mutation_id, record_key
            );
        CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_text
            ON operational_secondary_index_entries(
                collection_name, index_name, subject_kind, slot1_text, slot2_text, slot3_text, sort_timestamp DESC, record_key
            );
        CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_integer
            ON operational_secondary_index_entries(
                collection_name, index_name, subject_kind, slot1_integer, slot2_integer, slot3_integer, sort_timestamp DESC, record_key
            );
        ",
    ),
    // v13: applied in code via ensure_operational_retention_runs.
    Migration::new(
        SchemaVersion(13),
        "operational retention run metadata",
        r"
        CREATE TABLE IF NOT EXISTS operational_retention_runs (
            id TEXT PRIMARY KEY,
            collection_name TEXT NOT NULL,
            executed_at INTEGER NOT NULL,
            action_kind TEXT NOT NULL,
            dry_run INTEGER NOT NULL DEFAULT 0,
            deleted_mutations INTEGER NOT NULL,
            rows_remaining INTEGER NOT NULL,
            metadata_json TEXT NOT NULL DEFAULT '',
            FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
        );

        CREATE INDEX IF NOT EXISTS idx_operational_retention_runs_collection_time
            ON operational_retention_runs(collection_name, executed_at DESC);
        ",
    ),
    // v14: applied in code via ensure_external_content_columns.
    Migration::new(
        SchemaVersion(14),
        "external content object columns",
        r"
        ALTER TABLE nodes ADD COLUMN content_ref TEXT;

        CREATE INDEX IF NOT EXISTS idx_nodes_content_ref
            ON nodes(content_ref)
            WHERE content_ref IS NOT NULL AND superseded_at IS NULL;

        ALTER TABLE chunks ADD COLUMN content_hash TEXT;
        ",
    ),
    // v15: applied in code via ensure_fts_property_schemas.
    Migration::new(
        SchemaVersion(15),
        "FTS property projection schemas",
        r"
        CREATE TABLE IF NOT EXISTS fts_property_schemas (
            kind TEXT PRIMARY KEY,
            property_paths_json TEXT NOT NULL,
            separator TEXT NOT NULL DEFAULT ' ',
            format_version INTEGER NOT NULL DEFAULT 1,
            created_at INTEGER NOT NULL DEFAULT (unixepoch())
        );

        CREATE VIRTUAL TABLE IF NOT EXISTS fts_node_properties USING fts5(
            node_logical_id UNINDEXED,
            kind UNINDEXED,
            text_content
        );
        ",
    ),
    // v16: SQL intentionally empty — `bootstrap` routes this version to
    // ensure_unicode_porter_fts_tokenizers, which drops, recreates, and
    // backfills both FTS tables.
    Migration::new(
        SchemaVersion(16),
        "rebuild fts_nodes and fts_node_properties on porter+unicode61 tokenizer",
        "",
    ),
    // v17: offset sidecar mapping FTS match positions back to JSON leaf
    // paths.
    Migration::new(
        SchemaVersion(17),
        "fts property position-map sidecar for recursive extraction",
        r"
        CREATE TABLE IF NOT EXISTS fts_node_property_positions (
            node_logical_id TEXT NOT NULL,
            kind TEXT NOT NULL,
            start_offset INTEGER NOT NULL,
            end_offset INTEGER NOT NULL,
            leaf_path TEXT NOT NULL
        );

        CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_node
            ON fts_node_property_positions(node_logical_id, kind);

        CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_kind
            ON fts_node_property_positions(kind);
        ",
    ),
    // v18: the sidecar is rebuilt from scratch (DROP + CREATE) because
    // SQLite cannot add a UNIQUE constraint to an existing table.
    Migration::new(
        SchemaVersion(18),
        "add UNIQUE constraint on fts_node_property_positions (node_logical_id, kind, start_offset)",
        r"
        DROP TABLE IF EXISTS fts_node_property_positions;

        CREATE TABLE fts_node_property_positions (
            node_logical_id TEXT NOT NULL,
            kind TEXT NOT NULL,
            start_offset INTEGER NOT NULL,
            end_offset INTEGER NOT NULL,
            leaf_path TEXT NOT NULL,
            UNIQUE(node_logical_id, kind, start_offset)
        );

        CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_node
            ON fts_node_property_positions(node_logical_id, kind);

        CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_kind
            ON fts_node_property_positions(kind);
        ",
    ),
    // v19: staging rows and per-kind progress state for background
    // property-FTS rebuilds.
    Migration::new(
        SchemaVersion(19),
        "async property-FTS rebuild staging and state tables",
        r"
        CREATE TABLE IF NOT EXISTS fts_property_rebuild_staging (
            kind TEXT NOT NULL,
            node_logical_id TEXT NOT NULL,
            text_content TEXT NOT NULL,
            positions_blob BLOB,
            PRIMARY KEY (kind, node_logical_id)
        );

        CREATE TABLE IF NOT EXISTS fts_property_rebuild_state (
            kind TEXT PRIMARY KEY,
            schema_id INTEGER NOT NULL,
            state TEXT NOT NULL,
            rows_total INTEGER,
            rows_done INTEGER NOT NULL DEFAULT 0,
            started_at INTEGER NOT NULL,
            last_progress_at INTEGER,
            error_message TEXT,
            is_first_registration INTEGER NOT NULL DEFAULT 0
        );
        ",
    ),
];
500
/// Summary returned by [`SchemaManager::bootstrap`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BootstrapReport {
    // Result of `SELECT sqlite_version()` on the bootstrapped connection.
    pub sqlite_version: String,
    // Migration versions applied during this bootstrap call; empty when the
    // database was already up to date.
    pub applied_versions: Vec<SchemaVersion>,
    // True when at least one row in `vector_profiles` has `enabled = 1`.
    pub vector_profile_enabled: bool,
}
507
/// Stateless coordinator for connection initialization, schema bootstrap,
/// and migration application.
#[derive(Clone, Debug, Default)]
pub struct SchemaManager;
510
511impl SchemaManager {
512 #[must_use]
513 pub fn new() -> Self {
514 Self
515 }
516
    /// Initialize the connection, create the migration bookkeeping table,
    /// and apply every migration not yet recorded, returning a summary.
    ///
    /// # Errors
    /// Returns `SchemaError::VersionMismatch` when the database already
    /// carries a schema version newer than this engine knows, or propagates
    /// any underlying SQLite error.
    pub fn bootstrap(&self, conn: &Connection) -> Result<BootstrapReport, SchemaError> {
        self.initialize_connection(conn)?;
        Self::ensure_metadata_tables(conn)?;

        // Highest recorded version; COALESCE yields 0 on a fresh database.
        let max_applied: u32 = conn.query_row(
            "SELECT COALESCE(MAX(version), 0) FROM fathom_schema_migrations",
            [],
            |row| row.get(0),
        )?;
        let engine_version = self.current_version().0;
        trace_info!(
            current_version = max_applied,
            engine_version,
            "schema bootstrap: version check"
        );
        // Refuse databases written by a newer engine: downgrade is not
        // supported, and proceeding could corrupt newer-schema data.
        if max_applied > engine_version {
            trace_error!(
                database_version = max_applied,
                engine_version,
                "schema version mismatch: database is newer than engine"
            );
            return Err(SchemaError::VersionMismatch {
                database_version: max_applied,
                engine_version,
            });
        }

        let mut applied_versions = Vec::new();
        for migration in self.migrations() {
            // Each version is checked individually (not just against
            // max_applied), so gaps in the recorded set are repaired.
            let already_applied = conn
                .query_row(
                    "SELECT 1 FROM fathom_schema_migrations WHERE version = ?1",
                    [i64::from(migration.version.0)],
                    |row| row.get::<_, i64>(0),
                )
                .optional()?
                .is_some();

            if already_applied {
                continue;
            }

            // The migration body and its bookkeeping row commit atomically.
            let tx = conn.unchecked_transaction()?;
            match migration.version {
                // These versions run idempotent Rust helpers instead of the
                // static SQL, so partially-migrated databases are repaired
                // rather than failing on e.g. duplicate ADD COLUMN.
                SchemaVersion(4) => Self::ensure_vector_regeneration_apply_metadata(&tx)?,
                SchemaVersion(5) => Self::ensure_vector_contract_format_version(&tx)?,
                SchemaVersion(6) => Self::ensure_provenance_metadata(&tx)?,
                SchemaVersion(8) => Self::ensure_operational_mutation_order(&tx)?,
                SchemaVersion(9) => Self::ensure_node_access_metadata(&tx)?,
                SchemaVersion(10) => Self::ensure_operational_filter_contract(&tx)?,
                SchemaVersion(11) => Self::ensure_operational_validation_contract(&tx)?,
                SchemaVersion(12) => Self::ensure_operational_secondary_indexes(&tx)?,
                SchemaVersion(13) => Self::ensure_operational_retention_runs(&tx)?,
                SchemaVersion(14) => Self::ensure_external_content_columns(&tx)?,
                SchemaVersion(15) => Self::ensure_fts_property_schemas(&tx)?,
                SchemaVersion(16) => Self::ensure_unicode_porter_fts_tokenizers(&tx)?,
                _ => tx.execute_batch(migration.sql)?,
            }
            tx.execute(
                "INSERT INTO fathom_schema_migrations (version, description) VALUES (?1, ?2)",
                (i64::from(migration.version.0), migration.description),
            )?;
            tx.commit()?;
            trace_info!(
                version = migration.version.0,
                description = migration.description,
                "schema migration applied"
            );
            applied_versions.push(migration.version);
        }

        let sqlite_version = conn.query_row("SELECT sqlite_version()", [], |row| row.get(0))?;
        let vector_profile_count: i64 = conn.query_row(
            "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
            [],
            |row| row.get(0),
        )?;
        Ok(BootstrapReport {
            sqlite_version,
            applied_versions,
            vector_profile_enabled: vector_profile_count > 0,
        })
    }
608
609 fn ensure_vector_regeneration_apply_metadata(conn: &Connection) -> Result<(), SchemaError> {
610 let mut stmt = conn.prepare("PRAGMA table_info(vector_embedding_contracts)")?;
611 let columns = stmt
612 .query_map([], |row| row.get::<_, String>(1))?
613 .collect::<Result<Vec<_>, _>>()?;
614 let has_applied_at = columns.iter().any(|column| column == "applied_at");
615 let has_snapshot_hash = columns.iter().any(|column| column == "snapshot_hash");
616
617 if !has_applied_at {
618 conn.execute(
619 "ALTER TABLE vector_embedding_contracts ADD COLUMN applied_at INTEGER NOT NULL DEFAULT 0",
620 [],
621 )?;
622 }
623 if !has_snapshot_hash {
624 conn.execute(
625 "ALTER TABLE vector_embedding_contracts ADD COLUMN snapshot_hash TEXT NOT NULL DEFAULT ''",
626 [],
627 )?;
628 }
629 conn.execute(
630 r"
631 UPDATE vector_embedding_contracts
632 SET
633 applied_at = CASE
634 WHEN applied_at = 0 THEN updated_at
635 ELSE applied_at
636 END,
637 snapshot_hash = CASE
638 WHEN snapshot_hash = '' THEN 'legacy'
639 ELSE snapshot_hash
640 END
641 ",
642 [],
643 )?;
644 Ok(())
645 }
646
647 fn ensure_vector_contract_format_version(conn: &Connection) -> Result<(), SchemaError> {
648 let mut stmt = conn.prepare("PRAGMA table_info(vector_embedding_contracts)")?;
649 let columns = stmt
650 .query_map([], |row| row.get::<_, String>(1))?
651 .collect::<Result<Vec<_>, _>>()?;
652 let has_contract_format_version = columns
653 .iter()
654 .any(|column| column == "contract_format_version");
655
656 if !has_contract_format_version {
657 conn.execute(
658 "ALTER TABLE vector_embedding_contracts ADD COLUMN contract_format_version INTEGER NOT NULL DEFAULT 1",
659 [],
660 )?;
661 }
662 conn.execute(
663 r"
664 UPDATE vector_embedding_contracts
665 SET contract_format_version = 1
666 WHERE contract_format_version = 0
667 ",
668 [],
669 )?;
670 Ok(())
671 }
672
673 fn ensure_provenance_metadata(conn: &Connection) -> Result<(), SchemaError> {
674 let mut stmt = conn.prepare("PRAGMA table_info(provenance_events)")?;
675 let columns = stmt
676 .query_map([], |row| row.get::<_, String>(1))?
677 .collect::<Result<Vec<_>, _>>()?;
678 let has_metadata_json = columns.iter().any(|column| column == "metadata_json");
679
680 if !has_metadata_json {
681 conn.execute(
682 "ALTER TABLE provenance_events ADD COLUMN metadata_json TEXT NOT NULL DEFAULT ''",
683 [],
684 )?;
685 }
686 Ok(())
687 }
688
689 fn ensure_operational_mutation_order(conn: &Connection) -> Result<(), SchemaError> {
690 let mut stmt = conn.prepare("PRAGMA table_info(operational_mutations)")?;
691 let columns = stmt
692 .query_map([], |row| row.get::<_, String>(1))?
693 .collect::<Result<Vec<_>, _>>()?;
694 let has_mutation_order = columns.iter().any(|column| column == "mutation_order");
695
696 if !has_mutation_order {
697 conn.execute(
698 "ALTER TABLE operational_mutations ADD COLUMN mutation_order INTEGER NOT NULL DEFAULT 0",
699 [],
700 )?;
701 }
702 conn.execute(
703 r"
704 UPDATE operational_mutations
705 SET mutation_order = rowid
706 WHERE mutation_order = 0
707 ",
708 [],
709 )?;
710 conn.execute(
711 r"
712 CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_order
713 ON operational_mutations(collection_name, record_key, mutation_order DESC)
714 ",
715 [],
716 )?;
717 Ok(())
718 }
719
720 fn ensure_node_access_metadata(conn: &Connection) -> Result<(), SchemaError> {
721 conn.execute_batch(
722 r"
723 CREATE TABLE IF NOT EXISTS node_access_metadata (
724 logical_id TEXT PRIMARY KEY,
725 last_accessed_at INTEGER NOT NULL,
726 updated_at INTEGER NOT NULL
727 );
728
729 CREATE INDEX IF NOT EXISTS idx_node_access_metadata_last_accessed
730 ON node_access_metadata(last_accessed_at DESC);
731 ",
732 )?;
733 Ok(())
734 }
735
736 fn ensure_operational_filter_contract(conn: &Connection) -> Result<(), SchemaError> {
737 let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
738 let columns = stmt
739 .query_map([], |row| row.get::<_, String>(1))?
740 .collect::<Result<Vec<_>, _>>()?;
741 let has_filter_fields_json = columns.iter().any(|column| column == "filter_fields_json");
742
743 if !has_filter_fields_json {
744 conn.execute(
745 "ALTER TABLE operational_collections ADD COLUMN filter_fields_json TEXT NOT NULL DEFAULT '[]'",
746 [],
747 )?;
748 }
749
750 conn.execute_batch(
751 r"
752 CREATE TABLE IF NOT EXISTS operational_filter_values (
753 mutation_id TEXT NOT NULL,
754 collection_name TEXT NOT NULL,
755 field_name TEXT NOT NULL,
756 string_value TEXT,
757 integer_value INTEGER,
758 PRIMARY KEY(mutation_id, field_name),
759 FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE,
760 FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
761 );
762
763 CREATE INDEX IF NOT EXISTS idx_operational_filter_values_text
764 ON operational_filter_values(collection_name, field_name, string_value, mutation_id);
765 CREATE INDEX IF NOT EXISTS idx_operational_filter_values_integer
766 ON operational_filter_values(collection_name, field_name, integer_value, mutation_id);
767 ",
768 )?;
769 Ok(())
770 }
771
772 fn ensure_operational_validation_contract(conn: &Connection) -> Result<(), SchemaError> {
773 let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
774 let columns = stmt
775 .query_map([], |row| row.get::<_, String>(1))?
776 .collect::<Result<Vec<_>, _>>()?;
777 let has_validation_json = columns.iter().any(|column| column == "validation_json");
778
779 if !has_validation_json {
780 conn.execute(
781 "ALTER TABLE operational_collections ADD COLUMN validation_json TEXT NOT NULL DEFAULT ''",
782 [],
783 )?;
784 }
785
786 Ok(())
787 }
788
789 fn ensure_operational_secondary_indexes(conn: &Connection) -> Result<(), SchemaError> {
790 let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
791 let columns = stmt
792 .query_map([], |row| row.get::<_, String>(1))?
793 .collect::<Result<Vec<_>, _>>()?;
794 let has_secondary_indexes_json = columns
795 .iter()
796 .any(|column| column == "secondary_indexes_json");
797
798 if !has_secondary_indexes_json {
799 conn.execute(
800 "ALTER TABLE operational_collections ADD COLUMN secondary_indexes_json TEXT NOT NULL DEFAULT '[]'",
801 [],
802 )?;
803 }
804
805 conn.execute_batch(
806 r"
807 CREATE TABLE IF NOT EXISTS operational_secondary_index_entries (
808 collection_name TEXT NOT NULL,
809 index_name TEXT NOT NULL,
810 subject_kind TEXT NOT NULL,
811 mutation_id TEXT NOT NULL DEFAULT '',
812 record_key TEXT NOT NULL DEFAULT '',
813 sort_timestamp INTEGER,
814 slot1_text TEXT,
815 slot1_integer INTEGER,
816 slot2_text TEXT,
817 slot2_integer INTEGER,
818 slot3_text TEXT,
819 slot3_integer INTEGER,
820 PRIMARY KEY(collection_name, index_name, subject_kind, mutation_id, record_key),
821 FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
822 FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE
823 );
824
825 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_text
826 ON operational_secondary_index_entries(
827 collection_name, index_name, subject_kind, slot1_text, sort_timestamp DESC, mutation_id, record_key
828 );
829 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_integer
830 ON operational_secondary_index_entries(
831 collection_name, index_name, subject_kind, slot1_integer, sort_timestamp DESC, mutation_id, record_key
832 );
833 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_text
834 ON operational_secondary_index_entries(
835 collection_name, index_name, subject_kind, slot1_text, slot2_text, slot3_text, sort_timestamp DESC, record_key
836 );
837 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_integer
838 ON operational_secondary_index_entries(
839 collection_name, index_name, subject_kind, slot1_integer, slot2_integer, slot3_integer, sort_timestamp DESC, record_key
840 );
841 ",
842 )?;
843
844 Ok(())
845 }
846
847 fn ensure_operational_retention_runs(conn: &Connection) -> Result<(), SchemaError> {
848 conn.execute_batch(
849 r"
850 CREATE TABLE IF NOT EXISTS operational_retention_runs (
851 id TEXT PRIMARY KEY,
852 collection_name TEXT NOT NULL,
853 executed_at INTEGER NOT NULL,
854 action_kind TEXT NOT NULL,
855 dry_run INTEGER NOT NULL DEFAULT 0,
856 deleted_mutations INTEGER NOT NULL,
857 rows_remaining INTEGER NOT NULL,
858 metadata_json TEXT NOT NULL DEFAULT '',
859 FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
860 );
861
862 CREATE INDEX IF NOT EXISTS idx_operational_retention_runs_collection_time
863 ON operational_retention_runs(collection_name, executed_at DESC);
864 ",
865 )?;
866 Ok(())
867 }
868
869 fn ensure_external_content_columns(conn: &Connection) -> Result<(), SchemaError> {
870 let node_columns = Self::column_names(conn, "nodes")?;
871 if !node_columns.iter().any(|c| c == "content_ref") {
872 conn.execute("ALTER TABLE nodes ADD COLUMN content_ref TEXT", [])?;
873 }
874 conn.execute_batch(
875 r"
876 CREATE INDEX IF NOT EXISTS idx_nodes_content_ref
877 ON nodes(content_ref)
878 WHERE content_ref IS NOT NULL AND superseded_at IS NULL;
879 ",
880 )?;
881
882 let chunk_columns = Self::column_names(conn, "chunks")?;
883 if !chunk_columns.iter().any(|c| c == "content_hash") {
884 conn.execute("ALTER TABLE chunks ADD COLUMN content_hash TEXT", [])?;
885 }
886 Ok(())
887 }
888
    /// v16: drop and recreate both FTS5 tables with the porter + unicode61
    /// tokenizer, then repopulate `fts_nodes` from the active chunk/node
    /// rows. `fts_node_properties` is left empty here — presumably its
    /// repopulation happens elsewhere (see the v19 rebuild staging tables);
    /// TODO(review): confirm.
    ///
    /// Destructive by design: the old FTS contents are discarded because the
    /// tokenizer of an existing fts5 table cannot be changed in place.
    fn ensure_unicode_porter_fts_tokenizers(conn: &Connection) -> Result<(), SchemaError> {
        conn.execute_batch(
            r"
            DROP TABLE IF EXISTS fts_nodes;
            CREATE VIRTUAL TABLE fts_nodes USING fts5(
                chunk_id UNINDEXED,
                node_logical_id UNINDEXED,
                kind UNINDEXED,
                text_content,
                tokenize = 'porter unicode61 remove_diacritics 2'
            );

            DROP TABLE IF EXISTS fts_node_properties;
            CREATE VIRTUAL TABLE fts_node_properties USING fts5(
                node_logical_id UNINDEXED,
                kind UNINDEXED,
                text_content,
                tokenize = 'porter unicode61 remove_diacritics 2'
            );

            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            SELECT c.id, n.logical_id, n.kind, c.text_content
            FROM chunks c
            JOIN nodes n
                ON n.logical_id = c.node_logical_id
                AND n.superseded_at IS NULL;
            ",
        )?;
        Ok(())
    }
936
937 fn ensure_fts_property_schemas(conn: &Connection) -> Result<(), SchemaError> {
938 conn.execute_batch(
939 r"
940 CREATE TABLE IF NOT EXISTS fts_property_schemas (
941 kind TEXT PRIMARY KEY,
942 property_paths_json TEXT NOT NULL,
943 separator TEXT NOT NULL DEFAULT ' ',
944 format_version INTEGER NOT NULL DEFAULT 1,
945 created_at INTEGER NOT NULL DEFAULT (unixepoch())
946 );
947
948 CREATE VIRTUAL TABLE IF NOT EXISTS fts_node_properties USING fts5(
949 node_logical_id UNINDEXED,
950 kind UNINDEXED,
951 text_content
952 );
953 ",
954 )?;
955 Ok(())
956 }
957
958 fn column_names(conn: &Connection, table: &str) -> Result<Vec<String>, SchemaError> {
959 let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
960 let names = stmt
961 .query_map([], |row| row.get::<_, String>(1))?
962 .collect::<Result<Vec<_>, _>>()?;
963 Ok(names)
964 }
965
966 #[must_use]
967 pub fn current_version(&self) -> SchemaVersion {
968 self.migrations()
969 .last()
970 .map_or(SchemaVersion(0), |migration| migration.version)
971 }
972
    /// The full, ordered migration list compiled into this engine.
    #[must_use]
    pub fn migrations(&self) -> &'static [Migration] {
        MIGRATIONS
    }
977
    /// Apply writer-connection pragmas: foreign-key enforcement, WAL
    /// journaling with NORMAL sync and a 512 MiB journal cap, a 5 s busy
    /// timeout, in-memory temp storage, and a ~3 GB mmap window.
    ///
    /// # Errors
    /// Propagates any underlying SQLite error.
    pub fn initialize_connection(&self, conn: &Connection) -> Result<(), SchemaError> {
        conn.execute_batch(
            r"
            PRAGMA foreign_keys = ON;
            PRAGMA journal_mode = WAL;
            PRAGMA synchronous = NORMAL;
            PRAGMA busy_timeout = 5000;
            PRAGMA temp_store = MEMORY;
            PRAGMA mmap_size = 3000000000;
            PRAGMA journal_size_limit = 536870912;
            ",
        )?;
        Ok(())
    }
997
    /// Apply read-side pragmas only. Unlike `initialize_connection`, this
    /// deliberately sets no journal_mode/synchronous/journal_size_limit —
    /// those are the writer's concern.
    ///
    /// # Errors
    /// Propagates any underlying SQLite error.
    pub fn initialize_reader_connection(&self, conn: &Connection) -> Result<(), SchemaError> {
        conn.execute_batch(
            r"
            PRAGMA foreign_keys = ON;
            PRAGMA busy_timeout = 5000;
            PRAGMA temp_store = MEMORY;
            PRAGMA mmap_size = 3000000000;
            ",
        )?;
        Ok(())
    }
1019
    /// Create (if missing) the vec0 virtual table backing `profile` and
    /// register it as enabled in `vector_profiles`.
    ///
    /// Only compiled with the `sqlite-vec` feature; the vec0 module must be
    /// available on `conn` — presumably loaded by the caller; TODO confirm.
    ///
    /// # Errors
    /// Fails when the virtual table cannot be created, when `dimension`
    /// does not fit in an `i64`, or on any other SQLite error.
    #[cfg(feature = "sqlite-vec")]
    pub fn ensure_vector_profile(
        &self,
        conn: &Connection,
        profile: &str,
        table_name: &str,
        dimension: usize,
    ) -> Result<(), SchemaError> {
        // NOTE(review): table_name is interpolated directly into DDL, so
        // callers must pass trusted identifiers only.
        conn.execute_batch(&format!(
            "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING vec0(\
                chunk_id TEXT PRIMARY KEY,\
                embedding float[{dimension}]\
            )"
        ))?;
        // usize -> i64 can only fail on exotic targets, but surface it as a
        // schema error rather than truncating.
        let dimension_i64 = i64::try_from(dimension).map_err(|_| {
            SchemaError::Sqlite(rusqlite::Error::ToSqlConversionFailure(
                format!("vector dimension {dimension} does not fit in i64").into(),
            ))
        })?;
        conn.execute(
            "INSERT OR REPLACE INTO vector_profiles \
             (profile, table_name, dimension, enabled) VALUES (?1, ?2, ?3, 1)",
            rusqlite::params![profile, table_name, dimension_i64],
        )?;
        Ok(())
    }
1059
    /// Stub used when the `sqlite-vec` feature is disabled: never touches
    /// the database.
    ///
    /// # Errors
    /// Always returns `SchemaError::MissingCapability("sqlite-vec")`.
    #[cfg(not(feature = "sqlite-vec"))]
    pub fn ensure_vector_profile(
        &self,
        _conn: &Connection,
        _profile: &str,
        _table_name: &str,
        _dimension: usize,
    ) -> Result<(), SchemaError> {
        Err(SchemaError::MissingCapability("sqlite-vec"))
    }
1074
1075 fn ensure_metadata_tables(conn: &Connection) -> Result<(), SchemaError> {
1081 conn.execute_batch(
1082 r"
1083 CREATE TABLE IF NOT EXISTS fathom_schema_migrations (
1084 version INTEGER PRIMARY KEY,
1085 description TEXT NOT NULL,
1086 applied_at INTEGER NOT NULL DEFAULT (unixepoch())
1087 );
1088 ",
1089 )?;
1090 Ok(())
1091 }
1092}
1093
// Tests run against throwaway in-memory SQLite databases and cover fresh
// bootstraps, idempotency, recovery of legacy/hand-made schemas, downgrade
// detection, connection pragmas, and (behind the `sqlite-vec` feature)
// vector profile creation.
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use rusqlite::Connection;

    use super::SchemaManager;

    // A fresh database receives every known migration and ends up with the
    // core `nodes` table in place.
    #[test]
    fn bootstrap_applies_initial_schema() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();

        let report = manager.bootstrap(&conn).expect("bootstrap report");

        // One applied migration per schema version, so the applied count must
        // equal the manager's current (highest) version number.
        assert_eq!(
            report.applied_versions.len(),
            manager.current_version().0 as usize
        );
        // Every modern SQLite release is 3.x.
        assert!(report.sqlite_version.starts_with('3'));
        let table_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'nodes'",
                [],
                |row| row.get(0),
            )
            .expect("nodes table exists");
        assert_eq!(table_count, 1);
    }

    // Bootstrap alone must never report the vector profile as enabled;
    // enabling is an explicit, separate step (ensure_vector_profile).
    #[test]
    fn vector_profile_not_enabled_without_feature() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        let report = manager.bootstrap(&conn).expect("bootstrap");
        assert!(
            !report.vector_profile_enabled,
            "vector_profile_enabled must be false on a fresh bootstrap"
        );
    }

    // The vector_profiles table itself must also start with no enabled rows.
    #[test]
    fn vector_profile_skipped_when_dimension_absent() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager.bootstrap(&conn).expect("bootstrap");

        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            count, 0,
            "no enabled profile without calling ensure_vector_profile"
        );
    }

    // BootstrapReport.vector_profile_enabled must be derived from actual
    // database contents, not hard-coded by the feature configuration.
    #[test]
    fn bootstrap_report_reflects_actual_vector_state() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        let report = manager.bootstrap(&conn).expect("bootstrap");

        let db_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            report.vector_profile_enabled,
            db_count > 0,
            "BootstrapReport.vector_profile_enabled must match actual DB state"
        );
    }

    // Simulates a database created by an older release (no
    // contract_format_version column, no metadata_json on provenance_events)
    // and verifies bootstrap's hardening migrations backfill both.
    #[test]
    fn bootstrap_backfills_vector_contract_format_and_provenance_metadata_columns() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        // Seed legacy-shaped tables by hand, bypassing the migration runner.
        conn.execute_batch(
            r#"
            CREATE TABLE provenance_events (
                id TEXT PRIMARY KEY,
                event_type TEXT NOT NULL,
                subject TEXT NOT NULL,
                source_ref TEXT,
                created_at INTEGER NOT NULL DEFAULT (unixepoch())
            );
            CREATE TABLE vector_embedding_contracts (
                profile TEXT PRIMARY KEY,
                table_name TEXT NOT NULL,
                model_identity TEXT NOT NULL,
                model_version TEXT NOT NULL,
                dimension INTEGER NOT NULL,
                normalization_policy TEXT NOT NULL,
                chunking_policy TEXT NOT NULL,
                preprocessing_policy TEXT NOT NULL,
                generator_command_json TEXT NOT NULL,
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                applied_at INTEGER NOT NULL DEFAULT 0,
                snapshot_hash TEXT NOT NULL DEFAULT ''
            );
            INSERT INTO vector_embedding_contracts (
                profile,
                table_name,
                model_identity,
                model_version,
                dimension,
                normalization_policy,
                chunking_policy,
                preprocessing_policy,
                generator_command_json,
                updated_at,
                applied_at,
                snapshot_hash
            ) VALUES (
                'default',
                'vec_nodes_active',
                'legacy-model',
                '0.9.0',
                4,
                'l2',
                'per_chunk',
                'trim',
                '["/bin/echo"]',
                100,
                100,
                'legacy'
            );
            "#,
        )
        .expect("seed legacy schema");
        let manager = SchemaManager::new();

        let report = manager.bootstrap(&conn).expect("bootstrap");

        // NOTE(review): assumes the hardening migrations start at version 5 —
        // confirm against the MIGRATIONS list at the top of this file.
        assert!(
            report.applied_versions.iter().any(|version| version.0 >= 5),
            "bootstrap should apply hardening migrations"
        );
        let format_version: i64 = conn
            .query_row(
                "SELECT contract_format_version FROM vector_embedding_contracts WHERE profile = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("contract_format_version");
        assert_eq!(format_version, 1);
        let metadata_column_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM pragma_table_info('provenance_events') WHERE name = 'metadata_json'",
                [],
                |row| row.get(0),
            )
            .expect("metadata_json column count");
        assert_eq!(metadata_column_count, 1);
    }

    // All three operational-store tables (and the mutation_order column)
    // must exist after a fresh bootstrap.
    #[test]
    fn bootstrap_creates_operational_store_tables() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();

        manager.bootstrap(&conn).expect("bootstrap");

        for table in [
            "operational_collections",
            "operational_mutations",
            "operational_current",
        ] {
            let count: i64 = conn
                .query_row(
                    "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = ?1",
                    [table],
                    |row| row.get(0),
                )
                .expect("table existence");
            assert_eq!(count, 1, "{table} should exist after bootstrap");
        }
        let mutation_order_columns: i64 = conn
            .query_row(
                "SELECT count(*) FROM pragma_table_info('operational_mutations') WHERE name = 'mutation_order'",
                [],
                |row| row.get(0),
            )
            .expect("mutation_order column exists");
        assert_eq!(mutation_order_columns, 1);
    }

    // Running bootstrap twice must apply nothing the second time and must
    // not disturb previously created tables.
    #[test]
    fn bootstrap_is_idempotent_with_operational_store_tables() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();

        manager.bootstrap(&conn).expect("first bootstrap");
        let report = manager.bootstrap(&conn).expect("second bootstrap");

        assert!(
            report.applied_versions.is_empty(),
            "second bootstrap should apply no new migrations"
        );
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'operational_collections'",
                [],
                |row| row.get(0),
            )
            .expect("operational_collections table exists");
        assert_eq!(count, 1);
    }

    // Recovery scenario: operational tables exist (e.g. restored from a
    // partial backup) but the fathom_schema_migrations history is gone.
    // Bootstrap must still converge: record the ordering hardening, backfill
    // mutation_order on existing rows, and create the ordering index.
    #[test]
    fn bootstrap_is_idempotent_for_recovered_operational_tables_without_migration_history() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        conn.execute_batch(
            r#"
            CREATE TABLE operational_collections (
                name TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                schema_json TEXT NOT NULL,
                retention_json TEXT NOT NULL,
                format_version INTEGER NOT NULL DEFAULT 1,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                disabled_at INTEGER
            );

            CREATE TABLE operational_mutations (
                id TEXT PRIMARY KEY,
                collection_name TEXT NOT NULL,
                record_key TEXT NOT NULL,
                op_kind TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                source_ref TEXT,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                mutation_order INTEGER NOT NULL DEFAULT 0,
                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
            );

            CREATE TABLE operational_current (
                collection_name TEXT NOT NULL,
                record_key TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                updated_at INTEGER NOT NULL,
                last_mutation_id TEXT NOT NULL,
                PRIMARY KEY(collection_name, record_key),
                FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
                FOREIGN KEY(last_mutation_id) REFERENCES operational_mutations(id)
            );

            INSERT INTO operational_collections (name, kind, schema_json, retention_json)
            VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}');
            INSERT INTO operational_mutations (
                id, collection_name, record_key, op_kind, payload_json, created_at, mutation_order
            ) VALUES (
                'mut-1', 'audit_log', 'entry-1', 'put', '{"ok":true}', 10, 0
            );
            "#,
        )
        .expect("seed recovered operational tables");

        let manager = SchemaManager::new();
        let report = manager
            .bootstrap(&conn)
            .expect("bootstrap recovered schema");

        // NOTE(review): version 8 is assumed to be the mutation-ordering
        // hardening migration — confirm against the MIGRATIONS list.
        assert!(
            report.applied_versions.iter().any(|version| version.0 == 8),
            "bootstrap should record operational mutation ordering hardening"
        );
        // The seeded row had mutation_order = 0; bootstrap must backfill it.
        let mutation_order: i64 = conn
            .query_row(
                "SELECT mutation_order FROM operational_mutations WHERE id = 'mut-1'",
                [],
                |row| row.get(0),
            )
            .expect("mutation_order");
        assert_ne!(
            mutation_order, 0,
            "bootstrap should backfill recovered operational rows"
        );
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type = 'index' AND name = 'idx_operational_mutations_collection_key_order'",
                [],
                |row| row.get(0),
            )
            .expect("ordering index exists");
        assert_eq!(count, 1);
    }

    // Recovery scenario for the filtered-read migrations: seeded operational
    // tables lack the filter/validation contract columns; bootstrap must add
    // them (with their defaults) and create the filter-values table.
    #[test]
    fn bootstrap_adds_operational_filter_contract_and_index_table() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        conn.execute_batch(
            r#"
            CREATE TABLE operational_collections (
                name TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                schema_json TEXT NOT NULL,
                retention_json TEXT NOT NULL,
                format_version INTEGER NOT NULL DEFAULT 1,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                disabled_at INTEGER
            );

            CREATE TABLE operational_mutations (
                id TEXT PRIMARY KEY,
                collection_name TEXT NOT NULL,
                record_key TEXT NOT NULL,
                op_kind TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                source_ref TEXT,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                mutation_order INTEGER NOT NULL DEFAULT 1,
                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
            );

            INSERT INTO operational_collections (name, kind, schema_json, retention_json)
            VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}');
            "#,
        )
        .expect("seed recovered operational schema");

        let manager = SchemaManager::new();
        let report = manager
            .bootstrap(&conn)
            .expect("bootstrap recovered schema");

        // NOTE(review): versions 10 and 11 are assumed to be the filtered
        // read and validation migrations respectively — confirm against the
        // MIGRATIONS list.
        assert!(
            report
                .applied_versions
                .iter()
                .any(|version| version.0 == 10),
            "bootstrap should record operational filtered read migration"
        );
        assert!(
            report
                .applied_versions
                .iter()
                .any(|version| version.0 == 11),
            "bootstrap should record operational validation migration"
        );
        // New columns must carry their defaults: "[]" and the empty string.
        let filter_fields_json: String = conn
            .query_row(
                "SELECT filter_fields_json FROM operational_collections WHERE name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("filter_fields_json added");
        assert_eq!(filter_fields_json, "[]");
        let validation_json: String = conn
            .query_row(
                "SELECT validation_json FROM operational_collections WHERE name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("validation_json added");
        assert_eq!(validation_json, "");
        let table_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'operational_filter_values'",
                [],
                |row| row.get(0),
            )
            .expect("filter table exists");
        assert_eq!(table_count, 1);
    }

    // Dropping the migration history and rebootstrapping must re-record the
    // filter/validation migrations without failing on the already-present
    // contract columns (i.e. the ALTERs must tolerate existing columns).
    #[test]
    fn bootstrap_reapplies_migration_history_without_readding_filter_contract_columns() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager.bootstrap(&conn).expect("initial bootstrap");

        conn.execute("DROP TABLE fathom_schema_migrations", [])
            .expect("drop migration history");
        SchemaManager::ensure_metadata_tables(&conn).expect("recreate migration metadata");

        let report = manager
            .bootstrap(&conn)
            .expect("rebootstrap existing schema");

        assert!(
            report
                .applied_versions
                .iter()
                .any(|version| version.0 == 10),
            "rebootstrap should re-record migration 10"
        );
        assert!(
            report
                .applied_versions
                .iter()
                .any(|version| version.0 == 11),
            "rebootstrap should re-record migration 11"
        );
        // NOTE(review): the fallbacks below make these assertions pass even
        // when the query errors (e.g. no rows in operational_collections),
        // which weakens them to "column readable OR absent" — presumably
        // intentional because a fresh bootstrap leaves the table empty;
        // confirm, or use OptionalExtension to distinguish the cases.
        let filter_fields_json: String = conn
            .query_row(
                "SELECT filter_fields_json FROM operational_collections LIMIT 1",
                [],
                |row| row.get(0),
            )
            .unwrap_or_else(|_| "[]".to_string());
        assert_eq!(filter_fields_json, "[]");
        let validation_json: String = conn
            .query_row(
                "SELECT validation_json FROM operational_collections LIMIT 1",
                [],
                |row| row.get(0),
            )
            .unwrap_or_default();
        assert_eq!(validation_json, "");
    }

    // A database whose recorded version is newer than this binary's current
    // version (a downgrade) must be rejected with VersionMismatch.
    #[test]
    fn downgrade_detected_returns_version_mismatch() {
        use crate::SchemaError;

        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager.bootstrap(&conn).expect("initial bootstrap");

        // Forge a "future" migration entry to simulate an older binary
        // opening a newer database.
        conn.execute(
            "INSERT INTO fathom_schema_migrations (version, description) VALUES (?1, ?2)",
            (999_i64, "future migration"),
        )
        .expect("insert future version");

        let err = manager
            .bootstrap(&conn)
            .expect_err("should fail on downgrade");
        assert!(
            matches!(
                err,
                SchemaError::VersionMismatch {
                    database_version: 999,
                    ..
                }
            ),
            "expected VersionMismatch with database_version 999, got: {err}"
        );
    }

    // initialize_connection must cap the journal size; 536_870_912 bytes is
    // 512 MiB.
    #[test]
    fn journal_size_limit_is_set() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager
            .initialize_connection(&conn)
            .expect("initialize_connection");

        let limit: i64 = conn
            .query_row("PRAGMA journal_size_limit", [], |row| row.get(0))
            .expect("journal_size_limit pragma");
        assert_eq!(limit, 536_870_912);
    }

    // End-to-end check of ensure_vector_profile when sqlite-vec is compiled
    // in: the profile row becomes enabled and the vec0 table is queryable.
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn vector_profile_created_when_feature_enabled() {
        // SAFETY: registers sqlite-vec's init routine as an auto-extension
        // before any connection is opened; the transmute casts the extension
        // entry point to the function-pointer type sqlite3_auto_extension
        // expects, which is the loading pattern documented by sqlite-vec.
        unsafe {
            rusqlite::ffi::sqlite3_auto_extension(Some(std::mem::transmute(
                sqlite_vec::sqlite3_vec_init as *const (),
            )));
        }
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager.bootstrap(&conn).expect("bootstrap");

        manager
            .ensure_vector_profile(&conn, "default", "vec_nodes_active", 128)
            .expect("ensure_vector_profile");

        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            count, 1,
            "vector profile must be enabled after ensure_vector_profile"
        );

        // Querying the table (result discarded) proves the vec0 virtual
        // table was actually created.
        let _: i64 = conn
            .query_row("SELECT count(*) FROM vec_nodes_active", [], |row| {
                row.get(0)
            })
            .expect("vec_nodes_active table must exist after ensure_vector_profile");
    }
}