1use rusqlite::{Connection, OptionalExtension};
2
3use crate::{Migration, SchemaError, SchemaVersion};
4
/// Append-only, ordered list of schema migrations. `SchemaManager::bootstrap`
/// applies every version not yet recorded in `fathom_schema_migrations`.
///
/// NOTE(review): versions 4-6 and 8-16 are applied via idempotent `ensure_*`
/// helpers in `bootstrap` rather than through this SQL text; the SQL kept here
/// documents the intended effect of those versions — TODO confirm the helper
/// SQL stays in sync with these strings.
static MIGRATIONS: &[Migration] = &[
    // v1: canonical graph tables (nodes/edges versioned by logical_id +
    // superseded_at), chunk storage, FTS5 index, vector profile registry,
    // and run/step/action runtime tables.
    Migration::new(
        SchemaVersion(1),
        "initial canonical schema and runtime tables",
        r"
        CREATE TABLE IF NOT EXISTS nodes (
            row_id TEXT PRIMARY KEY,
            logical_id TEXT NOT NULL,
            kind TEXT NOT NULL,
            properties BLOB NOT NULL,
            created_at INTEGER NOT NULL,
            superseded_at INTEGER,
            source_ref TEXT,
            confidence REAL
        );

        CREATE UNIQUE INDEX IF NOT EXISTS idx_nodes_active_logical_id
            ON nodes(logical_id)
            WHERE superseded_at IS NULL;
        CREATE INDEX IF NOT EXISTS idx_nodes_kind_active
            ON nodes(kind, superseded_at);
        CREATE INDEX IF NOT EXISTS idx_nodes_source_ref
            ON nodes(source_ref);

        CREATE TABLE IF NOT EXISTS edges (
            row_id TEXT PRIMARY KEY,
            logical_id TEXT NOT NULL,
            source_logical_id TEXT NOT NULL,
            target_logical_id TEXT NOT NULL,
            kind TEXT NOT NULL,
            properties BLOB NOT NULL,
            created_at INTEGER NOT NULL,
            superseded_at INTEGER,
            source_ref TEXT,
            confidence REAL
        );

        CREATE UNIQUE INDEX IF NOT EXISTS idx_edges_active_logical_id
            ON edges(logical_id)
            WHERE superseded_at IS NULL;
        CREATE INDEX IF NOT EXISTS idx_edges_source_active
            ON edges(source_logical_id, kind, superseded_at);
        CREATE INDEX IF NOT EXISTS idx_edges_target_active
            ON edges(target_logical_id, kind, superseded_at);
        CREATE INDEX IF NOT EXISTS idx_edges_source_ref
            ON edges(source_ref);

        CREATE TABLE IF NOT EXISTS chunks (
            id TEXT PRIMARY KEY,
            node_logical_id TEXT NOT NULL,
            text_content TEXT NOT NULL,
            byte_start INTEGER,
            byte_end INTEGER,
            created_at INTEGER NOT NULL
        );

        CREATE INDEX IF NOT EXISTS idx_chunks_node_logical_id
            ON chunks(node_logical_id);

        CREATE VIRTUAL TABLE IF NOT EXISTS fts_nodes USING fts5(
            chunk_id UNINDEXED,
            node_logical_id UNINDEXED,
            kind UNINDEXED,
            text_content
        );

        CREATE TABLE IF NOT EXISTS vector_profiles (
            profile TEXT PRIMARY KEY,
            table_name TEXT NOT NULL,
            dimension INTEGER NOT NULL,
            enabled INTEGER NOT NULL DEFAULT 0
        );

        CREATE TABLE IF NOT EXISTS runs (
            id TEXT PRIMARY KEY,
            kind TEXT NOT NULL,
            status TEXT NOT NULL,
            properties BLOB NOT NULL,
            created_at INTEGER NOT NULL,
            completed_at INTEGER,
            superseded_at INTEGER,
            source_ref TEXT
        );

        CREATE TABLE IF NOT EXISTS steps (
            id TEXT PRIMARY KEY,
            run_id TEXT NOT NULL,
            kind TEXT NOT NULL,
            status TEXT NOT NULL,
            properties BLOB NOT NULL,
            created_at INTEGER NOT NULL,
            completed_at INTEGER,
            superseded_at INTEGER,
            source_ref TEXT,
            FOREIGN KEY(run_id) REFERENCES runs(id)
        );

        CREATE TABLE IF NOT EXISTS actions (
            id TEXT PRIMARY KEY,
            step_id TEXT NOT NULL,
            kind TEXT NOT NULL,
            status TEXT NOT NULL,
            properties BLOB NOT NULL,
            created_at INTEGER NOT NULL,
            completed_at INTEGER,
            superseded_at INTEGER,
            source_ref TEXT,
            FOREIGN KEY(step_id) REFERENCES steps(id)
        );

        CREATE INDEX IF NOT EXISTS idx_runs_source_ref
            ON runs(source_ref);
        CREATE INDEX IF NOT EXISTS idx_steps_source_ref
            ON steps(source_ref);
        CREATE INDEX IF NOT EXISTS idx_actions_source_ref
            ON actions(source_ref);
        ",
    ),
    // v2: audit trail of provenance events keyed by subject + event type.
    Migration::new(
        SchemaVersion(2),
        "durable audit trail: provenance_events table",
        r"
        CREATE TABLE IF NOT EXISTS provenance_events (
            id TEXT PRIMARY KEY,
            event_type TEXT NOT NULL,
            subject TEXT NOT NULL,
            source_ref TEXT,
            created_at INTEGER NOT NULL DEFAULT (unixepoch())
        );
        CREATE INDEX IF NOT EXISTS idx_provenance_events_subject
            ON provenance_events (subject, event_type);
        ",
    ),
    // v3: per-profile embedding regeneration contracts (model identity,
    // dimension, chunking/preprocessing policies).
    Migration::new(
        SchemaVersion(3),
        "vector regeneration contracts",
        r"
        CREATE TABLE IF NOT EXISTS vector_embedding_contracts (
            profile TEXT PRIMARY KEY,
            table_name TEXT NOT NULL,
            model_identity TEXT NOT NULL,
            model_version TEXT NOT NULL,
            dimension INTEGER NOT NULL,
            normalization_policy TEXT NOT NULL,
            chunking_policy TEXT NOT NULL,
            preprocessing_policy TEXT NOT NULL,
            generator_command_json TEXT NOT NULL,
            updated_at INTEGER NOT NULL DEFAULT (unixepoch())
        );
        ",
    ),
    // v4: add applied_at / snapshot_hash and backfill legacy rows.
    // Applied through ensure_vector_regeneration_apply_metadata, which
    // guards each ALTER so re-runs on pre-populated tables don't fail.
    Migration::new(
        SchemaVersion(4),
        "vector regeneration apply metadata",
        r"
        ALTER TABLE vector_embedding_contracts
            ADD COLUMN applied_at INTEGER NOT NULL DEFAULT 0;
        ALTER TABLE vector_embedding_contracts
            ADD COLUMN snapshot_hash TEXT NOT NULL DEFAULT '';
        UPDATE vector_embedding_contracts
        SET
            applied_at = CASE
                WHEN applied_at = 0 THEN updated_at
                ELSE applied_at
            END,
            snapshot_hash = CASE
                WHEN snapshot_hash = '' THEN 'legacy'
                ELSE snapshot_hash
            END;
        ",
    ),
    // v5: contract format version column; applied via
    // ensure_vector_contract_format_version.
    Migration::new(
        SchemaVersion(5),
        "vector regeneration contract format version",
        r"
        ALTER TABLE vector_embedding_contracts
            ADD COLUMN contract_format_version INTEGER NOT NULL DEFAULT 1;
        UPDATE vector_embedding_contracts
        SET contract_format_version = 1
        WHERE contract_format_version = 0;
        ",
    ),
    // v6: JSON metadata payload on provenance events; applied via
    // ensure_provenance_metadata.
    Migration::new(
        SchemaVersion(6),
        "provenance metadata payloads",
        r"
        ALTER TABLE provenance_events
            ADD COLUMN metadata_json TEXT NOT NULL DEFAULT '';
        ",
    ),
    // v7: operational store — collection registry, append-only mutation log,
    // and a materialized current-state table.
    Migration::new(
        SchemaVersion(7),
        "operational store canonical and derived tables",
        r"
        CREATE TABLE IF NOT EXISTS operational_collections (
            name TEXT PRIMARY KEY,
            kind TEXT NOT NULL,
            schema_json TEXT NOT NULL,
            retention_json TEXT NOT NULL,
            format_version INTEGER NOT NULL DEFAULT 1,
            created_at INTEGER NOT NULL DEFAULT (unixepoch()),
            disabled_at INTEGER
        );

        CREATE INDEX IF NOT EXISTS idx_operational_collections_kind
            ON operational_collections(kind, disabled_at);

        CREATE TABLE IF NOT EXISTS operational_mutations (
            id TEXT PRIMARY KEY,
            collection_name TEXT NOT NULL,
            record_key TEXT NOT NULL,
            op_kind TEXT NOT NULL,
            payload_json TEXT NOT NULL,
            source_ref TEXT,
            created_at INTEGER NOT NULL DEFAULT (unixepoch()),
            FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
        );

        CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_created
            ON operational_mutations(collection_name, record_key, created_at DESC, id DESC);
        CREATE INDEX IF NOT EXISTS idx_operational_mutations_source_ref
            ON operational_mutations(source_ref);

        CREATE TABLE IF NOT EXISTS operational_current (
            collection_name TEXT NOT NULL,
            record_key TEXT NOT NULL,
            payload_json TEXT NOT NULL,
            updated_at INTEGER NOT NULL,
            last_mutation_id TEXT NOT NULL,
            PRIMARY KEY(collection_name, record_key),
            FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
            FOREIGN KEY(last_mutation_id) REFERENCES operational_mutations(id)
        );

        CREATE INDEX IF NOT EXISTS idx_operational_current_collection_updated
            ON operational_current(collection_name, updated_at DESC);
        ",
    ),
    // v8: monotonic mutation_order (seeded from rowid) so ordering survives
    // id/timestamp ties; applied via ensure_operational_mutation_order.
    Migration::new(
        SchemaVersion(8),
        "operational mutation ordering hardening",
        r"
        ALTER TABLE operational_mutations
            ADD COLUMN mutation_order INTEGER NOT NULL DEFAULT 0;
        UPDATE operational_mutations
        SET mutation_order = rowid
        WHERE mutation_order = 0;
        CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_order
            ON operational_mutations(collection_name, record_key, mutation_order DESC);
        ",
    ),
    // v9: per-node last-access tracking; applied via
    // ensure_node_access_metadata.
    Migration::new(
        SchemaVersion(9),
        "node last_accessed metadata",
        r"
        CREATE TABLE IF NOT EXISTS node_access_metadata (
            logical_id TEXT PRIMARY KEY,
            last_accessed_at INTEGER NOT NULL,
            updated_at INTEGER NOT NULL
        );

        CREATE INDEX IF NOT EXISTS idx_node_access_metadata_last_accessed
            ON node_access_metadata(last_accessed_at DESC);
        ",
    ),
    // v10: declared filter fields plus extracted per-mutation filter values
    // (string/integer slots); applied via ensure_operational_filter_contract.
    Migration::new(
        SchemaVersion(10),
        "operational filtered read contracts and extracted values",
        r"
        ALTER TABLE operational_collections
            ADD COLUMN filter_fields_json TEXT NOT NULL DEFAULT '[]';

        CREATE TABLE IF NOT EXISTS operational_filter_values (
            mutation_id TEXT NOT NULL,
            collection_name TEXT NOT NULL,
            field_name TEXT NOT NULL,
            string_value TEXT,
            integer_value INTEGER,
            PRIMARY KEY(mutation_id, field_name),
            FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE,
            FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
        );

        CREATE INDEX IF NOT EXISTS idx_operational_filter_values_text
            ON operational_filter_values(collection_name, field_name, string_value, mutation_id);
        CREATE INDEX IF NOT EXISTS idx_operational_filter_values_integer
            ON operational_filter_values(collection_name, field_name, integer_value, mutation_id);
        ",
    ),
    // v11: payload validation contract column; applied via
    // ensure_operational_validation_contract.
    Migration::new(
        SchemaVersion(11),
        "operational payload validation contracts",
        r"
        ALTER TABLE operational_collections
            ADD COLUMN validation_json TEXT NOT NULL DEFAULT '';
        ",
    ),
    // v12: secondary index declarations plus a slotted entries table (three
    // text/integer slot pairs); applied via
    // ensure_operational_secondary_indexes.
    Migration::new(
        SchemaVersion(12),
        "operational secondary indexes",
        r"
        ALTER TABLE operational_collections
            ADD COLUMN secondary_indexes_json TEXT NOT NULL DEFAULT '[]';

        CREATE TABLE IF NOT EXISTS operational_secondary_index_entries (
            collection_name TEXT NOT NULL,
            index_name TEXT NOT NULL,
            subject_kind TEXT NOT NULL,
            mutation_id TEXT NOT NULL DEFAULT '',
            record_key TEXT NOT NULL DEFAULT '',
            sort_timestamp INTEGER,
            slot1_text TEXT,
            slot1_integer INTEGER,
            slot2_text TEXT,
            slot2_integer INTEGER,
            slot3_text TEXT,
            slot3_integer INTEGER,
            PRIMARY KEY(collection_name, index_name, subject_kind, mutation_id, record_key),
            FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
            FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE
        );

        CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_text
            ON operational_secondary_index_entries(
                collection_name, index_name, subject_kind, slot1_text, sort_timestamp DESC, mutation_id, record_key
            );
        CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_integer
            ON operational_secondary_index_entries(
                collection_name, index_name, subject_kind, slot1_integer, sort_timestamp DESC, mutation_id, record_key
            );
        CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_text
            ON operational_secondary_index_entries(
                collection_name, index_name, subject_kind, slot1_text, slot2_text, slot3_text, sort_timestamp DESC, record_key
            );
        CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_integer
            ON operational_secondary_index_entries(
                collection_name, index_name, subject_kind, slot1_integer, slot2_integer, slot3_integer, sort_timestamp DESC, record_key
            );
        ",
    ),
    // v13: retention run audit records; applied via
    // ensure_operational_retention_runs.
    Migration::new(
        SchemaVersion(13),
        "operational retention run metadata",
        r"
        CREATE TABLE IF NOT EXISTS operational_retention_runs (
            id TEXT PRIMARY KEY,
            collection_name TEXT NOT NULL,
            executed_at INTEGER NOT NULL,
            action_kind TEXT NOT NULL,
            dry_run INTEGER NOT NULL DEFAULT 0,
            deleted_mutations INTEGER NOT NULL,
            rows_remaining INTEGER NOT NULL,
            metadata_json TEXT NOT NULL DEFAULT '',
            FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
        );

        CREATE INDEX IF NOT EXISTS idx_operational_retention_runs_collection_time
            ON operational_retention_runs(collection_name, executed_at DESC);
        ",
    ),
    // v14: external content refs on nodes and chunk content hashes; applied
    // via ensure_external_content_columns.
    Migration::new(
        SchemaVersion(14),
        "external content object columns",
        r"
        ALTER TABLE nodes ADD COLUMN content_ref TEXT;

        CREATE INDEX IF NOT EXISTS idx_nodes_content_ref
            ON nodes(content_ref)
            WHERE content_ref IS NOT NULL AND superseded_at IS NULL;

        ALTER TABLE chunks ADD COLUMN content_hash TEXT;
        ",
    ),
    // v15: per-kind property projection schemas and the property FTS table;
    // applied via ensure_fts_property_schemas.
    Migration::new(
        SchemaVersion(15),
        "FTS property projection schemas",
        r"
        CREATE TABLE IF NOT EXISTS fts_property_schemas (
            kind TEXT PRIMARY KEY,
            property_paths_json TEXT NOT NULL,
            separator TEXT NOT NULL DEFAULT ' ',
            format_version INTEGER NOT NULL DEFAULT 1,
            created_at INTEGER NOT NULL DEFAULT (unixepoch())
        );

        CREATE VIRTUAL TABLE IF NOT EXISTS fts_node_properties USING fts5(
            node_logical_id UNINDEXED,
            kind UNINDEXED,
            text_content
        );
        ",
    ),
    // v16: implemented entirely in Rust (ensure_unicode_porter_fts_tokenizers)
    // because the FTS rebuild repopulates from live tables; SQL intentionally
    // empty.
    Migration::new(
        SchemaVersion(16),
        "rebuild fts_nodes and fts_node_properties on porter+unicode61 tokenizer",
        "",
    ),
    // v17: sidecar mapping projected-text offsets back to leaf property paths.
    Migration::new(
        SchemaVersion(17),
        "fts property position-map sidecar for recursive extraction",
        r"
        CREATE TABLE IF NOT EXISTS fts_node_property_positions (
            node_logical_id TEXT NOT NULL,
            kind TEXT NOT NULL,
            start_offset INTEGER NOT NULL,
            end_offset INTEGER NOT NULL,
            leaf_path TEXT NOT NULL
        );

        CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_node
            ON fts_node_property_positions(node_logical_id, kind);

        CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_kind
            ON fts_node_property_positions(kind);
        ",
    ),
    // v18: recreate the v17 sidecar with a UNIQUE constraint. Drops and
    // rebuilds the table, so v17 data is discarded — the position map is
    // regenerated from projections, per the drop-then-create SQL below.
    Migration::new(
        SchemaVersion(18),
        "add UNIQUE constraint on fts_node_property_positions (node_logical_id, kind, start_offset)",
        r"
        DROP TABLE IF EXISTS fts_node_property_positions;

        CREATE TABLE fts_node_property_positions (
            node_logical_id TEXT NOT NULL,
            kind TEXT NOT NULL,
            start_offset INTEGER NOT NULL,
            end_offset INTEGER NOT NULL,
            leaf_path TEXT NOT NULL,
            UNIQUE(node_logical_id, kind, start_offset)
        );

        CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_node
            ON fts_node_property_positions(node_logical_id, kind);

        CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_kind
            ON fts_node_property_positions(kind);
        ",
    ),
];
475
/// Summary returned by [`SchemaManager::bootstrap`] describing what happened.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BootstrapReport {
    /// Result of `SELECT sqlite_version()` on the bootstrapped connection.
    pub sqlite_version: String,
    /// Migration versions applied during this call; empty when the database
    /// was already up to date.
    pub applied_versions: Vec<SchemaVersion>,
    /// True when at least one `vector_profiles` row has `enabled = 1`.
    pub vector_profile_enabled: bool,
}
482
/// Stateless entry point for initializing connections and applying the
/// [`MIGRATIONS`] list to a SQLite database.
#[derive(Clone, Debug, Default)]
pub struct SchemaManager;
485
impl SchemaManager {
    /// Creates a schema manager; the type carries no state.
    #[must_use]
    pub fn new() -> Self {
        Self
    }

