1use rusqlite::{Connection, OptionalExtension};
2use sha2::{Digest, Sha256};
3
4use crate::{Migration, SchemaError, SchemaVersion};
5
6static MIGRATIONS: &[Migration] = &[
7 Migration::new(
8 SchemaVersion(1),
9 "initial canonical schema and runtime tables",
10 r"
11 CREATE TABLE IF NOT EXISTS nodes (
12 row_id TEXT PRIMARY KEY,
13 logical_id TEXT NOT NULL,
14 kind TEXT NOT NULL,
15 properties BLOB NOT NULL,
16 created_at INTEGER NOT NULL,
17 superseded_at INTEGER,
18 source_ref TEXT,
19 confidence REAL
20 );
21
22 CREATE UNIQUE INDEX IF NOT EXISTS idx_nodes_active_logical_id
23 ON nodes(logical_id)
24 WHERE superseded_at IS NULL;
25 CREATE INDEX IF NOT EXISTS idx_nodes_kind_active
26 ON nodes(kind, superseded_at);
27 CREATE INDEX IF NOT EXISTS idx_nodes_source_ref
28 ON nodes(source_ref);
29
30 CREATE TABLE IF NOT EXISTS edges (
31 row_id TEXT PRIMARY KEY,
32 logical_id TEXT NOT NULL,
33 source_logical_id TEXT NOT NULL,
34 target_logical_id TEXT NOT NULL,
35 kind TEXT NOT NULL,
36 properties BLOB NOT NULL,
37 created_at INTEGER NOT NULL,
38 superseded_at INTEGER,
39 source_ref TEXT,
40 confidence REAL
41 );
42
43 CREATE UNIQUE INDEX IF NOT EXISTS idx_edges_active_logical_id
44 ON edges(logical_id)
45 WHERE superseded_at IS NULL;
46 CREATE INDEX IF NOT EXISTS idx_edges_source_active
47 ON edges(source_logical_id, kind, superseded_at);
48 CREATE INDEX IF NOT EXISTS idx_edges_target_active
49 ON edges(target_logical_id, kind, superseded_at);
50 CREATE INDEX IF NOT EXISTS idx_edges_source_ref
51 ON edges(source_ref);
52
53 CREATE TABLE IF NOT EXISTS chunks (
54 id TEXT PRIMARY KEY,
55 node_logical_id TEXT NOT NULL,
56 text_content TEXT NOT NULL,
57 byte_start INTEGER,
58 byte_end INTEGER,
59 created_at INTEGER NOT NULL
60 );
61
62 CREATE INDEX IF NOT EXISTS idx_chunks_node_logical_id
63 ON chunks(node_logical_id);
64
65 CREATE VIRTUAL TABLE IF NOT EXISTS fts_nodes USING fts5(
66 chunk_id UNINDEXED,
67 node_logical_id UNINDEXED,
68 kind UNINDEXED,
69 text_content
70 );
71
72 CREATE TABLE IF NOT EXISTS vector_profiles (
73 profile TEXT PRIMARY KEY,
74 table_name TEXT NOT NULL,
75 dimension INTEGER NOT NULL,
76 enabled INTEGER NOT NULL DEFAULT 0
77 );
78
79 CREATE TABLE IF NOT EXISTS runs (
80 id TEXT PRIMARY KEY,
81 kind TEXT NOT NULL,
82 status TEXT NOT NULL,
83 properties BLOB NOT NULL,
84 created_at INTEGER NOT NULL,
85 completed_at INTEGER,
86 superseded_at INTEGER,
87 source_ref TEXT
88 );
89
90 CREATE TABLE IF NOT EXISTS steps (
91 id TEXT PRIMARY KEY,
92 run_id TEXT NOT NULL,
93 kind TEXT NOT NULL,
94 status TEXT NOT NULL,
95 properties BLOB NOT NULL,
96 created_at INTEGER NOT NULL,
97 completed_at INTEGER,
98 superseded_at INTEGER,
99 source_ref TEXT,
100 FOREIGN KEY(run_id) REFERENCES runs(id)
101 );
102
103 CREATE TABLE IF NOT EXISTS actions (
104 id TEXT PRIMARY KEY,
105 step_id TEXT NOT NULL,
106 kind TEXT NOT NULL,
107 status TEXT NOT NULL,
108 properties BLOB NOT NULL,
109 created_at INTEGER NOT NULL,
110 completed_at INTEGER,
111 superseded_at INTEGER,
112 source_ref TEXT,
113 FOREIGN KEY(step_id) REFERENCES steps(id)
114 );
115
116 CREATE INDEX IF NOT EXISTS idx_runs_source_ref
117 ON runs(source_ref);
118 CREATE INDEX IF NOT EXISTS idx_steps_source_ref
119 ON steps(source_ref);
120 CREATE INDEX IF NOT EXISTS idx_actions_source_ref
121 ON actions(source_ref);
122 ",
123 ),
124 Migration::new(
125 SchemaVersion(2),
126 "durable audit trail: provenance_events table",
127 r"
128 CREATE TABLE IF NOT EXISTS provenance_events (
129 id TEXT PRIMARY KEY,
130 event_type TEXT NOT NULL,
131 subject TEXT NOT NULL,
132 source_ref TEXT,
133 created_at INTEGER NOT NULL DEFAULT (unixepoch())
134 );
135 CREATE INDEX IF NOT EXISTS idx_provenance_events_subject
136 ON provenance_events (subject, event_type);
137 ",
138 ),
139 Migration::new(
140 SchemaVersion(3),
141 "vector regeneration contracts",
142 r"
143 CREATE TABLE IF NOT EXISTS vector_embedding_contracts (
144 profile TEXT PRIMARY KEY,
145 table_name TEXT NOT NULL,
146 model_identity TEXT NOT NULL,
147 model_version TEXT NOT NULL,
148 dimension INTEGER NOT NULL,
149 normalization_policy TEXT NOT NULL,
150 chunking_policy TEXT NOT NULL,
151 preprocessing_policy TEXT NOT NULL,
152 generator_command_json TEXT NOT NULL,
153 updated_at INTEGER NOT NULL DEFAULT (unixepoch())
154 );
155 ",
156 ),
157 Migration::new(
158 SchemaVersion(4),
159 "vector regeneration apply metadata",
160 r"
161 ALTER TABLE vector_embedding_contracts
162 ADD COLUMN applied_at INTEGER NOT NULL DEFAULT 0;
163 ALTER TABLE vector_embedding_contracts
164 ADD COLUMN snapshot_hash TEXT NOT NULL DEFAULT '';
165 UPDATE vector_embedding_contracts
166 SET
167 applied_at = CASE
168 WHEN applied_at = 0 THEN updated_at
169 ELSE applied_at
170 END,
171 snapshot_hash = CASE
172 WHEN snapshot_hash = '' THEN 'legacy'
173 ELSE snapshot_hash
174 END;
175 ",
176 ),
177 Migration::new(
178 SchemaVersion(5),
179 "vector regeneration contract format version",
180 r"
181 ALTER TABLE vector_embedding_contracts
182 ADD COLUMN contract_format_version INTEGER NOT NULL DEFAULT 1;
183 UPDATE vector_embedding_contracts
184 SET contract_format_version = 1
185 WHERE contract_format_version = 0;
186 ",
187 ),
188 Migration::new(
189 SchemaVersion(6),
190 "provenance metadata payloads",
191 r"
192 ALTER TABLE provenance_events
193 ADD COLUMN metadata_json TEXT NOT NULL DEFAULT '';
194 ",
195 ),
196 Migration::new(
197 SchemaVersion(7),
198 "operational store canonical and derived tables",
199 r"
200 CREATE TABLE IF NOT EXISTS operational_collections (
201 name TEXT PRIMARY KEY,
202 kind TEXT NOT NULL,
203 schema_json TEXT NOT NULL,
204 retention_json TEXT NOT NULL,
205 format_version INTEGER NOT NULL DEFAULT 1,
206 created_at INTEGER NOT NULL DEFAULT (unixepoch()),
207 disabled_at INTEGER
208 );
209
210 CREATE INDEX IF NOT EXISTS idx_operational_collections_kind
211 ON operational_collections(kind, disabled_at);
212
213 CREATE TABLE IF NOT EXISTS operational_mutations (
214 id TEXT PRIMARY KEY,
215 collection_name TEXT NOT NULL,
216 record_key TEXT NOT NULL,
217 op_kind TEXT NOT NULL,
218 payload_json TEXT NOT NULL,
219 source_ref TEXT,
220 created_at INTEGER NOT NULL DEFAULT (unixepoch()),
221 FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
222 );
223
224 CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_created
225 ON operational_mutations(collection_name, record_key, created_at DESC, id DESC);
226 CREATE INDEX IF NOT EXISTS idx_operational_mutations_source_ref
227 ON operational_mutations(source_ref);
228
229 CREATE TABLE IF NOT EXISTS operational_current (
230 collection_name TEXT NOT NULL,
231 record_key TEXT NOT NULL,
232 payload_json TEXT NOT NULL,
233 updated_at INTEGER NOT NULL,
234 last_mutation_id TEXT NOT NULL,
235 PRIMARY KEY(collection_name, record_key),
236 FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
237 FOREIGN KEY(last_mutation_id) REFERENCES operational_mutations(id)
238 );
239
240 CREATE INDEX IF NOT EXISTS idx_operational_current_collection_updated
241 ON operational_current(collection_name, updated_at DESC);
242 ",
243 ),
244 Migration::new(
245 SchemaVersion(8),
246 "operational mutation ordering hardening",
247 r"
248 ALTER TABLE operational_mutations
249 ADD COLUMN mutation_order INTEGER NOT NULL DEFAULT 0;
250 UPDATE operational_mutations
251 SET mutation_order = rowid
252 WHERE mutation_order = 0;
253 CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_order
254 ON operational_mutations(collection_name, record_key, mutation_order DESC);
255 ",
256 ),
257 Migration::new(
258 SchemaVersion(9),
259 "node last_accessed metadata",
260 r"
261 CREATE TABLE IF NOT EXISTS node_access_metadata (
262 logical_id TEXT PRIMARY KEY,
263 last_accessed_at INTEGER NOT NULL,
264 updated_at INTEGER NOT NULL
265 );
266
267 CREATE INDEX IF NOT EXISTS idx_node_access_metadata_last_accessed
268 ON node_access_metadata(last_accessed_at DESC);
269 ",
270 ),
271 Migration::new(
272 SchemaVersion(10),
273 "operational filtered read contracts and extracted values",
274 r"
275 ALTER TABLE operational_collections
276 ADD COLUMN filter_fields_json TEXT NOT NULL DEFAULT '[]';
277
278 CREATE TABLE IF NOT EXISTS operational_filter_values (
279 mutation_id TEXT NOT NULL,
280 collection_name TEXT NOT NULL,
281 field_name TEXT NOT NULL,
282 string_value TEXT,
283 integer_value INTEGER,
284 PRIMARY KEY(mutation_id, field_name),
285 FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE,
286 FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
287 );
288
289 CREATE INDEX IF NOT EXISTS idx_operational_filter_values_text
290 ON operational_filter_values(collection_name, field_name, string_value, mutation_id);
291 CREATE INDEX IF NOT EXISTS idx_operational_filter_values_integer
292 ON operational_filter_values(collection_name, field_name, integer_value, mutation_id);
293 ",
294 ),
295 Migration::new(
296 SchemaVersion(11),
297 "operational payload validation contracts",
298 r"
299 ALTER TABLE operational_collections
300 ADD COLUMN validation_json TEXT NOT NULL DEFAULT '';
301 ",
302 ),
303 Migration::new(
304 SchemaVersion(12),
305 "operational secondary indexes",
306 r"
307 ALTER TABLE operational_collections
308 ADD COLUMN secondary_indexes_json TEXT NOT NULL DEFAULT '[]';
309
310 CREATE TABLE IF NOT EXISTS operational_secondary_index_entries (
311 collection_name TEXT NOT NULL,
312 index_name TEXT NOT NULL,
313 subject_kind TEXT NOT NULL,
314 mutation_id TEXT NOT NULL DEFAULT '',
315 record_key TEXT NOT NULL DEFAULT '',
316 sort_timestamp INTEGER,
317 slot1_text TEXT,
318 slot1_integer INTEGER,
319 slot2_text TEXT,
320 slot2_integer INTEGER,
321 slot3_text TEXT,
322 slot3_integer INTEGER,
323 PRIMARY KEY(collection_name, index_name, subject_kind, mutation_id, record_key),
324 FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
325 FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE
326 );
327
328 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_text
329 ON operational_secondary_index_entries(
330 collection_name, index_name, subject_kind, slot1_text, sort_timestamp DESC, mutation_id, record_key
331 );
332 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_integer
333 ON operational_secondary_index_entries(
334 collection_name, index_name, subject_kind, slot1_integer, sort_timestamp DESC, mutation_id, record_key
335 );
336 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_text
337 ON operational_secondary_index_entries(
338 collection_name, index_name, subject_kind, slot1_text, slot2_text, slot3_text, sort_timestamp DESC, record_key
339 );
340 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_integer
341 ON operational_secondary_index_entries(
342 collection_name, index_name, subject_kind, slot1_integer, slot2_integer, slot3_integer, sort_timestamp DESC, record_key
343 );
344 ",
345 ),
346 Migration::new(
347 SchemaVersion(13),
348 "operational retention run metadata",
349 r"
350 CREATE TABLE IF NOT EXISTS operational_retention_runs (
351 id TEXT PRIMARY KEY,
352 collection_name TEXT NOT NULL,
353 executed_at INTEGER NOT NULL,
354 action_kind TEXT NOT NULL,
355 dry_run INTEGER NOT NULL DEFAULT 0,
356 deleted_mutations INTEGER NOT NULL,
357 rows_remaining INTEGER NOT NULL,
358 metadata_json TEXT NOT NULL DEFAULT '',
359 FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
360 );
361
362 CREATE INDEX IF NOT EXISTS idx_operational_retention_runs_collection_time
363 ON operational_retention_runs(collection_name, executed_at DESC);
364 ",
365 ),
366 Migration::new(
367 SchemaVersion(14),
368 "external content object columns",
369 r"
370 ALTER TABLE nodes ADD COLUMN content_ref TEXT;
371
372 CREATE INDEX IF NOT EXISTS idx_nodes_content_ref
373 ON nodes(content_ref)
374 WHERE content_ref IS NOT NULL AND superseded_at IS NULL;
375
376 ALTER TABLE chunks ADD COLUMN content_hash TEXT;
377 ",
378 ),
379 Migration::new(
380 SchemaVersion(15),
381 "FTS property projection schemas",
382 r"
383 CREATE TABLE IF NOT EXISTS fts_property_schemas (
384 kind TEXT PRIMARY KEY,
385 property_paths_json TEXT NOT NULL,
386 separator TEXT NOT NULL DEFAULT ' ',
387 format_version INTEGER NOT NULL DEFAULT 1,
388 created_at INTEGER NOT NULL DEFAULT (unixepoch())
389 );
390
391 CREATE VIRTUAL TABLE IF NOT EXISTS fts_node_properties USING fts5(
392 node_logical_id UNINDEXED,
393 kind UNINDEXED,
394 text_content
395 );
396 ",
397 ),
398 Migration::new(
399 SchemaVersion(16),
400 "rebuild fts_nodes and fts_node_properties on porter+unicode61 tokenizer",
401 "",
407 ),
408 Migration::new(
409 SchemaVersion(17),
410 "fts property position-map sidecar for recursive extraction",
411 r"
426 CREATE TABLE IF NOT EXISTS fts_node_property_positions (
427 node_logical_id TEXT NOT NULL,
428 kind TEXT NOT NULL,
429 start_offset INTEGER NOT NULL,
430 end_offset INTEGER NOT NULL,
431 leaf_path TEXT NOT NULL
432 );
433
434 CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_node
435 ON fts_node_property_positions(node_logical_id, kind);
436
437 CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_kind
438 ON fts_node_property_positions(kind);
439 ",
440 ),
441 Migration::new(
442 SchemaVersion(18),
443 "add UNIQUE constraint on fts_node_property_positions (node_logical_id, kind, start_offset)",
444 r"
457 DROP TABLE IF EXISTS fts_node_property_positions;
458
459 CREATE TABLE fts_node_property_positions (
460 node_logical_id TEXT NOT NULL,
461 kind TEXT NOT NULL,
462 start_offset INTEGER NOT NULL,
463 end_offset INTEGER NOT NULL,
464 leaf_path TEXT NOT NULL,
465 UNIQUE(node_logical_id, kind, start_offset)
466 );
467
468 CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_node
469 ON fts_node_property_positions(node_logical_id, kind);
470
471 CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_kind
472 ON fts_node_property_positions(kind);
473 ",
474 ),
475 Migration::new(
476 SchemaVersion(19),
477 "async property-FTS rebuild staging and state tables",
478 r"
479 CREATE TABLE IF NOT EXISTS fts_property_rebuild_staging (
480 kind TEXT NOT NULL,
481 node_logical_id TEXT NOT NULL,
482 text_content TEXT NOT NULL,
483 positions_blob BLOB,
484 PRIMARY KEY (kind, node_logical_id)
485 );
486
487 CREATE TABLE IF NOT EXISTS fts_property_rebuild_state (
488 kind TEXT PRIMARY KEY,
489 schema_id INTEGER NOT NULL,
490 state TEXT NOT NULL,
491 rows_total INTEGER,
492 rows_done INTEGER NOT NULL DEFAULT 0,
493 started_at INTEGER NOT NULL,
494 last_progress_at INTEGER,
495 error_message TEXT,
496 is_first_registration INTEGER NOT NULL DEFAULT 0
497 );
498 ",
499 ),
500 Migration::new(
501 SchemaVersion(20),
502 "projection_profiles table for per-kind FTS tokenizer configuration",
503 r"CREATE TABLE IF NOT EXISTS projection_profiles (
504 kind TEXT NOT NULL,
505 facet TEXT NOT NULL,
506 config_json TEXT NOT NULL,
507 active_at INTEGER,
508 created_at INTEGER NOT NULL DEFAULT (unixepoch()),
509 PRIMARY KEY (kind, facet)
510 );",
511 ),
512 Migration::new(
513 SchemaVersion(21),
514 "per-kind FTS5 tables replacing fts_node_properties",
515 "",
516 ),
517 Migration::new(
518 SchemaVersion(22),
519 "add columns_json to fts_property_rebuild_staging for multi-column rebuild support",
520 "ALTER TABLE fts_property_rebuild_staging ADD COLUMN columns_json TEXT;",
521 ),
522 Migration::new(
523 SchemaVersion(23),
524 "drop global fts_node_properties table (replaced by per-kind fts_props_<kind> tables)",
525 "DROP TABLE IF EXISTS fts_node_properties;",
526 ),
527];
528
529#[derive(Clone, Debug, PartialEq, Eq)]
530pub struct BootstrapReport {
531 pub sqlite_version: String,
532 pub applied_versions: Vec<SchemaVersion>,
533 pub vector_profile_enabled: bool,
534}
535
536#[derive(Clone, Debug, Default)]
537pub struct SchemaManager;
538
539impl SchemaManager {
540 #[must_use]
541 pub fn new() -> Self {
542 Self
543 }
544
545 pub fn bootstrap(&self, conn: &Connection) -> Result<BootstrapReport, SchemaError> {
553 self.initialize_connection(conn)?;
554 Self::ensure_metadata_tables(conn)?;
555
556 let max_applied: u32 = conn.query_row(
558 "SELECT COALESCE(MAX(version), 0) FROM fathom_schema_migrations",
559 [],
560 |row| row.get(0),
561 )?;
562 let engine_version = self.current_version().0;
563 trace_info!(
564 current_version = max_applied,
565 engine_version,
566 "schema bootstrap: version check"
567 );
568 if max_applied > engine_version {
569 trace_error!(
570 database_version = max_applied,
571 engine_version,
572 "schema version mismatch: database is newer than engine"
573 );
574 return Err(SchemaError::VersionMismatch {
575 database_version: max_applied,
576 engine_version,
577 });
578 }
579
580 let mut applied_versions = Vec::new();
581 for migration in self.migrations() {
582 let already_applied = conn
583 .query_row(
584 "SELECT 1 FROM fathom_schema_migrations WHERE version = ?1",
585 [i64::from(migration.version.0)],
586 |row| row.get::<_, i64>(0),
587 )
588 .optional()?
589 .is_some();
590
591 if already_applied {
592 continue;
593 }
594
595 let tx = conn.unchecked_transaction()?;
596 match migration.version {
597 SchemaVersion(4) => Self::ensure_vector_regeneration_apply_metadata(&tx)?,
598 SchemaVersion(5) => Self::ensure_vector_contract_format_version(&tx)?,
599 SchemaVersion(6) => Self::ensure_provenance_metadata(&tx)?,
600 SchemaVersion(8) => Self::ensure_operational_mutation_order(&tx)?,
601 SchemaVersion(9) => Self::ensure_node_access_metadata(&tx)?,
602 SchemaVersion(10) => Self::ensure_operational_filter_contract(&tx)?,
603 SchemaVersion(11) => Self::ensure_operational_validation_contract(&tx)?,
604 SchemaVersion(12) => Self::ensure_operational_secondary_indexes(&tx)?,
605 SchemaVersion(13) => Self::ensure_operational_retention_runs(&tx)?,
606 SchemaVersion(14) => Self::ensure_external_content_columns(&tx)?,
607 SchemaVersion(15) => Self::ensure_fts_property_schemas(&tx)?,
608 SchemaVersion(16) => Self::ensure_unicode_porter_fts_tokenizers(&tx)?,
609 SchemaVersion(21) => Self::ensure_per_kind_fts_tables(&tx)?,
610 SchemaVersion(22) => Self::ensure_staging_columns_json(&tx)?,
611 _ => tx.execute_batch(migration.sql)?,
612 }
613 tx.execute(
614 "INSERT INTO fathom_schema_migrations (version, description) VALUES (?1, ?2)",
615 (i64::from(migration.version.0), migration.description),
616 )?;
617 tx.commit()?;
618 trace_info!(
619 version = migration.version.0,
620 description = migration.description,
621 "schema migration applied"
622 );
623 applied_versions.push(migration.version);
624 }
625
626 let sqlite_version = conn.query_row("SELECT sqlite_version()", [], |row| row.get(0))?;
627 let vector_profile_count: i64 = conn.query_row(
628 "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
629 [],
630 |row| row.get(0),
631 )?;
632 Ok(BootstrapReport {
633 sqlite_version,
634 applied_versions,
635 vector_profile_enabled: vector_profile_count > 0,
636 })
637 }
638
639 fn ensure_vector_regeneration_apply_metadata(conn: &Connection) -> Result<(), SchemaError> {
640 let mut stmt = conn.prepare("PRAGMA table_info(vector_embedding_contracts)")?;
641 let columns = stmt
642 .query_map([], |row| row.get::<_, String>(1))?
643 .collect::<Result<Vec<_>, _>>()?;
644 let has_applied_at = columns.iter().any(|column| column == "applied_at");
645 let has_snapshot_hash = columns.iter().any(|column| column == "snapshot_hash");
646
647 if !has_applied_at {
648 conn.execute(
649 "ALTER TABLE vector_embedding_contracts ADD COLUMN applied_at INTEGER NOT NULL DEFAULT 0",
650 [],
651 )?;
652 }
653 if !has_snapshot_hash {
654 conn.execute(
655 "ALTER TABLE vector_embedding_contracts ADD COLUMN snapshot_hash TEXT NOT NULL DEFAULT ''",
656 [],
657 )?;
658 }
659 conn.execute(
660 r"
661 UPDATE vector_embedding_contracts
662 SET
663 applied_at = CASE
664 WHEN applied_at = 0 THEN updated_at
665 ELSE applied_at
666 END,
667 snapshot_hash = CASE
668 WHEN snapshot_hash = '' THEN 'legacy'
669 ELSE snapshot_hash
670 END
671 ",
672 [],
673 )?;
674 Ok(())
675 }
676
677 fn ensure_vector_contract_format_version(conn: &Connection) -> Result<(), SchemaError> {
678 let mut stmt = conn.prepare("PRAGMA table_info(vector_embedding_contracts)")?;
679 let columns = stmt
680 .query_map([], |row| row.get::<_, String>(1))?
681 .collect::<Result<Vec<_>, _>>()?;
682 let has_contract_format_version = columns
683 .iter()
684 .any(|column| column == "contract_format_version");
685
686 if !has_contract_format_version {
687 conn.execute(
688 "ALTER TABLE vector_embedding_contracts ADD COLUMN contract_format_version INTEGER NOT NULL DEFAULT 1",
689 [],
690 )?;
691 }
692 conn.execute(
693 r"
694 UPDATE vector_embedding_contracts
695 SET contract_format_version = 1
696 WHERE contract_format_version = 0
697 ",
698 [],
699 )?;
700 Ok(())
701 }
702
703 fn ensure_provenance_metadata(conn: &Connection) -> Result<(), SchemaError> {
704 let mut stmt = conn.prepare("PRAGMA table_info(provenance_events)")?;
705 let columns = stmt
706 .query_map([], |row| row.get::<_, String>(1))?
707 .collect::<Result<Vec<_>, _>>()?;
708 let has_metadata_json = columns.iter().any(|column| column == "metadata_json");
709
710 if !has_metadata_json {
711 conn.execute(
712 "ALTER TABLE provenance_events ADD COLUMN metadata_json TEXT NOT NULL DEFAULT ''",
713 [],
714 )?;
715 }
716 Ok(())
717 }
718
719 fn ensure_operational_mutation_order(conn: &Connection) -> Result<(), SchemaError> {
720 let mut stmt = conn.prepare("PRAGMA table_info(operational_mutations)")?;
721 let columns = stmt
722 .query_map([], |row| row.get::<_, String>(1))?
723 .collect::<Result<Vec<_>, _>>()?;
724 let has_mutation_order = columns.iter().any(|column| column == "mutation_order");
725
726 if !has_mutation_order {
727 conn.execute(
728 "ALTER TABLE operational_mutations ADD COLUMN mutation_order INTEGER NOT NULL DEFAULT 0",
729 [],
730 )?;
731 }
732 conn.execute(
733 r"
734 UPDATE operational_mutations
735 SET mutation_order = rowid
736 WHERE mutation_order = 0
737 ",
738 [],
739 )?;
740 conn.execute(
741 r"
742 CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_order
743 ON operational_mutations(collection_name, record_key, mutation_order DESC)
744 ",
745 [],
746 )?;
747 Ok(())
748 }
749
750 fn ensure_node_access_metadata(conn: &Connection) -> Result<(), SchemaError> {
751 conn.execute_batch(
752 r"
753 CREATE TABLE IF NOT EXISTS node_access_metadata (
754 logical_id TEXT PRIMARY KEY,
755 last_accessed_at INTEGER NOT NULL,
756 updated_at INTEGER NOT NULL
757 );
758
759 CREATE INDEX IF NOT EXISTS idx_node_access_metadata_last_accessed
760 ON node_access_metadata(last_accessed_at DESC);
761 ",
762 )?;
763 Ok(())
764 }
765
766 fn ensure_operational_filter_contract(conn: &Connection) -> Result<(), SchemaError> {
767 let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
768 let columns = stmt
769 .query_map([], |row| row.get::<_, String>(1))?
770 .collect::<Result<Vec<_>, _>>()?;
771 let has_filter_fields_json = columns.iter().any(|column| column == "filter_fields_json");
772
773 if !has_filter_fields_json {
774 conn.execute(
775 "ALTER TABLE operational_collections ADD COLUMN filter_fields_json TEXT NOT NULL DEFAULT '[]'",
776 [],
777 )?;
778 }
779
780 conn.execute_batch(
781 r"
782 CREATE TABLE IF NOT EXISTS operational_filter_values (
783 mutation_id TEXT NOT NULL,
784 collection_name TEXT NOT NULL,
785 field_name TEXT NOT NULL,
786 string_value TEXT,
787 integer_value INTEGER,
788 PRIMARY KEY(mutation_id, field_name),
789 FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE,
790 FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
791 );
792
793 CREATE INDEX IF NOT EXISTS idx_operational_filter_values_text
794 ON operational_filter_values(collection_name, field_name, string_value, mutation_id);
795 CREATE INDEX IF NOT EXISTS idx_operational_filter_values_integer
796 ON operational_filter_values(collection_name, field_name, integer_value, mutation_id);
797 ",
798 )?;
799 Ok(())
800 }
801
802 fn ensure_operational_validation_contract(conn: &Connection) -> Result<(), SchemaError> {
803 let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
804 let columns = stmt
805 .query_map([], |row| row.get::<_, String>(1))?
806 .collect::<Result<Vec<_>, _>>()?;
807 let has_validation_json = columns.iter().any(|column| column == "validation_json");
808
809 if !has_validation_json {
810 conn.execute(
811 "ALTER TABLE operational_collections ADD COLUMN validation_json TEXT NOT NULL DEFAULT ''",
812 [],
813 )?;
814 }
815
816 Ok(())
817 }
818
819 fn ensure_operational_secondary_indexes(conn: &Connection) -> Result<(), SchemaError> {
820 let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
821 let columns = stmt
822 .query_map([], |row| row.get::<_, String>(1))?
823 .collect::<Result<Vec<_>, _>>()?;
824 let has_secondary_indexes_json = columns
825 .iter()
826 .any(|column| column == "secondary_indexes_json");
827
828 if !has_secondary_indexes_json {
829 conn.execute(
830 "ALTER TABLE operational_collections ADD COLUMN secondary_indexes_json TEXT NOT NULL DEFAULT '[]'",
831 [],
832 )?;
833 }
834
835 conn.execute_batch(
836 r"
837 CREATE TABLE IF NOT EXISTS operational_secondary_index_entries (
838 collection_name TEXT NOT NULL,
839 index_name TEXT NOT NULL,
840 subject_kind TEXT NOT NULL,
841 mutation_id TEXT NOT NULL DEFAULT '',
842 record_key TEXT NOT NULL DEFAULT '',
843 sort_timestamp INTEGER,
844 slot1_text TEXT,
845 slot1_integer INTEGER,
846 slot2_text TEXT,
847 slot2_integer INTEGER,
848 slot3_text TEXT,
849 slot3_integer INTEGER,
850 PRIMARY KEY(collection_name, index_name, subject_kind, mutation_id, record_key),
851 FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
852 FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE
853 );
854
855 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_text
856 ON operational_secondary_index_entries(
857 collection_name, index_name, subject_kind, slot1_text, sort_timestamp DESC, mutation_id, record_key
858 );
859 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_integer
860 ON operational_secondary_index_entries(
861 collection_name, index_name, subject_kind, slot1_integer, sort_timestamp DESC, mutation_id, record_key
862 );
863 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_text
864 ON operational_secondary_index_entries(
865 collection_name, index_name, subject_kind, slot1_text, slot2_text, slot3_text, sort_timestamp DESC, record_key
866 );
867 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_integer
868 ON operational_secondary_index_entries(
869 collection_name, index_name, subject_kind, slot1_integer, slot2_integer, slot3_integer, sort_timestamp DESC, record_key
870 );
871 ",
872 )?;
873
874 Ok(())
875 }
876
877 fn ensure_operational_retention_runs(conn: &Connection) -> Result<(), SchemaError> {
878 conn.execute_batch(
879 r"
880 CREATE TABLE IF NOT EXISTS operational_retention_runs (
881 id TEXT PRIMARY KEY,
882 collection_name TEXT NOT NULL,
883 executed_at INTEGER NOT NULL,
884 action_kind TEXT NOT NULL,
885 dry_run INTEGER NOT NULL DEFAULT 0,
886 deleted_mutations INTEGER NOT NULL,
887 rows_remaining INTEGER NOT NULL,
888 metadata_json TEXT NOT NULL DEFAULT '',
889 FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
890 );
891
892 CREATE INDEX IF NOT EXISTS idx_operational_retention_runs_collection_time
893 ON operational_retention_runs(collection_name, executed_at DESC);
894 ",
895 )?;
896 Ok(())
897 }
898
899 fn ensure_external_content_columns(conn: &Connection) -> Result<(), SchemaError> {
900 let node_columns = Self::column_names(conn, "nodes")?;
901 if !node_columns.iter().any(|c| c == "content_ref") {
902 conn.execute("ALTER TABLE nodes ADD COLUMN content_ref TEXT", [])?;
903 }
904 conn.execute_batch(
905 r"
906 CREATE INDEX IF NOT EXISTS idx_nodes_content_ref
907 ON nodes(content_ref)
908 WHERE content_ref IS NOT NULL AND superseded_at IS NULL;
909 ",
910 )?;
911
912 let chunk_columns = Self::column_names(conn, "chunks")?;
913 if !chunk_columns.iter().any(|c| c == "content_hash") {
914 conn.execute("ALTER TABLE chunks ADD COLUMN content_hash TEXT", [])?;
915 }
916 Ok(())
917 }
918
919 fn ensure_unicode_porter_fts_tokenizers(conn: &Connection) -> Result<(), SchemaError> {
937 conn.execute_batch(
938 r"
939 DROP TABLE IF EXISTS fts_nodes;
940 CREATE VIRTUAL TABLE fts_nodes USING fts5(
941 chunk_id UNINDEXED,
942 node_logical_id UNINDEXED,
943 kind UNINDEXED,
944 text_content,
945 tokenize = 'porter unicode61 remove_diacritics 2'
946 );
947
948 DROP TABLE IF EXISTS fts_node_properties;
949 CREATE VIRTUAL TABLE fts_node_properties USING fts5(
950 node_logical_id UNINDEXED,
951 kind UNINDEXED,
952 text_content,
953 tokenize = 'porter unicode61 remove_diacritics 2'
954 );
955
956 INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
957 SELECT c.id, n.logical_id, n.kind, c.text_content
958 FROM chunks c
959 JOIN nodes n
960 ON n.logical_id = c.node_logical_id
961 AND n.superseded_at IS NULL;
962 ",
963 )?;
964 Ok(())
965 }
966
967 fn ensure_fts_property_schemas(conn: &Connection) -> Result<(), SchemaError> {
968 conn.execute_batch(
969 r"
970 CREATE TABLE IF NOT EXISTS fts_property_schemas (
971 kind TEXT PRIMARY KEY,
972 property_paths_json TEXT NOT NULL,
973 separator TEXT NOT NULL DEFAULT ' ',
974 format_version INTEGER NOT NULL DEFAULT 1,
975 created_at INTEGER NOT NULL DEFAULT (unixepoch())
976 );
977
978 CREATE VIRTUAL TABLE IF NOT EXISTS fts_node_properties USING fts5(
979 node_logical_id UNINDEXED,
980 kind UNINDEXED,
981 text_content
982 );
983 ",
984 )?;
985 Ok(())
986 }
987
988 fn ensure_per_kind_fts_tables(conn: &Connection) -> Result<(), SchemaError> {
989 let kinds: Vec<String> = {
991 let mut stmt = conn.prepare("SELECT kind FROM fts_property_schemas")?;
992 stmt.query_map([], |r| r.get::<_, String>(0))?
993 .collect::<Result<Vec<_>, _>>()?
994 };
995
996 for kind in &kinds {
997 let table_name = fts_kind_table_name(kind);
998 let ddl = format!(
999 "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING fts5(\
1000 node_logical_id UNINDEXED, \
1001 text_content, \
1002 tokenize = '{DEFAULT_FTS_TOKENIZER}'\
1003 )"
1004 );
1005 conn.execute_batch(&ddl)?;
1006
1007 conn.execute(
1009 "INSERT OR IGNORE INTO fts_property_rebuild_state \
1010 (kind, schema_id, state, rows_done, started_at, is_first_registration) \
1011 SELECT ?1, rowid, 'PENDING', 0, unixepoch('now') * 1000, 0 \
1012 FROM fts_property_schemas WHERE kind = ?1",
1013 rusqlite::params![kind],
1014 )?;
1015 }
1016
1017 Ok(())
1020 }
1021
1022 fn ensure_staging_columns_json(conn: &Connection) -> Result<(), SchemaError> {
1023 let column_exists: bool = {
1025 let mut stmt = conn.prepare("PRAGMA table_info(fts_property_rebuild_staging)")?;
1026 let names: Vec<String> = stmt
1027 .query_map([], |r| r.get::<_, String>(1))?
1028 .collect::<Result<Vec<_>, _>>()?;
1029 names.iter().any(|n| n == "columns_json")
1030 };
1031
1032 if !column_exists {
1033 conn.execute_batch(
1034 "ALTER TABLE fts_property_rebuild_staging ADD COLUMN columns_json TEXT;",
1035 )?;
1036 }
1037
1038 Ok(())
1039 }
1040
1041 fn column_names(conn: &Connection, table: &str) -> Result<Vec<String>, SchemaError> {
1042 let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
1043 let names = stmt
1044 .query_map([], |row| row.get::<_, String>(1))?
1045 .collect::<Result<Vec<_>, _>>()?;
1046 Ok(names)
1047 }
1048
1049 #[must_use]
1050 pub fn current_version(&self) -> SchemaVersion {
1051 self.migrations()
1052 .last()
1053 .map_or(SchemaVersion(0), |migration| migration.version)
1054 }
1055
1056 #[must_use]
1057 pub fn migrations(&self) -> &'static [Migration] {
1058 MIGRATIONS
1059 }
1060
1061 pub fn initialize_connection(&self, conn: &Connection) -> Result<(), SchemaError> {
1067 conn.execute_batch(
1068 r"
1069 PRAGMA foreign_keys = ON;
1070 PRAGMA journal_mode = WAL;
1071 PRAGMA synchronous = NORMAL;
1072 PRAGMA busy_timeout = 5000;
1073 PRAGMA temp_store = MEMORY;
1074 PRAGMA mmap_size = 3000000000;
1075 PRAGMA journal_size_limit = 536870912;
1076 ",
1077 )?;
1078 Ok(())
1079 }
1080
1081 pub fn initialize_reader_connection(&self, conn: &Connection) -> Result<(), SchemaError> {
1092 conn.execute_batch(
1093 r"
1094 PRAGMA foreign_keys = ON;
1095 PRAGMA busy_timeout = 5000;
1096 PRAGMA temp_store = MEMORY;
1097 PRAGMA mmap_size = 3000000000;
1098 ",
1099 )?;
1100 Ok(())
1101 }
1102
1103 #[cfg(feature = "sqlite-vec")]
1115 pub fn ensure_vector_profile(
1116 &self,
1117 conn: &Connection,
1118 profile: &str,
1119 table_name: &str,
1120 dimension: usize,
1121 ) -> Result<(), SchemaError> {
1122 conn.execute_batch(&format!(
1123 "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING vec0(\
1124 chunk_id TEXT PRIMARY KEY,\
1125 embedding float[{dimension}]\
1126 )"
1127 ))?;
1128 let dimension_i64 = i64::try_from(dimension).map_err(|_| {
1131 SchemaError::Sqlite(rusqlite::Error::ToSqlConversionFailure(
1132 format!("vector dimension {dimension} does not fit in i64").into(),
1133 ))
1134 })?;
1135 conn.execute(
1136 "INSERT OR REPLACE INTO vector_profiles \
1137 (profile, table_name, dimension, enabled) VALUES (?1, ?2, ?3, 1)",
1138 rusqlite::params![profile, table_name, dimension_i64],
1139 )?;
1140 Ok(())
1141 }
1142
1143 #[cfg(not(feature = "sqlite-vec"))]
1148 pub fn ensure_vector_profile(
1149 &self,
1150 _conn: &Connection,
1151 _profile: &str,
1152 _table_name: &str,
1153 _dimension: usize,
1154 ) -> Result<(), SchemaError> {
1155 Err(SchemaError::MissingCapability("sqlite-vec"))
1156 }
1157
1158 fn ensure_metadata_tables(conn: &Connection) -> Result<(), SchemaError> {
1164 conn.execute_batch(
1165 r"
1166 CREATE TABLE IF NOT EXISTS fathom_schema_migrations (
1167 version INTEGER PRIMARY KEY,
1168 description TEXT NOT NULL,
1169 applied_at INTEGER NOT NULL DEFAULT (unixepoch())
1170 );
1171 ",
1172 )?;
1173 Ok(())
1174 }
1175}
1176
1177pub const DEFAULT_FTS_TOKENIZER: &str = "porter unicode61 remove_diacritics 2";
1179
1180#[must_use]
1190pub fn fts_kind_table_name(kind: &str) -> String {
1191 let lowered = kind.to_lowercase();
1193 let mut slug = String::with_capacity(lowered.len());
1194 let mut prev_was_underscore = false;
1195 for ch in lowered.chars() {
1196 if ch.is_ascii_alphanumeric() {
1197 slug.push(ch);
1198 prev_was_underscore = false;
1199 } else {
1200 if !prev_was_underscore {
1201 slug.push('_');
1202 }
1203 prev_was_underscore = true;
1204 }
1205 }
1206
1207 let prefixed = format!("fts_props_{slug}");
1209
1210 if prefixed.len() <= 63 {
1212 prefixed
1213 } else {
1214 let hash = Sha256::digest(kind.as_bytes());
1216 let mut hex = String::with_capacity(hash.len() * 2);
1217 for b in &hash {
1218 use std::fmt::Write as _;
1219 let _ = write!(hex, "{b:02x}");
1220 }
1221 let hex_suffix = &hex[..7];
1222 let slug_truncated = if slug.len() > 45 { &slug[..45] } else { &slug };
1225 format!("fts_props_{slug_truncated}_{hex_suffix}")
1226 }
1227}
1228
1229#[must_use]
1237pub fn fts_column_name(path: &str, is_recursive: bool) -> String {
1238 let stripped = if let Some(rest) = path.strip_prefix("$.") {
1240 rest
1241 } else if let Some(rest) = path.strip_prefix('$') {
1242 rest
1243 } else {
1244 path
1245 };
1246
1247 let lowered = stripped.to_lowercase();
1249 let mut col = String::with_capacity(lowered.len());
1250 let mut prev_was_underscore = false;
1251 for ch in lowered.chars() {
1252 if ch.is_ascii_alphanumeric() || ch == '_' {
1253 col.push(ch);
1254 prev_was_underscore = ch == '_';
1255 } else {
1256 if !prev_was_underscore {
1257 col.push('_');
1258 }
1259 prev_was_underscore = true;
1260 }
1261 }
1262
1263 let col = col.trim_end_matches('_').to_owned();
1265
1266 if is_recursive {
1268 format!("{col}_all")
1269 } else {
1270 col
1271 }
1272}
1273
1274pub fn resolve_fts_tokenizer(conn: &Connection, kind: &str) -> Result<String, SchemaError> {
1285 let result = conn
1286 .query_row(
1287 "SELECT json_extract(config_json, '$.tokenizer') FROM projection_profiles WHERE kind = ?1 AND facet = 'fts'",
1288 [kind],
1289 |row| row.get::<_, Option<String>>(0),
1290 )
1291 .optional();
1292
1293 match result {
1294 Ok(Some(Some(tok))) if !tok.is_empty() => Ok(tok),
1295 Ok(_) => Ok(DEFAULT_FTS_TOKENIZER.to_owned()),
1296 Err(rusqlite::Error::SqliteFailure(_, _)) => {
1297 Ok(DEFAULT_FTS_TOKENIZER.to_owned())
1299 }
1300 Err(e) => Err(SchemaError::Sqlite(e)),
1301 }
1302}
1303
1304#[cfg(test)]
1305#[allow(clippy::expect_used)]
1306mod tests {
1307 use rusqlite::Connection;
1308
1309 use super::SchemaManager;
1310
1311 #[test]
1312 fn bootstrap_applies_initial_schema() {
1313 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1314 let manager = SchemaManager::new();
1315
1316 let report = manager.bootstrap(&conn).expect("bootstrap report");
1317
1318 assert_eq!(
1319 report.applied_versions.len(),
1320 manager.current_version().0 as usize
1321 );
1322 assert!(report.sqlite_version.starts_with('3'));
1323 let table_count: i64 = conn
1324 .query_row(
1325 "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'nodes'",
1326 [],
1327 |row| row.get(0),
1328 )
1329 .expect("nodes table exists");
1330 assert_eq!(table_count, 1);
1331 }
1332
1333 #[test]
1336 fn vector_profile_not_enabled_without_feature() {
1337 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1338 let manager = SchemaManager::new();
1339 let report = manager.bootstrap(&conn).expect("bootstrap");
1340 assert!(
1341 !report.vector_profile_enabled,
1342 "vector_profile_enabled must be false on a fresh bootstrap"
1343 );
1344 }
1345
1346 #[test]
1347 fn vector_profile_skipped_when_dimension_absent() {
1348 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1350 let manager = SchemaManager::new();
1351 manager.bootstrap(&conn).expect("bootstrap");
1352
1353 let count: i64 = conn
1354 .query_row(
1355 "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
1356 [],
1357 |row| row.get(0),
1358 )
1359 .expect("count");
1360 assert_eq!(
1361 count, 0,
1362 "no enabled profile without calling ensure_vector_profile"
1363 );
1364 }
1365
1366 #[test]
1367 fn bootstrap_report_reflects_actual_vector_state() {
1368 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1370 let manager = SchemaManager::new();
1371 let report = manager.bootstrap(&conn).expect("bootstrap");
1372
1373 let db_count: i64 = conn
1374 .query_row(
1375 "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
1376 [],
1377 |row| row.get(0),
1378 )
1379 .expect("count");
1380 assert_eq!(
1381 report.vector_profile_enabled,
1382 db_count > 0,
1383 "BootstrapReport.vector_profile_enabled must match actual DB state"
1384 );
1385 }
1386
1387 #[test]
1388 fn bootstrap_backfills_vector_contract_format_and_provenance_metadata_columns() {
1389 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1390 conn.execute_batch(
1391 r#"
1392 CREATE TABLE provenance_events (
1393 id TEXT PRIMARY KEY,
1394 event_type TEXT NOT NULL,
1395 subject TEXT NOT NULL,
1396 source_ref TEXT,
1397 created_at INTEGER NOT NULL DEFAULT (unixepoch())
1398 );
1399 CREATE TABLE vector_embedding_contracts (
1400 profile TEXT PRIMARY KEY,
1401 table_name TEXT NOT NULL,
1402 model_identity TEXT NOT NULL,
1403 model_version TEXT NOT NULL,
1404 dimension INTEGER NOT NULL,
1405 normalization_policy TEXT NOT NULL,
1406 chunking_policy TEXT NOT NULL,
1407 preprocessing_policy TEXT NOT NULL,
1408 generator_command_json TEXT NOT NULL,
1409 updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
1410 applied_at INTEGER NOT NULL DEFAULT 0,
1411 snapshot_hash TEXT NOT NULL DEFAULT ''
1412 );
1413 INSERT INTO vector_embedding_contracts (
1414 profile,
1415 table_name,
1416 model_identity,
1417 model_version,
1418 dimension,
1419 normalization_policy,
1420 chunking_policy,
1421 preprocessing_policy,
1422 generator_command_json,
1423 updated_at,
1424 applied_at,
1425 snapshot_hash
1426 ) VALUES (
1427 'default',
1428 'vec_nodes_active',
1429 'legacy-model',
1430 '0.9.0',
1431 4,
1432 'l2',
1433 'per_chunk',
1434 'trim',
1435 '["/bin/echo"]',
1436 100,
1437 100,
1438 'legacy'
1439 );
1440 "#,
1441 )
1442 .expect("seed legacy schema");
1443 let manager = SchemaManager::new();
1444
1445 let report = manager.bootstrap(&conn).expect("bootstrap");
1446
1447 assert!(
1448 report.applied_versions.iter().any(|version| version.0 >= 5),
1449 "bootstrap should apply hardening migrations"
1450 );
1451 let format_version: i64 = conn
1452 .query_row(
1453 "SELECT contract_format_version FROM vector_embedding_contracts WHERE profile = 'default'",
1454 [],
1455 |row| row.get(0),
1456 )
1457 .expect("contract_format_version");
1458 assert_eq!(format_version, 1);
1459 let metadata_column_count: i64 = conn
1460 .query_row(
1461 "SELECT count(*) FROM pragma_table_info('provenance_events') WHERE name = 'metadata_json'",
1462 [],
1463 |row| row.get(0),
1464 )
1465 .expect("metadata_json column count");
1466 assert_eq!(metadata_column_count, 1);
1467 }
1468
1469 #[test]
1470 fn bootstrap_creates_operational_store_tables() {
1471 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1472 let manager = SchemaManager::new();
1473
1474 manager.bootstrap(&conn).expect("bootstrap");
1475
1476 for table in [
1477 "operational_collections",
1478 "operational_mutations",
1479 "operational_current",
1480 ] {
1481 let count: i64 = conn
1482 .query_row(
1483 "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = ?1",
1484 [table],
1485 |row| row.get(0),
1486 )
1487 .expect("table existence");
1488 assert_eq!(count, 1, "{table} should exist after bootstrap");
1489 }
1490 let mutation_order_columns: i64 = conn
1491 .query_row(
1492 "SELECT count(*) FROM pragma_table_info('operational_mutations') WHERE name = 'mutation_order'",
1493 [],
1494 |row| row.get(0),
1495 )
1496 .expect("mutation_order column exists");
1497 assert_eq!(mutation_order_columns, 1);
1498 }
1499
1500 #[test]
1501 fn bootstrap_is_idempotent_with_operational_store_tables() {
1502 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1503 let manager = SchemaManager::new();
1504
1505 manager.bootstrap(&conn).expect("first bootstrap");
1506 let report = manager.bootstrap(&conn).expect("second bootstrap");
1507
1508 assert!(
1509 report.applied_versions.is_empty(),
1510 "second bootstrap should apply no new migrations"
1511 );
1512 let count: i64 = conn
1513 .query_row(
1514 "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'operational_collections'",
1515 [],
1516 |row| row.get(0),
1517 )
1518 .expect("operational_collections table exists");
1519 assert_eq!(count, 1);
1520 }
1521
1522 #[test]
1523 fn bootstrap_is_idempotent_for_recovered_operational_tables_without_migration_history() {
1524 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1525 conn.execute_batch(
1526 r#"
1527 CREATE TABLE operational_collections (
1528 name TEXT PRIMARY KEY,
1529 kind TEXT NOT NULL,
1530 schema_json TEXT NOT NULL,
1531 retention_json TEXT NOT NULL,
1532 format_version INTEGER NOT NULL DEFAULT 1,
1533 created_at INTEGER NOT NULL DEFAULT (unixepoch()),
1534 disabled_at INTEGER
1535 );
1536
1537 CREATE TABLE operational_mutations (
1538 id TEXT PRIMARY KEY,
1539 collection_name TEXT NOT NULL,
1540 record_key TEXT NOT NULL,
1541 op_kind TEXT NOT NULL,
1542 payload_json TEXT NOT NULL,
1543 source_ref TEXT,
1544 created_at INTEGER NOT NULL DEFAULT (unixepoch()),
1545 mutation_order INTEGER NOT NULL DEFAULT 0,
1546 FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
1547 );
1548
1549 CREATE TABLE operational_current (
1550 collection_name TEXT NOT NULL,
1551 record_key TEXT NOT NULL,
1552 payload_json TEXT NOT NULL,
1553 updated_at INTEGER NOT NULL,
1554 last_mutation_id TEXT NOT NULL,
1555 PRIMARY KEY(collection_name, record_key),
1556 FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
1557 FOREIGN KEY(last_mutation_id) REFERENCES operational_mutations(id)
1558 );
1559
1560 INSERT INTO operational_collections (name, kind, schema_json, retention_json)
1561 VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}');
1562 INSERT INTO operational_mutations (
1563 id, collection_name, record_key, op_kind, payload_json, created_at, mutation_order
1564 ) VALUES (
1565 'mut-1', 'audit_log', 'entry-1', 'put', '{"ok":true}', 10, 0
1566 );
1567 "#,
1568 )
1569 .expect("seed recovered operational tables");
1570
1571 let manager = SchemaManager::new();
1572 let report = manager
1573 .bootstrap(&conn)
1574 .expect("bootstrap recovered schema");
1575
1576 assert!(
1577 report.applied_versions.iter().any(|version| version.0 == 8),
1578 "bootstrap should record operational mutation ordering hardening"
1579 );
1580 let mutation_order: i64 = conn
1581 .query_row(
1582 "SELECT mutation_order FROM operational_mutations WHERE id = 'mut-1'",
1583 [],
1584 |row| row.get(0),
1585 )
1586 .expect("mutation_order");
1587 assert_ne!(
1588 mutation_order, 0,
1589 "bootstrap should backfill recovered operational rows"
1590 );
1591 let count: i64 = conn
1592 .query_row(
1593 "SELECT count(*) FROM sqlite_master WHERE type = 'index' AND name = 'idx_operational_mutations_collection_key_order'",
1594 [],
1595 |row| row.get(0),
1596 )
1597 .expect("ordering index exists");
1598 assert_eq!(count, 1);
1599 }
1600
1601 #[test]
1602 fn bootstrap_adds_operational_filter_contract_and_index_table() {
1603 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1604 conn.execute_batch(
1605 r#"
1606 CREATE TABLE operational_collections (
1607 name TEXT PRIMARY KEY,
1608 kind TEXT NOT NULL,
1609 schema_json TEXT NOT NULL,
1610 retention_json TEXT NOT NULL,
1611 format_version INTEGER NOT NULL DEFAULT 1,
1612 created_at INTEGER NOT NULL DEFAULT (unixepoch()),
1613 disabled_at INTEGER
1614 );
1615
1616 CREATE TABLE operational_mutations (
1617 id TEXT PRIMARY KEY,
1618 collection_name TEXT NOT NULL,
1619 record_key TEXT NOT NULL,
1620 op_kind TEXT NOT NULL,
1621 payload_json TEXT NOT NULL,
1622 source_ref TEXT,
1623 created_at INTEGER NOT NULL DEFAULT (unixepoch()),
1624 mutation_order INTEGER NOT NULL DEFAULT 1,
1625 FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
1626 );
1627
1628 INSERT INTO operational_collections (name, kind, schema_json, retention_json)
1629 VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}');
1630 "#,
1631 )
1632 .expect("seed recovered operational schema");
1633
1634 let manager = SchemaManager::new();
1635 let report = manager
1636 .bootstrap(&conn)
1637 .expect("bootstrap recovered schema");
1638
1639 assert!(
1640 report
1641 .applied_versions
1642 .iter()
1643 .any(|version| version.0 == 10),
1644 "bootstrap should record operational filtered read migration"
1645 );
1646 assert!(
1647 report
1648 .applied_versions
1649 .iter()
1650 .any(|version| version.0 == 11),
1651 "bootstrap should record operational validation migration"
1652 );
1653 let filter_fields_json: String = conn
1654 .query_row(
1655 "SELECT filter_fields_json FROM operational_collections WHERE name = 'audit_log'",
1656 [],
1657 |row| row.get(0),
1658 )
1659 .expect("filter_fields_json added");
1660 assert_eq!(filter_fields_json, "[]");
1661 let validation_json: String = conn
1662 .query_row(
1663 "SELECT validation_json FROM operational_collections WHERE name = 'audit_log'",
1664 [],
1665 |row| row.get(0),
1666 )
1667 .expect("validation_json added");
1668 assert_eq!(validation_json, "");
1669 let table_count: i64 = conn
1670 .query_row(
1671 "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'operational_filter_values'",
1672 [],
1673 |row| row.get(0),
1674 )
1675 .expect("filter table exists");
1676 assert_eq!(table_count, 1);
1677 }
1678
1679 #[test]
1680 fn bootstrap_reapplies_migration_history_without_readding_filter_contract_columns() {
1681 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1682 let manager = SchemaManager::new();
1683 manager.bootstrap(&conn).expect("initial bootstrap");
1684
1685 conn.execute("DROP TABLE fathom_schema_migrations", [])
1686 .expect("drop migration history");
1687 SchemaManager::ensure_metadata_tables(&conn).expect("recreate migration metadata");
1688
1689 let report = manager
1690 .bootstrap(&conn)
1691 .expect("rebootstrap existing schema");
1692
1693 assert!(
1694 report
1695 .applied_versions
1696 .iter()
1697 .any(|version| version.0 == 10),
1698 "rebootstrap should re-record migration 10"
1699 );
1700 assert!(
1701 report
1702 .applied_versions
1703 .iter()
1704 .any(|version| version.0 == 11),
1705 "rebootstrap should re-record migration 11"
1706 );
1707 let filter_fields_json: String = conn
1708 .query_row(
1709 "SELECT filter_fields_json FROM operational_collections LIMIT 1",
1710 [],
1711 |row| row.get(0),
1712 )
1713 .unwrap_or_else(|_| "[]".to_string());
1714 assert_eq!(filter_fields_json, "[]");
1715 let validation_json: String = conn
1716 .query_row(
1717 "SELECT validation_json FROM operational_collections LIMIT 1",
1718 [],
1719 |row| row.get(0),
1720 )
1721 .unwrap_or_default();
1722 assert_eq!(validation_json, "");
1723 }
1724
1725 #[test]
1726 fn downgrade_detected_returns_version_mismatch() {
1727 use crate::SchemaError;
1728
1729 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1730 let manager = SchemaManager::new();
1731 manager.bootstrap(&conn).expect("initial bootstrap");
1732
1733 conn.execute(
1734 "INSERT INTO fathom_schema_migrations (version, description) VALUES (?1, ?2)",
1735 (999_i64, "future migration"),
1736 )
1737 .expect("insert future version");
1738
1739 let err = manager
1740 .bootstrap(&conn)
1741 .expect_err("should fail on downgrade");
1742 assert!(
1743 matches!(
1744 err,
1745 SchemaError::VersionMismatch {
1746 database_version: 999,
1747 ..
1748 }
1749 ),
1750 "expected VersionMismatch with database_version 999, got: {err}"
1751 );
1752 }
1753
1754 #[test]
1755 fn journal_size_limit_is_set() {
1756 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1757 let manager = SchemaManager::new();
1758 manager
1759 .initialize_connection(&conn)
1760 .expect("initialize_connection");
1761
1762 let limit: i64 = conn
1763 .query_row("PRAGMA journal_size_limit", [], |row| row.get(0))
1764 .expect("journal_size_limit pragma");
1765 assert_eq!(limit, 536_870_912);
1766 }
1767
1768 #[cfg(feature = "sqlite-vec")]
1769 #[test]
1770 fn vector_profile_created_when_feature_enabled() {
1771 unsafe {
1774 rusqlite::ffi::sqlite3_auto_extension(Some(std::mem::transmute(
1775 sqlite_vec::sqlite3_vec_init as *const (),
1776 )));
1777 }
1778 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1779 let manager = SchemaManager::new();
1780 manager.bootstrap(&conn).expect("bootstrap");
1781
1782 manager
1783 .ensure_vector_profile(&conn, "default", "vec_nodes_active", 128)
1784 .expect("ensure_vector_profile");
1785
1786 let count: i64 = conn
1787 .query_row(
1788 "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
1789 [],
1790 |row| row.get(0),
1791 )
1792 .expect("count");
1793 assert_eq!(
1794 count, 1,
1795 "vector profile must be enabled after ensure_vector_profile"
1796 );
1797
1798 let _: i64 = conn
1800 .query_row("SELECT count(*) FROM vec_nodes_active", [], |row| {
1801 row.get(0)
1802 })
1803 .expect("vec_nodes_active table must exist after ensure_vector_profile");
1804 }
1805
1806 #[test]
1809 fn fts_kind_table_name_simple_kind() {
1810 assert_eq!(
1811 super::fts_kind_table_name("WMKnowledgeObject"),
1812 "fts_props_wmknowledgeobject"
1813 );
1814 }
1815
1816 #[test]
1817 fn fts_kind_table_name_another_simple_kind() {
1818 assert_eq!(
1819 super::fts_kind_table_name("WMExecutionRecord"),
1820 "fts_props_wmexecutionrecord"
1821 );
1822 }
1823
1824 #[test]
1825 fn fts_kind_table_name_with_separator_chars() {
1826 assert_eq!(
1827 super::fts_kind_table_name("MyKind-With.Dots"),
1828 "fts_props_mykind_with_dots"
1829 );
1830 }
1831
1832 #[test]
1833 fn fts_kind_table_name_collapses_consecutive_underscores() {
1834 assert_eq!(
1835 super::fts_kind_table_name("Kind__Double__Underscores"),
1836 "fts_props_kind_double_underscores"
1837 );
1838 }
1839
1840 #[test]
1841 fn fts_kind_table_name_long_kind_truncates_with_hash() {
1842 let long_kind = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA";
1844 let result = super::fts_kind_table_name(long_kind);
1845 assert_eq!(result.len(), 63, "result must be exactly 63 chars");
1846 assert!(
1847 result.starts_with("fts_props_"),
1848 "result must start with fts_props_"
1849 );
1850 let last_underscore = result.rfind('_').expect("must contain underscore");
1852 let hex_suffix = &result[last_underscore + 1..];
1853 assert_eq!(hex_suffix.len(), 7, "hex suffix must be 7 chars");
1854 assert!(
1855 hex_suffix.chars().all(|c| c.is_ascii_hexdigit()),
1856 "hex suffix must be hex digits"
1857 );
1858 }
1859
1860 #[test]
1861 fn fts_kind_table_name_testkind() {
1862 assert_eq!(super::fts_kind_table_name("TestKind"), "fts_props_testkind");
1863 }
1864
1865 #[test]
1868 fn fts_column_name_simple_field() {
1869 assert_eq!(super::fts_column_name("$.title", false), "title");
1870 }
1871
1872 #[test]
1873 fn fts_column_name_nested_path() {
1874 assert_eq!(
1875 super::fts_column_name("$.payload.content", false),
1876 "payload_content"
1877 );
1878 }
1879
1880 #[test]
1881 fn fts_column_name_recursive() {
1882 assert_eq!(super::fts_column_name("$.payload", true), "payload_all");
1883 }
1884
1885 #[test]
1886 fn fts_column_name_special_chars() {
1887 assert_eq!(
1888 super::fts_column_name("$.some-field[0]", false),
1889 "some_field_0"
1890 );
1891 }
1892
1893 #[test]
1896 fn resolve_fts_tokenizer_returns_default_when_no_table() {
1897 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1898 let result = super::resolve_fts_tokenizer(&conn, "MyKind").expect("should not error");
1900 assert_eq!(result, super::DEFAULT_FTS_TOKENIZER);
1901 }
1902
1903 #[test]
1904 fn resolve_fts_tokenizer_returns_configured_value() {
1905 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1906 conn.execute_batch(
1907 "CREATE TABLE projection_profiles (
1908 kind TEXT NOT NULL,
1909 facet TEXT NOT NULL,
1910 config_json TEXT NOT NULL,
1911 active_at INTEGER,
1912 created_at INTEGER NOT NULL DEFAULT (unixepoch()),
1913 PRIMARY KEY (kind, facet)
1914 );
1915 INSERT INTO projection_profiles (kind, facet, config_json)
1916 VALUES ('MyKind', 'fts', '{\"tokenizer\":\"trigram\"}');",
1917 )
1918 .expect("setup table");
1919
1920 let result = super::resolve_fts_tokenizer(&conn, "MyKind").expect("should not error");
1921 assert_eq!(result, "trigram");
1922
1923 let default_result =
1924 super::resolve_fts_tokenizer(&conn, "OtherKind").expect("should not error");
1925 assert_eq!(default_result, super::DEFAULT_FTS_TOKENIZER);
1926 }
1927
1928 #[test]
1931 fn migration_20_creates_projection_profiles_table() {
1932 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1933 let manager = SchemaManager::new();
1934 manager.bootstrap(&conn).expect("bootstrap");
1935
1936 let table_exists: i64 = conn
1937 .query_row(
1938 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='projection_profiles'",
1939 [],
1940 |row| row.get(0),
1941 )
1942 .expect("query sqlite_master");
1943 assert_eq!(table_exists, 1, "projection_profiles table must exist");
1944
1945 conn.execute_batch(
1947 "INSERT INTO projection_profiles (kind, facet, config_json)
1948 VALUES ('TestKind', 'fts', '{}')",
1949 )
1950 .expect("insert row to verify columns");
1951 let (kind, facet, config_json): (String, String, String) = conn
1952 .query_row(
1953 "SELECT kind, facet, config_json FROM projection_profiles WHERE kind='TestKind'",
1954 [],
1955 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1956 )
1957 .expect("select columns");
1958 assert_eq!(kind, "TestKind");
1959 assert_eq!(facet, "fts");
1960 assert_eq!(config_json, "{}");
1961 }
1962
1963 #[test]
1964 fn migration_20_primary_key_is_kind_facet() {
1965 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1966 let manager = SchemaManager::new();
1967 manager.bootstrap(&conn).expect("bootstrap");
1968
1969 conn.execute_batch(
1970 "INSERT INTO projection_profiles (kind, facet, config_json)
1971 VALUES ('MyKind', 'fts', '{\"tokenizer\":\"porter\"}');",
1972 )
1973 .expect("first insert");
1974
1975 let result = conn.execute_batch(
1977 "INSERT INTO projection_profiles (kind, facet, config_json)
1978 VALUES ('MyKind', 'fts', '{\"tokenizer\":\"trigram\"}');",
1979 );
1980 assert!(
1981 result.is_err(),
1982 "duplicate (kind, facet) must violate PRIMARY KEY"
1983 );
1984 }
1985
1986 #[test]
1987 fn migration_20_resolve_fts_tokenizer_end_to_end() {
1988 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1989 let manager = SchemaManager::new();
1990 manager.bootstrap(&conn).expect("bootstrap");
1991
1992 conn.execute_batch(
1993 "INSERT INTO projection_profiles (kind, facet, config_json)
1994 VALUES ('MyKind', 'fts', '{\"tokenizer\":\"trigram\"}');",
1995 )
1996 .expect("insert profile");
1997
1998 let result = super::resolve_fts_tokenizer(&conn, "MyKind").expect("should not error");
1999 assert_eq!(result, "trigram");
2000
2001 let default_result =
2002 super::resolve_fts_tokenizer(&conn, "UnknownKind").expect("should not error");
2003 assert_eq!(default_result, super::DEFAULT_FTS_TOKENIZER);
2004 }
2005
2006 #[test]
2007 fn migration_21_creates_per_kind_fts_table_and_pending_row() {
2008 let conn = Connection::open_in_memory().expect("in-memory sqlite");
2009 let manager = SchemaManager::new();
2010
2011 manager.bootstrap(&conn).expect("first bootstrap");
2019
2020 conn.execute_batch(
2021 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator, format_version) \
2022 VALUES ('TestKind', '[]', ',', 1)",
2023 )
2024 .expect("insert kind");
2025
2026 SchemaManager::ensure_per_kind_fts_tables(&conn).expect("ensure_per_kind_fts_tables");
2029
2030 let count: i64 = conn
2032 .query_row(
2033 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='fts_props_testkind'",
2034 [],
2035 |r| r.get(0),
2036 )
2037 .expect("count fts table");
2038 assert_eq!(
2039 count, 1,
2040 "fts_props_testkind virtual table should be created"
2041 );
2042
2043 let state: String = conn
2045 .query_row(
2046 "SELECT state FROM fts_property_rebuild_state WHERE kind='TestKind'",
2047 [],
2048 |r| r.get(0),
2049 )
2050 .expect("rebuild state row");
2051 assert_eq!(state, "PENDING");
2052 }
2053
2054 #[test]
2055 fn migration_22_adds_columns_json_to_staging_table() {
2056 let conn = Connection::open_in_memory().expect("in-memory sqlite");
2057 let manager = SchemaManager::new();
2058 manager.bootstrap(&conn).expect("bootstrap");
2059
2060 let col_count: i64 = conn
2061 .query_row(
2062 "SELECT count(*) FROM pragma_table_info('fts_property_rebuild_staging') WHERE name='columns_json'",
2063 [],
2064 |r| r.get(0),
2065 )
2066 .expect("pragma_table_info");
2067 assert_eq!(
2068 col_count, 1,
2069 "columns_json column must exist after migration 22"
2070 );
2071 }
2072
2073 #[test]
2075 fn migration_23_drops_global_fts_node_properties_table() {
2076 let conn = Connection::open_in_memory().expect("in-memory sqlite");
2077 let manager = SchemaManager::new();
2078 manager.bootstrap(&conn).expect("bootstrap");
2079
2080 let count: i64 = conn
2082 .query_row(
2083 "SELECT count(*) FROM sqlite_master \
2084 WHERE type='table' AND name='fts_node_properties'",
2085 [],
2086 |r| r.get(0),
2087 )
2088 .expect("check sqlite_master");
2089 assert_eq!(
2090 count, 0,
2091 "fts_node_properties must be dropped by migration 23"
2092 );
2093 }
2094}