    /// Initializes connection pragmas, ensures the migration bookkeeping
    /// table exists, then applies every pending migration in version order,
    /// each inside its own transaction.
    ///
    /// Versions with in-Rust idempotent handlers (4-6, 8-16) are dispatched
    /// to `ensure_*` helpers; all other versions run their raw SQL batch.
    ///
    /// # Errors
    /// Returns [`SchemaError::VersionMismatch`] when the database already
    /// records a version newer than this engine's latest migration, or any
    /// underlying SQLite error.
    pub fn bootstrap(&self, conn: &Connection) -> Result<BootstrapReport, SchemaError> {
        self.initialize_connection(conn)?;
        Self::ensure_metadata_tables(conn)?;

        // Highest version already recorded; 0 on a fresh database.
        let max_applied: u32 = conn.query_row(
            "SELECT COALESCE(MAX(version), 0) FROM fathom_schema_migrations",
            [],
            |row| row.get(0),
        )?;
        let engine_version = self.current_version().0;
        trace_info!(
            current_version = max_applied,
            engine_version,
            "schema bootstrap: version check"
        );
        // Refuse to touch a database written by a newer engine — downgrades
        // are not supported.
        if max_applied > engine_version {
            trace_error!(
                database_version = max_applied,
                engine_version,
                "schema version mismatch: database is newer than engine"
            );
            return Err(SchemaError::VersionMismatch {
                database_version: max_applied,
                engine_version,
            });
        }

        let mut applied_versions = Vec::new();
        for migration in self.migrations() {
            // Skip versions already recorded; gaps are re-applied, which is
            // why versioned handlers below must be idempotent.
            let already_applied = conn
                .query_row(
                    "SELECT 1 FROM fathom_schema_migrations WHERE version = ?1",
                    [i64::from(migration.version.0)],
                    |row| row.get::<_, i64>(0),
                )
                .optional()?
                .is_some();

            if already_applied {
                continue;
            }

            // One transaction per migration: the DDL and the bookkeeping row
            // commit (or roll back) together.
            let tx = conn.unchecked_transaction()?;
            match migration.version {
                SchemaVersion(4) => Self::ensure_vector_regeneration_apply_metadata(&tx)?,
                SchemaVersion(5) => Self::ensure_vector_contract_format_version(&tx)?,
                SchemaVersion(6) => Self::ensure_provenance_metadata(&tx)?,
                SchemaVersion(8) => Self::ensure_operational_mutation_order(&tx)?,
                SchemaVersion(9) => Self::ensure_node_access_metadata(&tx)?,
                SchemaVersion(10) => Self::ensure_operational_filter_contract(&tx)?,
                SchemaVersion(11) => Self::ensure_operational_validation_contract(&tx)?,
                SchemaVersion(12) => Self::ensure_operational_secondary_indexes(&tx)?,
                SchemaVersion(13) => Self::ensure_operational_retention_runs(&tx)?,
                SchemaVersion(14) => Self::ensure_external_content_columns(&tx)?,
                SchemaVersion(15) => Self::ensure_fts_property_schemas(&tx)?,
                SchemaVersion(16) => Self::ensure_unicode_porter_fts_tokenizers(&tx)?,
                _ => tx.execute_batch(migration.sql)?,
            }
            tx.execute(
                "INSERT INTO fathom_schema_migrations (version, description) VALUES (?1, ?2)",
                (i64::from(migration.version.0), migration.description),
            )?;
            tx.commit()?;
            trace_info!(
                version = migration.version.0,
                description = migration.description,
                "schema migration applied"
            );
            applied_versions.push(migration.version);
        }

        let sqlite_version = conn.query_row("SELECT sqlite_version()", [], |row| row.get(0))?;
        let vector_profile_count: i64 = conn.query_row(
            "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
            [],
            |row| row.get(0),
        )?;
        Ok(BootstrapReport {
            sqlite_version,
            applied_versions,
            vector_profile_enabled: vector_profile_count > 0,
        })
    }

    /// Migration v4 handler: adds `applied_at`/`snapshot_hash` to
    /// `vector_embedding_contracts` only if missing (ALTER TABLE ADD COLUMN
    /// fails on existing columns), then backfills zero/empty values.
    fn ensure_vector_regeneration_apply_metadata(conn: &Connection) -> Result<(), SchemaError> {
        // PRAGMA table_info column 1 is the column name.
        let mut stmt = conn.prepare("PRAGMA table_info(vector_embedding_contracts)")?;
        let columns = stmt
            .query_map([], |row| row.get::<_, String>(1))?
            .collect::<Result<Vec<_>, _>>()?;
        let has_applied_at = columns.iter().any(|column| column == "applied_at");
        let has_snapshot_hash = columns.iter().any(|column| column == "snapshot_hash");

        if !has_applied_at {
            conn.execute(
                "ALTER TABLE vector_embedding_contracts ADD COLUMN applied_at INTEGER NOT NULL DEFAULT 0",
                [],
            )?;
        }
        if !has_snapshot_hash {
            conn.execute(
                "ALTER TABLE vector_embedding_contracts ADD COLUMN snapshot_hash TEXT NOT NULL DEFAULT ''",
                [],
            )?;
        }
        // Backfill sentinel values so legacy rows look applied.
        conn.execute(
            r"
            UPDATE vector_embedding_contracts
            SET
                applied_at = CASE
                    WHEN applied_at = 0 THEN updated_at
                    ELSE applied_at
                END,
                snapshot_hash = CASE
                    WHEN snapshot_hash = '' THEN 'legacy'
                    ELSE snapshot_hash
                END
            ",
            [],
        )?;
        Ok(())
    }

    /// Migration v5 handler: adds `contract_format_version` if missing and
    /// normalizes any zero values to 1.
    fn ensure_vector_contract_format_version(conn: &Connection) -> Result<(), SchemaError> {
        let mut stmt = conn.prepare("PRAGMA table_info(vector_embedding_contracts)")?;
        let columns = stmt
            .query_map([], |row| row.get::<_, String>(1))?
            .collect::<Result<Vec<_>, _>>()?;
        let has_contract_format_version = columns
            .iter()
            .any(|column| column == "contract_format_version");

        if !has_contract_format_version {
            conn.execute(
                "ALTER TABLE vector_embedding_contracts ADD COLUMN contract_format_version INTEGER NOT NULL DEFAULT 1",
                [],
            )?;
        }
        conn.execute(
            r"
            UPDATE vector_embedding_contracts
            SET contract_format_version = 1
            WHERE contract_format_version = 0
            ",
            [],
        )?;
        Ok(())
    }

    /// Migration v6 handler: adds `metadata_json` to `provenance_events` if
    /// missing.
    fn ensure_provenance_metadata(conn: &Connection) -> Result<(), SchemaError> {
        let mut stmt = conn.prepare("PRAGMA table_info(provenance_events)")?;
        let columns = stmt
            .query_map([], |row| row.get::<_, String>(1))?
            .collect::<Result<Vec<_>, _>>()?;
        let has_metadata_json = columns.iter().any(|column| column == "metadata_json");

        if !has_metadata_json {
            conn.execute(
                "ALTER TABLE provenance_events ADD COLUMN metadata_json TEXT NOT NULL DEFAULT ''",
                [],
            )?;
        }
        Ok(())
    }

    /// Migration v8 handler: adds `mutation_order` if missing, seeds it from
    /// rowid for pre-existing rows, and creates the ordering index.
    fn ensure_operational_mutation_order(conn: &Connection) -> Result<(), SchemaError> {
        let mut stmt = conn.prepare("PRAGMA table_info(operational_mutations)")?;
        let columns = stmt
            .query_map([], |row| row.get::<_, String>(1))?
            .collect::<Result<Vec<_>, _>>()?;
        let has_mutation_order = columns.iter().any(|column| column == "mutation_order");

        if !has_mutation_order {
            conn.execute(
                "ALTER TABLE operational_mutations ADD COLUMN mutation_order INTEGER NOT NULL DEFAULT 0",
                [],
            )?;
        }
        conn.execute(
            r"
            UPDATE operational_mutations
            SET mutation_order = rowid
            WHERE mutation_order = 0
            ",
            [],
        )?;
        conn.execute(
            r"
            CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_order
                ON operational_mutations(collection_name, record_key, mutation_order DESC)
            ",
            [],
        )?;
        Ok(())
    }

    /// Migration v9 handler: creates the node access-metadata table and its
    /// recency index (all statements use IF NOT EXISTS, so re-runs are safe).
    fn ensure_node_access_metadata(conn: &Connection) -> Result<(), SchemaError> {
        conn.execute_batch(
            r"
            CREATE TABLE IF NOT EXISTS node_access_metadata (
                logical_id TEXT PRIMARY KEY,
                last_accessed_at INTEGER NOT NULL,
                updated_at INTEGER NOT NULL
            );

            CREATE INDEX IF NOT EXISTS idx_node_access_metadata_last_accessed
                ON node_access_metadata(last_accessed_at DESC);
            ",
        )?;
        Ok(())
    }

    /// Migration v10 handler: adds `filter_fields_json` if missing and
    /// creates the extracted filter-values table plus its lookup indexes.
    fn ensure_operational_filter_contract(conn: &Connection) -> Result<(), SchemaError> {
        let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
        let columns = stmt
            .query_map([], |row| row.get::<_, String>(1))?
            .collect::<Result<Vec<_>, _>>()?;
        let has_filter_fields_json = columns.iter().any(|column| column == "filter_fields_json");

        if !has_filter_fields_json {
            conn.execute(
                "ALTER TABLE operational_collections ADD COLUMN filter_fields_json TEXT NOT NULL DEFAULT '[]'",
                [],
            )?;
        }

        conn.execute_batch(
            r"
            CREATE TABLE IF NOT EXISTS operational_filter_values (
                mutation_id TEXT NOT NULL,
                collection_name TEXT NOT NULL,
                field_name TEXT NOT NULL,
                string_value TEXT,
                integer_value INTEGER,
                PRIMARY KEY(mutation_id, field_name),
                FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE,
                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
            );

            CREATE INDEX IF NOT EXISTS idx_operational_filter_values_text
                ON operational_filter_values(collection_name, field_name, string_value, mutation_id);
            CREATE INDEX IF NOT EXISTS idx_operational_filter_values_integer
                ON operational_filter_values(collection_name, field_name, integer_value, mutation_id);
            ",
        )?;
        Ok(())
    }

    /// Migration v11 handler: adds `validation_json` to
    /// `operational_collections` if missing.
    fn ensure_operational_validation_contract(conn: &Connection) -> Result<(), SchemaError> {
        let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
        let columns = stmt
            .query_map([], |row| row.get::<_, String>(1))?
            .collect::<Result<Vec<_>, _>>()?;
        let has_validation_json = columns.iter().any(|column| column == "validation_json");

        if !has_validation_json {
            conn.execute(
                "ALTER TABLE operational_collections ADD COLUMN validation_json TEXT NOT NULL DEFAULT ''",
                [],
            )?;
        }

        Ok(())
    }

    /// Migration v12 handler: adds `secondary_indexes_json` if missing and
    /// creates the slotted secondary-index entries table with its covering
    /// indexes.
    fn ensure_operational_secondary_indexes(conn: &Connection) -> Result<(), SchemaError> {
        let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
        let columns = stmt
            .query_map([], |row| row.get::<_, String>(1))?
            .collect::<Result<Vec<_>, _>>()?;
        let has_secondary_indexes_json = columns
            .iter()
            .any(|column| column == "secondary_indexes_json");

        if !has_secondary_indexes_json {
            conn.execute(
                "ALTER TABLE operational_collections ADD COLUMN secondary_indexes_json TEXT NOT NULL DEFAULT '[]'",
                [],
            )?;
        }

        conn.execute_batch(
            r"
            CREATE TABLE IF NOT EXISTS operational_secondary_index_entries (
                collection_name TEXT NOT NULL,
                index_name TEXT NOT NULL,
                subject_kind TEXT NOT NULL,
                mutation_id TEXT NOT NULL DEFAULT '',
                record_key TEXT NOT NULL DEFAULT '',
                sort_timestamp INTEGER,
                slot1_text TEXT,
                slot1_integer INTEGER,
                slot2_text TEXT,
                slot2_integer INTEGER,
                slot3_text TEXT,
                slot3_integer INTEGER,
                PRIMARY KEY(collection_name, index_name, subject_kind, mutation_id, record_key),
                FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
                FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE
            );

            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_text
                ON operational_secondary_index_entries(
                    collection_name, index_name, subject_kind, slot1_text, sort_timestamp DESC, mutation_id, record_key
                );
            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_integer
                ON operational_secondary_index_entries(
                    collection_name, index_name, subject_kind, slot1_integer, sort_timestamp DESC, mutation_id, record_key
                );
            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_text
                ON operational_secondary_index_entries(
                    collection_name, index_name, subject_kind, slot1_text, slot2_text, slot3_text, sort_timestamp DESC, record_key
                );
            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_integer
                ON operational_secondary_index_entries(
                    collection_name, index_name, subject_kind, slot1_integer, slot2_integer, slot3_integer, sort_timestamp DESC, record_key
                );
            ",
        )?;

        Ok(())
    }

    /// Migration v13 handler: creates the retention-run audit table and its
    /// per-collection time index.
    fn ensure_operational_retention_runs(conn: &Connection) -> Result<(), SchemaError> {
        conn.execute_batch(
            r"
            CREATE TABLE IF NOT EXISTS operational_retention_runs (
                id TEXT PRIMARY KEY,
                collection_name TEXT NOT NULL,
                executed_at INTEGER NOT NULL,
                action_kind TEXT NOT NULL,
                dry_run INTEGER NOT NULL DEFAULT 0,
                deleted_mutations INTEGER NOT NULL,
                rows_remaining INTEGER NOT NULL,
                metadata_json TEXT NOT NULL DEFAULT '',
                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
            );

            CREATE INDEX IF NOT EXISTS idx_operational_retention_runs_collection_time
                ON operational_retention_runs(collection_name, executed_at DESC);
            ",
        )?;
        Ok(())
    }

    /// Migration v14 handler: adds `nodes.content_ref` and
    /// `chunks.content_hash` if missing, plus the partial index over active
    /// nodes that carry a content ref.
    fn ensure_external_content_columns(conn: &Connection) -> Result<(), SchemaError> {
        let node_columns = Self::column_names(conn, "nodes")?;
        if !node_columns.iter().any(|c| c == "content_ref") {
            conn.execute("ALTER TABLE nodes ADD COLUMN content_ref TEXT", [])?;
        }
        conn.execute_batch(
            r"
            CREATE INDEX IF NOT EXISTS idx_nodes_content_ref
                ON nodes(content_ref)
                WHERE content_ref IS NOT NULL AND superseded_at IS NULL;
            ",
        )?;

        let chunk_columns = Self::column_names(conn, "chunks")?;
        if !chunk_columns.iter().any(|c| c == "content_hash") {
            conn.execute("ALTER TABLE chunks ADD COLUMN content_hash TEXT", [])?;
        }
        Ok(())
    }

    /// Migration v16 handler: drops and recreates both FTS5 tables with the
    /// porter+unicode61 tokenizer, then repopulates `fts_nodes` from active
    /// (non-superseded) chunks. `fts_node_properties` is recreated empty —
    /// presumably re-projected elsewhere; TODO confirm the projection
    /// pipeline repopulates it.
    fn ensure_unicode_porter_fts_tokenizers(conn: &Connection) -> Result<(), SchemaError> {
        conn.execute_batch(
            r"
            DROP TABLE IF EXISTS fts_nodes;
            CREATE VIRTUAL TABLE fts_nodes USING fts5(
                chunk_id UNINDEXED,
                node_logical_id UNINDEXED,
                kind UNINDEXED,
                text_content,
                tokenize = 'porter unicode61 remove_diacritics 2'
            );

            DROP TABLE IF EXISTS fts_node_properties;
            CREATE VIRTUAL TABLE fts_node_properties USING fts5(
                node_logical_id UNINDEXED,
                kind UNINDEXED,
                text_content,
                tokenize = 'porter unicode61 remove_diacritics 2'
            );

            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            SELECT c.id, n.logical_id, n.kind, c.text_content
            FROM chunks c
            JOIN nodes n
                ON n.logical_id = c.node_logical_id
                AND n.superseded_at IS NULL;
            ",
        )?;
        Ok(())
    }

    /// Migration v15 handler: creates the property projection schema table
    /// and the (pre-v16, default-tokenizer) property FTS table.
    fn ensure_fts_property_schemas(conn: &Connection) -> Result<(), SchemaError> {
        conn.execute_batch(
            r"
            CREATE TABLE IF NOT EXISTS fts_property_schemas (
                kind TEXT PRIMARY KEY,
                property_paths_json TEXT NOT NULL,
                separator TEXT NOT NULL DEFAULT ' ',
                format_version INTEGER NOT NULL DEFAULT 1,
                created_at INTEGER NOT NULL DEFAULT (unixepoch())
            );

            CREATE VIRTUAL TABLE IF NOT EXISTS fts_node_properties USING fts5(
                node_logical_id UNINDEXED,
                kind UNINDEXED,
                text_content
            );
            ",
        )?;
        Ok(())
    }

    /// Returns the column names of `table` via `PRAGMA table_info`.
    /// `table` is interpolated directly into the SQL, so callers must pass
    /// only trusted, internal table names.
    fn column_names(conn: &Connection, table: &str) -> Result<Vec<String>, SchemaError> {
        let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
        let names = stmt
            .query_map([], |row| row.get::<_, String>(1))?
            .collect::<Result<Vec<_>, _>>()?;
        Ok(names)
    }

    /// The newest migration version this engine knows about (0 if the
    /// migration list were empty).
    #[must_use]
    pub fn current_version(&self) -> SchemaVersion {
        self.migrations()
            .last()
            .map_or(SchemaVersion(0), |migration| migration.version)
    }

    /// The full ordered migration list.
    #[must_use]
    pub fn migrations(&self) -> &'static [Migration] {
        MIGRATIONS
    }

    /// Applies writer-connection pragmas (WAL journal, relaxed sync, busy
    /// timeout, memory temp store, mmap, journal size cap).
    ///
    /// # Errors
    /// Propagates any SQLite error from executing the pragma batch.
    pub fn initialize_connection(&self, conn: &Connection) -> Result<(), SchemaError> {
        conn.execute_batch(
            r"
            PRAGMA foreign_keys = ON;
            PRAGMA journal_mode = WAL;
            PRAGMA synchronous = NORMAL;
            PRAGMA busy_timeout = 5000;
            PRAGMA temp_store = MEMORY;
            PRAGMA mmap_size = 3000000000;
            PRAGMA journal_size_limit = 536870912;
            ",
        )?;
        Ok(())
    }

    /// Applies the reduced pragma set for read-only connections (no journal
    /// or sync changes).
    ///
    /// # Errors
    /// Propagates any SQLite error from executing the pragma batch.
    pub fn initialize_reader_connection(&self, conn: &Connection) -> Result<(), SchemaError> {
        conn.execute_batch(
            r"
            PRAGMA foreign_keys = ON;
            PRAGMA busy_timeout = 5000;
            PRAGMA temp_store = MEMORY;
            PRAGMA mmap_size = 3000000000;
            ",
        )?;
        Ok(())
    }

    /// Creates (if needed) the vec0 virtual table for `profile` and upserts
    /// an enabled row into `vector_profiles`. `table_name` and `dimension`
    /// are interpolated into DDL, so they must come from trusted config.
    ///
    /// # Errors
    /// Fails if the sqlite-vec extension is unavailable, `dimension` does not
    /// fit in `i64`, or any statement errors.
    #[cfg(feature = "sqlite-vec")]
    pub fn ensure_vector_profile(
        &self,
        conn: &Connection,
        profile: &str,
        table_name: &str,
        dimension: usize,
    ) -> Result<(), SchemaError> {
        conn.execute_batch(&format!(
            "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING vec0(\
                chunk_id TEXT PRIMARY KEY,\
                embedding float[{dimension}]\
            )"
        ))?;
        let dimension_i64 = i64::try_from(dimension).map_err(|_| {
            SchemaError::Sqlite(rusqlite::Error::ToSqlConversionFailure(
                format!("vector dimension {dimension} does not fit in i64").into(),
            ))
        })?;
        conn.execute(
            "INSERT OR REPLACE INTO vector_profiles \
             (profile, table_name, dimension, enabled) VALUES (?1, ?2, ?3, 1)",
            rusqlite::params![profile, table_name, dimension_i64],
        )?;
        Ok(())
    }

    /// Stub used when the `sqlite-vec` feature is disabled: always reports
    /// the missing capability.
    #[cfg(not(feature = "sqlite-vec"))]
    pub fn ensure_vector_profile(
        &self,
        _conn: &Connection,
        _profile: &str,
        _table_name: &str,
        _dimension: usize,
    ) -> Result<(), SchemaError> {
        Err(SchemaError::MissingCapability("sqlite-vec"))
    }

    /// Creates the migration bookkeeping table used by `bootstrap` to record
    /// which versions have been applied.
    fn ensure_metadata_tables(conn: &Connection) -> Result<(), SchemaError> {
        conn.execute_batch(
            r"
            CREATE TABLE IF NOT EXISTS fathom_schema_migrations (
                version INTEGER PRIMARY KEY,
                description TEXT NOT NULL,
                applied_at INTEGER NOT NULL DEFAULT (unixepoch())
            );
            ",
        )?;
        Ok(())
    }
}
1068
1069#[cfg(test)]
1070#[allow(clippy::expect_used)]
1071mod tests {
1072 use rusqlite::Connection;
1073
1074 use super::SchemaManager;
1075
1076 #[test]
1077 fn bootstrap_applies_initial_schema() {
1078 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1079 let manager = SchemaManager::new();
1080
1081 let report = manager.bootstrap(&conn).expect("bootstrap report");
1082
1083 assert_eq!(
1084 report.applied_versions.len(),
1085 manager.current_version().0 as usize
1086 );
1087 assert!(report.sqlite_version.starts_with('3'));
1088 let table_count: i64 = conn
1089 .query_row(
1090 "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'nodes'",
1091 [],
1092 |row| row.get(0),
1093 )
1094 .expect("nodes table exists");
1095 assert_eq!(table_count, 1);
1096 }
1097
1098 #[test]
1101 fn vector_profile_not_enabled_without_feature() {
1102 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1103 let manager = SchemaManager::new();
1104 let report = manager.bootstrap(&conn).expect("bootstrap");
1105 assert!(
1106 !report.vector_profile_enabled,
1107 "vector_profile_enabled must be false on a fresh bootstrap"
1108 );
1109 }
1110
1111 #[test]
1112 fn vector_profile_skipped_when_dimension_absent() {
1113 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1115 let manager = SchemaManager::new();
1116 manager.bootstrap(&conn).expect("bootstrap");
1117
1118 let count: i64 = conn
1119 .query_row(
1120 "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
1121 [],
1122 |row| row.get(0),
1123 )
1124 .expect("count");
1125 assert_eq!(
1126 count, 0,
1127 "no enabled profile without calling ensure_vector_profile"
1128 );
1129 }
1130
1131 #[test]
1132 fn bootstrap_report_reflects_actual_vector_state() {
1133 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1135 let manager = SchemaManager::new();
1136 let report = manager.bootstrap(&conn).expect("bootstrap");
1137
1138 let db_count: i64 = conn
1139 .query_row(
1140 "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
1141 [],
1142 |row| row.get(0),
1143 )
1144 .expect("count");
1145 assert_eq!(
1146 report.vector_profile_enabled,
1147 db_count > 0,
1148 "BootstrapReport.vector_profile_enabled must match actual DB state"
1149 );
1150 }
1151
    /// Seeds a hand-built "legacy" schema (tables in their old shape, without
    /// `contract_format_version` / `metadata_json`) and verifies bootstrap
    /// backfills those columns via the hardening migrations.
    #[test]
    fn bootstrap_backfills_vector_contract_format_and_provenance_metadata_columns() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        // Simulate a database produced by an older binary: both tables lack
        // the newer columns, and one legacy contract row is present so the
        // backfill has data to rewrite.
        conn.execute_batch(
            r#"
            CREATE TABLE provenance_events (
                id TEXT PRIMARY KEY,
                event_type TEXT NOT NULL,
                subject TEXT NOT NULL,
                source_ref TEXT,
                created_at INTEGER NOT NULL DEFAULT (unixepoch())
            );
            CREATE TABLE vector_embedding_contracts (
                profile TEXT PRIMARY KEY,
                table_name TEXT NOT NULL,
                model_identity TEXT NOT NULL,
                model_version TEXT NOT NULL,
                dimension INTEGER NOT NULL,
                normalization_policy TEXT NOT NULL,
                chunking_policy TEXT NOT NULL,
                preprocessing_policy TEXT NOT NULL,
                generator_command_json TEXT NOT NULL,
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                applied_at INTEGER NOT NULL DEFAULT 0,
                snapshot_hash TEXT NOT NULL DEFAULT ''
            );
            INSERT INTO vector_embedding_contracts (
                profile,
                table_name,
                model_identity,
                model_version,
                dimension,
                normalization_policy,
                chunking_policy,
                preprocessing_policy,
                generator_command_json,
                updated_at,
                applied_at,
                snapshot_hash
            ) VALUES (
                'default',
                'vec_nodes_active',
                'legacy-model',
                '0.9.0',
                4,
                'l2',
                'per_chunk',
                'trim',
                '["/bin/echo"]',
                100,
                100,
                'legacy'
            );
            "#,
        )
        .expect("seed legacy schema");
        let manager = SchemaManager::new();

        let report = manager.bootstrap(&conn).expect("bootstrap");

        // At least one hardening migration (version 5 or later) must run
        // against the seeded legacy schema.
        assert!(
            report.applied_versions.iter().any(|version| version.0 >= 5),
            "bootstrap should apply hardening migrations"
        );
        // The legacy contract row must now carry the current format version.
        let format_version: i64 = conn
            .query_row(
                "SELECT contract_format_version FROM vector_embedding_contracts WHERE profile = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("contract_format_version");
        assert_eq!(format_version, 1);
        // And provenance_events must have gained the metadata_json column.
        let metadata_column_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM pragma_table_info('provenance_events') WHERE name = 'metadata_json'",
                [],
                |row| row.get(0),
            )
            .expect("metadata_json column count");
        assert_eq!(metadata_column_count, 1);
    }
1233
1234 #[test]
1235 fn bootstrap_creates_operational_store_tables() {
1236 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1237 let manager = SchemaManager::new();
1238
1239 manager.bootstrap(&conn).expect("bootstrap");
1240
1241 for table in [
1242 "operational_collections",
1243 "operational_mutations",
1244 "operational_current",
1245 ] {
1246 let count: i64 = conn
1247 .query_row(
1248 "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = ?1",
1249 [table],
1250 |row| row.get(0),
1251 )
1252 .expect("table existence");
1253 assert_eq!(count, 1, "{table} should exist after bootstrap");
1254 }
1255 let mutation_order_columns: i64 = conn
1256 .query_row(
1257 "SELECT count(*) FROM pragma_table_info('operational_mutations') WHERE name = 'mutation_order'",
1258 [],
1259 |row| row.get(0),
1260 )
1261 .expect("mutation_order column exists");
1262 assert_eq!(mutation_order_columns, 1);
1263 }
1264
1265 #[test]
1266 fn bootstrap_is_idempotent_with_operational_store_tables() {
1267 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1268 let manager = SchemaManager::new();
1269
1270 manager.bootstrap(&conn).expect("first bootstrap");
1271 let report = manager.bootstrap(&conn).expect("second bootstrap");
1272
1273 assert!(
1274 report.applied_versions.is_empty(),
1275 "second bootstrap should apply no new migrations"
1276 );
1277 let count: i64 = conn
1278 .query_row(
1279 "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'operational_collections'",
1280 [],
1281 |row| row.get(0),
1282 )
1283 .expect("operational_collections table exists");
1284 assert_eq!(count, 1);
1285 }
1286
    /// Disaster-recovery scenario: the operational tables already exist
    /// (restored or recreated by hand) but `fathom_schema_migrations` has no
    /// record of them. Bootstrap must still converge — record migration 8,
    /// backfill `mutation_order` on pre-existing rows, and create the
    /// ordering index.
    #[test]
    fn bootstrap_is_idempotent_for_recovered_operational_tables_without_migration_history() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        // Seed the three operational tables plus one collection and one
        // mutation whose mutation_order is still the unbackfilled default 0.
        conn.execute_batch(
            r#"
            CREATE TABLE operational_collections (
                name TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                schema_json TEXT NOT NULL,
                retention_json TEXT NOT NULL,
                format_version INTEGER NOT NULL DEFAULT 1,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                disabled_at INTEGER
            );

            CREATE TABLE operational_mutations (
                id TEXT PRIMARY KEY,
                collection_name TEXT NOT NULL,
                record_key TEXT NOT NULL,
                op_kind TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                source_ref TEXT,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                mutation_order INTEGER NOT NULL DEFAULT 0,
                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
            );

            CREATE TABLE operational_current (
                collection_name TEXT NOT NULL,
                record_key TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                updated_at INTEGER NOT NULL,
                last_mutation_id TEXT NOT NULL,
                PRIMARY KEY(collection_name, record_key),
                FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
                FOREIGN KEY(last_mutation_id) REFERENCES operational_mutations(id)
            );

            INSERT INTO operational_collections (name, kind, schema_json, retention_json)
            VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}');
            INSERT INTO operational_mutations (
                id, collection_name, record_key, op_kind, payload_json, created_at, mutation_order
            ) VALUES (
                'mut-1', 'audit_log', 'entry-1', 'put', '{"ok":true}', 10, 0
            );
            "#,
        )
        .expect("seed recovered operational tables");

        let manager = SchemaManager::new();
        let report = manager
            .bootstrap(&conn)
            .expect("bootstrap recovered schema");

        // Migration 8 (mutation ordering hardening) must be recorded even
        // though the tables it touches already existed.
        assert!(
            report.applied_versions.iter().any(|version| version.0 == 8),
            "bootstrap should record operational mutation ordering hardening"
        );
        // The seeded row's mutation_order of 0 must have been replaced with a
        // real ordering value during backfill.
        let mutation_order: i64 = conn
            .query_row(
                "SELECT mutation_order FROM operational_mutations WHERE id = 'mut-1'",
                [],
                |row| row.get(0),
            )
            .expect("mutation_order");
        assert_ne!(
            mutation_order, 0,
            "bootstrap should backfill recovered operational rows"
        );
        // The per-collection ordering index must exist as well.
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type = 'index' AND name = 'idx_operational_mutations_collection_key_order'",
                [],
                |row| row.get(0),
            )
            .expect("ordering index exists");
        assert_eq!(count, 1);
    }
1365
    /// Recovered schema without filter-contract support: bootstrap must apply
    /// migrations 10 and 11, adding `filter_fields_json` / `validation_json`
    /// to `operational_collections` (with defaults backfilled on existing
    /// rows) and creating the `operational_filter_values` index table.
    #[test]
    fn bootstrap_adds_operational_filter_contract_and_index_table() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        // Seed the collections/mutations tables in their pre-migration-10
        // shape (no filter_fields_json / validation_json columns) plus one
        // collection row for the backfill to touch.
        conn.execute_batch(
            r#"
            CREATE TABLE operational_collections (
                name TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                schema_json TEXT NOT NULL,
                retention_json TEXT NOT NULL,
                format_version INTEGER NOT NULL DEFAULT 1,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                disabled_at INTEGER
            );

            CREATE TABLE operational_mutations (
                id TEXT PRIMARY KEY,
                collection_name TEXT NOT NULL,
                record_key TEXT NOT NULL,
                op_kind TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                source_ref TEXT,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                mutation_order INTEGER NOT NULL DEFAULT 1,
                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
            );

            INSERT INTO operational_collections (name, kind, schema_json, retention_json)
            VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}');
            "#,
        )
        .expect("seed recovered operational schema");

        let manager = SchemaManager::new();
        let report = manager
            .bootstrap(&conn)
            .expect("bootstrap recovered schema");

        // Both follow-up migrations must be recorded in the report.
        assert!(
            report
                .applied_versions
                .iter()
                .any(|version| version.0 == 10),
            "bootstrap should record operational filtered read migration"
        );
        assert!(
            report
                .applied_versions
                .iter()
                .any(|version| version.0 == 11),
            "bootstrap should record operational validation migration"
        );
        // Existing rows get the empty-list default for filter fields…
        let filter_fields_json: String = conn
            .query_row(
                "SELECT filter_fields_json FROM operational_collections WHERE name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("filter_fields_json added");
        assert_eq!(filter_fields_json, "[]");
        // …and an empty-string default for validation rules.
        let validation_json: String = conn
            .query_row(
                "SELECT validation_json FROM operational_collections WHERE name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("validation_json added");
        assert_eq!(validation_json, "");
        // Migration 10 also introduces the filter-value index table.
        let table_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'operational_filter_values'",
                [],
                |row| row.get(0),
            )
            .expect("filter table exists");
        assert_eq!(table_count, 1);
    }
1443
1444 #[test]
1445 fn bootstrap_reapplies_migration_history_without_readding_filter_contract_columns() {
1446 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1447 let manager = SchemaManager::new();
1448 manager.bootstrap(&conn).expect("initial bootstrap");
1449
1450 conn.execute("DROP TABLE fathom_schema_migrations", [])
1451 .expect("drop migration history");
1452 SchemaManager::ensure_metadata_tables(&conn).expect("recreate migration metadata");
1453
1454 let report = manager
1455 .bootstrap(&conn)
1456 .expect("rebootstrap existing schema");
1457
1458 assert!(
1459 report
1460 .applied_versions
1461 .iter()
1462 .any(|version| version.0 == 10),
1463 "rebootstrap should re-record migration 10"
1464 );
1465 assert!(
1466 report
1467 .applied_versions
1468 .iter()
1469 .any(|version| version.0 == 11),
1470 "rebootstrap should re-record migration 11"
1471 );
1472 let filter_fields_json: String = conn
1473 .query_row(
1474 "SELECT filter_fields_json FROM operational_collections LIMIT 1",
1475 [],
1476 |row| row.get(0),
1477 )
1478 .unwrap_or_else(|_| "[]".to_string());
1479 assert_eq!(filter_fields_json, "[]");
1480 let validation_json: String = conn
1481 .query_row(
1482 "SELECT validation_json FROM operational_collections LIMIT 1",
1483 [],
1484 |row| row.get(0),
1485 )
1486 .unwrap_or_default();
1487 assert_eq!(validation_json, "");
1488 }
1489
1490 #[test]
1491 fn downgrade_detected_returns_version_mismatch() {
1492 use crate::SchemaError;
1493
1494 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1495 let manager = SchemaManager::new();
1496 manager.bootstrap(&conn).expect("initial bootstrap");
1497
1498 conn.execute(
1499 "INSERT INTO fathom_schema_migrations (version, description) VALUES (?1, ?2)",
1500 (999_i64, "future migration"),
1501 )
1502 .expect("insert future version");
1503
1504 let err = manager
1505 .bootstrap(&conn)
1506 .expect_err("should fail on downgrade");
1507 assert!(
1508 matches!(
1509 err,
1510 SchemaError::VersionMismatch {
1511 database_version: 999,
1512 ..
1513 }
1514 ),
1515 "expected VersionMismatch with database_version 999, got: {err}"
1516 );
1517 }
1518
1519 #[test]
1520 fn journal_size_limit_is_set() {
1521 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1522 let manager = SchemaManager::new();
1523 manager
1524 .initialize_connection(&conn)
1525 .expect("initialize_connection");
1526
1527 let limit: i64 = conn
1528 .query_row("PRAGMA journal_size_limit", [], |row| row.get(0))
1529 .expect("journal_size_limit pragma");
1530 assert_eq!(limit, 536_870_912);
1531 }
1532
    /// With the `sqlite-vec` feature on, `ensure_vector_profile` must enable
    /// the profile row and create a queryable backing table.
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn vector_profile_created_when_feature_enabled() {
        // Register sqlite-vec as an auto-extension so the connection opened
        // below loads it automatically.
        // SAFETY: sqlite3_auto_extension takes an extension entry-point
        // function pointer; the transmute only re-types `sqlite3_vec_init`'s
        // pointer to the signature the FFI binding expects.
        // NOTE(review): this is the registration pattern the sqlite-vec crate
        // ships — confirm the entry-point signature on crate upgrades.
        unsafe {
            rusqlite::ffi::sqlite3_auto_extension(Some(std::mem::transmute(
                sqlite_vec::sqlite3_vec_init as *const (),
            )));
        }
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager.bootstrap(&conn).expect("bootstrap");

        manager
            .ensure_vector_profile(&conn, "default", "vec_nodes_active", 128)
            .expect("ensure_vector_profile");

        // Exactly one profile row must be flagged enabled…
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            count, 1,
            "vector profile must be enabled after ensure_vector_profile"
        );

        // …and the backing table must exist and be queryable (result value
        // itself is irrelevant here).
        let _: i64 = conn
            .query_row("SELECT count(*) FROM vec_nodes_active", [], |row| {
                row.get(0)
            })
            .expect("vec_nodes_active table must exist after ensure_vector_profile");
    }
1570}