1use rusqlite::{Connection, OptionalExtension};
2
3use crate::{Migration, SchemaError, SchemaVersion};
4
/// Ordered, append-only migration ledger. [`SchemaManager::bootstrap`] applies
/// each version at most once and records it in `fathom_schema_migrations`.
///
/// NOTE: versions 4..=15 are replayed through idempotent `ensure_*` helpers in
/// `SchemaManager::bootstrap` rather than via this raw SQL, so databases that
/// already gained the columns/tables out-of-band do not fail on duplicates;
/// the SQL here remains the canonical definition of each version.
static MIGRATIONS: &[Migration] = &[
    // v1: core graph model. nodes/edges are versioned by (logical_id,
    // superseded_at): the partial unique indexes allow exactly one live row
    // per logical id while keeping the full history. Also: text chunks, an
    // FTS5 index, the vector-profile registry, and the runs/steps/actions
    // runtime hierarchy.
    Migration::new(
        SchemaVersion(1),
        "initial canonical schema and runtime tables",
        r"
        CREATE TABLE IF NOT EXISTS nodes (
            row_id TEXT PRIMARY KEY,
            logical_id TEXT NOT NULL,
            kind TEXT NOT NULL,
            properties BLOB NOT NULL,
            created_at INTEGER NOT NULL,
            superseded_at INTEGER,
            source_ref TEXT,
            confidence REAL
        );

        CREATE UNIQUE INDEX IF NOT EXISTS idx_nodes_active_logical_id
            ON nodes(logical_id)
            WHERE superseded_at IS NULL;
        CREATE INDEX IF NOT EXISTS idx_nodes_kind_active
            ON nodes(kind, superseded_at);
        CREATE INDEX IF NOT EXISTS idx_nodes_source_ref
            ON nodes(source_ref);

        CREATE TABLE IF NOT EXISTS edges (
            row_id TEXT PRIMARY KEY,
            logical_id TEXT NOT NULL,
            source_logical_id TEXT NOT NULL,
            target_logical_id TEXT NOT NULL,
            kind TEXT NOT NULL,
            properties BLOB NOT NULL,
            created_at INTEGER NOT NULL,
            superseded_at INTEGER,
            source_ref TEXT,
            confidence REAL
        );

        CREATE UNIQUE INDEX IF NOT EXISTS idx_edges_active_logical_id
            ON edges(logical_id)
            WHERE superseded_at IS NULL;
        CREATE INDEX IF NOT EXISTS idx_edges_source_active
            ON edges(source_logical_id, kind, superseded_at);
        CREATE INDEX IF NOT EXISTS idx_edges_target_active
            ON edges(target_logical_id, kind, superseded_at);
        CREATE INDEX IF NOT EXISTS idx_edges_source_ref
            ON edges(source_ref);

        CREATE TABLE IF NOT EXISTS chunks (
            id TEXT PRIMARY KEY,
            node_logical_id TEXT NOT NULL,
            text_content TEXT NOT NULL,
            byte_start INTEGER,
            byte_end INTEGER,
            created_at INTEGER NOT NULL
        );

        CREATE INDEX IF NOT EXISTS idx_chunks_node_logical_id
            ON chunks(node_logical_id);

        CREATE VIRTUAL TABLE IF NOT EXISTS fts_nodes USING fts5(
            chunk_id UNINDEXED,
            node_logical_id UNINDEXED,
            kind UNINDEXED,
            text_content
        );

        CREATE TABLE IF NOT EXISTS vector_profiles (
            profile TEXT PRIMARY KEY,
            table_name TEXT NOT NULL,
            dimension INTEGER NOT NULL,
            enabled INTEGER NOT NULL DEFAULT 0
        );

        CREATE TABLE IF NOT EXISTS runs (
            id TEXT PRIMARY KEY,
            kind TEXT NOT NULL,
            status TEXT NOT NULL,
            properties BLOB NOT NULL,
            created_at INTEGER NOT NULL,
            completed_at INTEGER,
            superseded_at INTEGER,
            source_ref TEXT
        );

        CREATE TABLE IF NOT EXISTS steps (
            id TEXT PRIMARY KEY,
            run_id TEXT NOT NULL,
            kind TEXT NOT NULL,
            status TEXT NOT NULL,
            properties BLOB NOT NULL,
            created_at INTEGER NOT NULL,
            completed_at INTEGER,
            superseded_at INTEGER,
            source_ref TEXT,
            FOREIGN KEY(run_id) REFERENCES runs(id)
        );

        CREATE TABLE IF NOT EXISTS actions (
            id TEXT PRIMARY KEY,
            step_id TEXT NOT NULL,
            kind TEXT NOT NULL,
            status TEXT NOT NULL,
            properties BLOB NOT NULL,
            created_at INTEGER NOT NULL,
            completed_at INTEGER,
            superseded_at INTEGER,
            source_ref TEXT,
            FOREIGN KEY(step_id) REFERENCES steps(id)
        );

        CREATE INDEX IF NOT EXISTS idx_runs_source_ref
            ON runs(source_ref);
        CREATE INDEX IF NOT EXISTS idx_steps_source_ref
            ON steps(source_ref);
        CREATE INDEX IF NOT EXISTS idx_actions_source_ref
            ON actions(source_ref);
        ",
    ),
    // v2: append-only audit trail keyed by (subject, event_type).
    Migration::new(
        SchemaVersion(2),
        "durable audit trail: provenance_events table",
        r"
        CREATE TABLE IF NOT EXISTS provenance_events (
            id TEXT PRIMARY KEY,
            event_type TEXT NOT NULL,
            subject TEXT NOT NULL,
            source_ref TEXT,
            created_at INTEGER NOT NULL DEFAULT (unixepoch())
        );
        CREATE INDEX IF NOT EXISTS idx_provenance_events_subject
            ON provenance_events (subject, event_type);
        ",
    ),
    // v3: one contract row per vector profile describing how embeddings are
    // produced, so they can be regenerated deterministically.
    Migration::new(
        SchemaVersion(3),
        "vector regeneration contracts",
        r"
        CREATE TABLE IF NOT EXISTS vector_embedding_contracts (
            profile TEXT PRIMARY KEY,
            table_name TEXT NOT NULL,
            model_identity TEXT NOT NULL,
            model_version TEXT NOT NULL,
            dimension INTEGER NOT NULL,
            normalization_policy TEXT NOT NULL,
            chunking_policy TEXT NOT NULL,
            preprocessing_policy TEXT NOT NULL,
            generator_command_json TEXT NOT NULL,
            updated_at INTEGER NOT NULL DEFAULT (unixepoch())
        );
        ",
    ),
    // v4: apply metadata; sentinel values (0 / '') are backfilled to
    // updated_at / 'legacy' for pre-existing rows. Replayed via
    // ensure_vector_regeneration_apply_metadata at bootstrap.
    Migration::new(
        SchemaVersion(4),
        "vector regeneration apply metadata",
        r"
        ALTER TABLE vector_embedding_contracts
            ADD COLUMN applied_at INTEGER NOT NULL DEFAULT 0;
        ALTER TABLE vector_embedding_contracts
            ADD COLUMN snapshot_hash TEXT NOT NULL DEFAULT '';
        UPDATE vector_embedding_contracts
        SET
            applied_at = CASE
                WHEN applied_at = 0 THEN updated_at
                ELSE applied_at
            END,
            snapshot_hash = CASE
                WHEN snapshot_hash = '' THEN 'legacy'
                ELSE snapshot_hash
            END;
        ",
    ),
    // v5: explicit contract format version (replayed via
    // ensure_vector_contract_format_version).
    Migration::new(
        SchemaVersion(5),
        "vector regeneration contract format version",
        r"
        ALTER TABLE vector_embedding_contracts
            ADD COLUMN contract_format_version INTEGER NOT NULL DEFAULT 1;
        UPDATE vector_embedding_contracts
        SET contract_format_version = 1
        WHERE contract_format_version = 0;
        ",
    ),
    // v6: free-form metadata payload on audit events (replayed via
    // ensure_provenance_metadata).
    Migration::new(
        SchemaVersion(6),
        "provenance metadata payloads",
        r"
        ALTER TABLE provenance_events
            ADD COLUMN metadata_json TEXT NOT NULL DEFAULT '';
        ",
    ),
    // v7: operational store — collection registry, append-only mutation log,
    // and a derived current-state table pointing at the last mutation.
    Migration::new(
        SchemaVersion(7),
        "operational store canonical and derived tables",
        r"
        CREATE TABLE IF NOT EXISTS operational_collections (
            name TEXT PRIMARY KEY,
            kind TEXT NOT NULL,
            schema_json TEXT NOT NULL,
            retention_json TEXT NOT NULL,
            format_version INTEGER NOT NULL DEFAULT 1,
            created_at INTEGER NOT NULL DEFAULT (unixepoch()),
            disabled_at INTEGER
        );

        CREATE INDEX IF NOT EXISTS idx_operational_collections_kind
            ON operational_collections(kind, disabled_at);

        CREATE TABLE IF NOT EXISTS operational_mutations (
            id TEXT PRIMARY KEY,
            collection_name TEXT NOT NULL,
            record_key TEXT NOT NULL,
            op_kind TEXT NOT NULL,
            payload_json TEXT NOT NULL,
            source_ref TEXT,
            created_at INTEGER NOT NULL DEFAULT (unixepoch()),
            FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
        );

        CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_created
            ON operational_mutations(collection_name, record_key, created_at DESC, id DESC);
        CREATE INDEX IF NOT EXISTS idx_operational_mutations_source_ref
            ON operational_mutations(source_ref);

        CREATE TABLE IF NOT EXISTS operational_current (
            collection_name TEXT NOT NULL,
            record_key TEXT NOT NULL,
            payload_json TEXT NOT NULL,
            updated_at INTEGER NOT NULL,
            last_mutation_id TEXT NOT NULL,
            PRIMARY KEY(collection_name, record_key),
            FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
            FOREIGN KEY(last_mutation_id) REFERENCES operational_mutations(id)
        );

        CREATE INDEX IF NOT EXISTS idx_operational_current_collection_updated
            ON operational_current(collection_name, updated_at DESC);
        ",
    ),
    // v8: monotonic per-mutation ordering column, seeded from rowid for
    // existing rows (replayed via ensure_operational_mutation_order).
    Migration::new(
        SchemaVersion(8),
        "operational mutation ordering hardening",
        r"
        ALTER TABLE operational_mutations
            ADD COLUMN mutation_order INTEGER NOT NULL DEFAULT 0;
        UPDATE operational_mutations
        SET mutation_order = rowid
        WHERE mutation_order = 0;
        CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_order
            ON operational_mutations(collection_name, record_key, mutation_order DESC);
        ",
    ),
    // v9: per-node access recency (replayed via ensure_node_access_metadata).
    Migration::new(
        SchemaVersion(9),
        "node last_accessed metadata",
        r"
        CREATE TABLE IF NOT EXISTS node_access_metadata (
            logical_id TEXT PRIMARY KEY,
            last_accessed_at INTEGER NOT NULL,
            updated_at INTEGER NOT NULL
        );

        CREATE INDEX IF NOT EXISTS idx_node_access_metadata_last_accessed
            ON node_access_metadata(last_accessed_at DESC);
        ",
    ),
    // v10: filtered-read contract per collection plus values extracted per
    // mutation (replayed via ensure_operational_filter_contract).
    Migration::new(
        SchemaVersion(10),
        "operational filtered read contracts and extracted values",
        r"
        ALTER TABLE operational_collections
            ADD COLUMN filter_fields_json TEXT NOT NULL DEFAULT '[]';

        CREATE TABLE IF NOT EXISTS operational_filter_values (
            mutation_id TEXT NOT NULL,
            collection_name TEXT NOT NULL,
            field_name TEXT NOT NULL,
            string_value TEXT,
            integer_value INTEGER,
            PRIMARY KEY(mutation_id, field_name),
            FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE,
            FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
        );

        CREATE INDEX IF NOT EXISTS idx_operational_filter_values_text
            ON operational_filter_values(collection_name, field_name, string_value, mutation_id);
        CREATE INDEX IF NOT EXISTS idx_operational_filter_values_integer
            ON operational_filter_values(collection_name, field_name, integer_value, mutation_id);
        ",
    ),
    // v11: payload validation contract per collection (replayed via
    // ensure_operational_validation_contract).
    Migration::new(
        SchemaVersion(11),
        "operational payload validation contracts",
        r"
        ALTER TABLE operational_collections
            ADD COLUMN validation_json TEXT NOT NULL DEFAULT '';
        ",
    ),
    // v12: generic secondary-index entries with three typed value slots
    // (text/integer pairs) plus covering indexes for single-slot and
    // composite lookups (replayed via ensure_operational_secondary_indexes).
    Migration::new(
        SchemaVersion(12),
        "operational secondary indexes",
        r"
        ALTER TABLE operational_collections
            ADD COLUMN secondary_indexes_json TEXT NOT NULL DEFAULT '[]';

        CREATE TABLE IF NOT EXISTS operational_secondary_index_entries (
            collection_name TEXT NOT NULL,
            index_name TEXT NOT NULL,
            subject_kind TEXT NOT NULL,
            mutation_id TEXT NOT NULL DEFAULT '',
            record_key TEXT NOT NULL DEFAULT '',
            sort_timestamp INTEGER,
            slot1_text TEXT,
            slot1_integer INTEGER,
            slot2_text TEXT,
            slot2_integer INTEGER,
            slot3_text TEXT,
            slot3_integer INTEGER,
            PRIMARY KEY(collection_name, index_name, subject_kind, mutation_id, record_key),
            FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
            FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE
        );

        CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_text
            ON operational_secondary_index_entries(
                collection_name, index_name, subject_kind, slot1_text, sort_timestamp DESC, mutation_id, record_key
            );
        CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_integer
            ON operational_secondary_index_entries(
                collection_name, index_name, subject_kind, slot1_integer, sort_timestamp DESC, mutation_id, record_key
            );
        CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_text
            ON operational_secondary_index_entries(
                collection_name, index_name, subject_kind, slot1_text, slot2_text, slot3_text, sort_timestamp DESC, record_key
            );
        CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_integer
            ON operational_secondary_index_entries(
                collection_name, index_name, subject_kind, slot1_integer, slot2_integer, slot3_integer, sort_timestamp DESC, record_key
            );
        ",
    ),
    // v13: audit log of retention sweeps (replayed via
    // ensure_operational_retention_runs).
    Migration::new(
        SchemaVersion(13),
        "operational retention run metadata",
        r"
        CREATE TABLE IF NOT EXISTS operational_retention_runs (
            id TEXT PRIMARY KEY,
            collection_name TEXT NOT NULL,
            executed_at INTEGER NOT NULL,
            action_kind TEXT NOT NULL,
            dry_run INTEGER NOT NULL DEFAULT 0,
            deleted_mutations INTEGER NOT NULL,
            rows_remaining INTEGER NOT NULL,
            metadata_json TEXT NOT NULL DEFAULT '',
            FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
        );

        CREATE INDEX IF NOT EXISTS idx_operational_retention_runs_collection_time
            ON operational_retention_runs(collection_name, executed_at DESC);
        ",
    ),
    // v14: references to externally stored content (replayed via
    // ensure_external_content_columns).
    Migration::new(
        SchemaVersion(14),
        "external content object columns",
        r"
        ALTER TABLE nodes ADD COLUMN content_ref TEXT;

        CREATE INDEX IF NOT EXISTS idx_nodes_content_ref
            ON nodes(content_ref)
            WHERE content_ref IS NOT NULL AND superseded_at IS NULL;

        ALTER TABLE chunks ADD COLUMN content_hash TEXT;
        ",
    ),
    // v15: per-kind FTS projection schemas and a second FTS5 table over
    // projected node properties (replayed via ensure_fts_property_schemas).
    Migration::new(
        SchemaVersion(15),
        "FTS property projection schemas",
        r"
        CREATE TABLE IF NOT EXISTS fts_property_schemas (
            kind TEXT PRIMARY KEY,
            property_paths_json TEXT NOT NULL,
            separator TEXT NOT NULL DEFAULT ' ',
            format_version INTEGER NOT NULL DEFAULT 1,
            created_at INTEGER NOT NULL DEFAULT (unixepoch())
        );

        CREATE VIRTUAL TABLE IF NOT EXISTS fts_node_properties USING fts5(
            node_logical_id UNINDEXED,
            kind UNINDEXED,
            text_content
        );
        ",
    ),
];
398
/// Summary returned by [`SchemaManager::bootstrap`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BootstrapReport {
    /// Result of `SELECT sqlite_version()` on the bootstrapped connection.
    pub sqlite_version: String,
    /// Migration versions applied during this call, in order; empty when the
    /// database was already current.
    pub applied_versions: Vec<SchemaVersion>,
    /// True when at least one `vector_profiles` row has `enabled = 1`.
    pub vector_profile_enabled: bool,
}
405
/// Stateless coordinator for schema bootstrap, migrations, and per-connection
/// PRAGMA setup. All state lives in the database itself.
#[derive(Clone, Debug, Default)]
pub struct SchemaManager;
408
409impl SchemaManager {
    /// Creates a manager; the type is stateless, so this is equivalent to
    /// `SchemaManager::default()`.
    #[must_use]
    pub fn new() -> Self {
        Self
    }
414
    /// Brings `conn` up to the engine's current schema version.
    ///
    /// Applies connection PRAGMAs, creates the migration ledger table, then
    /// applies every not-yet-recorded migration — each inside its own
    /// transaction together with its ledger insert, so a crash cannot record
    /// a version that was not fully applied.
    ///
    /// # Errors
    /// Returns [`SchemaError::VersionMismatch`] when the database records a
    /// version newer than this engine supports; otherwise propagates
    /// underlying SQLite errors.
    pub fn bootstrap(&self, conn: &Connection) -> Result<BootstrapReport, SchemaError> {
        self.initialize_connection(conn)?;
        Self::ensure_metadata_tables(conn)?;

        // Highest version already recorded in the ledger; 0 for a fresh DB.
        let max_applied: u32 = conn.query_row(
            "SELECT COALESCE(MAX(version), 0) FROM fathom_schema_migrations",
            [],
            |row| row.get(0),
        )?;
        let engine_version = self.current_version().0;
        trace_info!(
            current_version = max_applied,
            engine_version,
            "schema bootstrap: version check"
        );
        // Refuse to touch a database written by a newer engine build.
        if max_applied > engine_version {
            trace_error!(
                database_version = max_applied,
                engine_version,
                "schema version mismatch: database is newer than engine"
            );
            return Err(SchemaError::VersionMismatch {
                database_version: max_applied,
                engine_version,
            });
        }

        let mut applied_versions = Vec::new();
        for migration in self.migrations() {
            // Per-version check (rather than comparing against max_applied)
            // so gaps in the ledger are filled in as well.
            let already_applied = conn
                .query_row(
                    "SELECT 1 FROM fathom_schema_migrations WHERE version = ?1",
                    [i64::from(migration.version.0)],
                    |row| row.get::<_, i64>(0),
                )
                .optional()?
                .is_some();

            if already_applied {
                continue;
            }

            let tx = conn.unchecked_transaction()?;
            match migration.version {
                // These versions use idempotent ensure_* helpers that tolerate
                // databases where the columns/tables already exist, instead of
                // the raw (non-idempotent ALTER TABLE) migration SQL.
                SchemaVersion(4) => Self::ensure_vector_regeneration_apply_metadata(&tx)?,
                SchemaVersion(5) => Self::ensure_vector_contract_format_version(&tx)?,
                SchemaVersion(6) => Self::ensure_provenance_metadata(&tx)?,
                SchemaVersion(8) => Self::ensure_operational_mutation_order(&tx)?,
                SchemaVersion(9) => Self::ensure_node_access_metadata(&tx)?,
                SchemaVersion(10) => Self::ensure_operational_filter_contract(&tx)?,
                SchemaVersion(11) => Self::ensure_operational_validation_contract(&tx)?,
                SchemaVersion(12) => Self::ensure_operational_secondary_indexes(&tx)?,
                SchemaVersion(13) => Self::ensure_operational_retention_runs(&tx)?,
                SchemaVersion(14) => Self::ensure_external_content_columns(&tx)?,
                SchemaVersion(15) => Self::ensure_fts_property_schemas(&tx)?,
                _ => tx.execute_batch(migration.sql)?,
            }
            // Ledger insert shares the migration's transaction: both commit
            // or neither does.
            tx.execute(
                "INSERT INTO fathom_schema_migrations (version, description) VALUES (?1, ?2)",
                (i64::from(migration.version.0), migration.description),
            )?;
            tx.commit()?;
            trace_info!(
                version = migration.version.0,
                description = migration.description,
                "schema migration applied"
            );
            applied_versions.push(migration.version);
        }

        let sqlite_version = conn.query_row("SELECT sqlite_version()", [], |row| row.get(0))?;
        let vector_profile_count: i64 = conn.query_row(
            "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
            [],
            |row| row.get(0),
        )?;
        Ok(BootstrapReport {
            sqlite_version,
            applied_versions,
            vector_profile_enabled: vector_profile_count > 0,
        })
    }
505
506 fn ensure_vector_regeneration_apply_metadata(conn: &Connection) -> Result<(), SchemaError> {
507 let mut stmt = conn.prepare("PRAGMA table_info(vector_embedding_contracts)")?;
508 let columns = stmt
509 .query_map([], |row| row.get::<_, String>(1))?
510 .collect::<Result<Vec<_>, _>>()?;
511 let has_applied_at = columns.iter().any(|column| column == "applied_at");
512 let has_snapshot_hash = columns.iter().any(|column| column == "snapshot_hash");
513
514 if !has_applied_at {
515 conn.execute(
516 "ALTER TABLE vector_embedding_contracts ADD COLUMN applied_at INTEGER NOT NULL DEFAULT 0",
517 [],
518 )?;
519 }
520 if !has_snapshot_hash {
521 conn.execute(
522 "ALTER TABLE vector_embedding_contracts ADD COLUMN snapshot_hash TEXT NOT NULL DEFAULT ''",
523 [],
524 )?;
525 }
526 conn.execute(
527 r"
528 UPDATE vector_embedding_contracts
529 SET
530 applied_at = CASE
531 WHEN applied_at = 0 THEN updated_at
532 ELSE applied_at
533 END,
534 snapshot_hash = CASE
535 WHEN snapshot_hash = '' THEN 'legacy'
536 ELSE snapshot_hash
537 END
538 ",
539 [],
540 )?;
541 Ok(())
542 }
543
544 fn ensure_vector_contract_format_version(conn: &Connection) -> Result<(), SchemaError> {
545 let mut stmt = conn.prepare("PRAGMA table_info(vector_embedding_contracts)")?;
546 let columns = stmt
547 .query_map([], |row| row.get::<_, String>(1))?
548 .collect::<Result<Vec<_>, _>>()?;
549 let has_contract_format_version = columns
550 .iter()
551 .any(|column| column == "contract_format_version");
552
553 if !has_contract_format_version {
554 conn.execute(
555 "ALTER TABLE vector_embedding_contracts ADD COLUMN contract_format_version INTEGER NOT NULL DEFAULT 1",
556 [],
557 )?;
558 }
559 conn.execute(
560 r"
561 UPDATE vector_embedding_contracts
562 SET contract_format_version = 1
563 WHERE contract_format_version = 0
564 ",
565 [],
566 )?;
567 Ok(())
568 }
569
570 fn ensure_provenance_metadata(conn: &Connection) -> Result<(), SchemaError> {
571 let mut stmt = conn.prepare("PRAGMA table_info(provenance_events)")?;
572 let columns = stmt
573 .query_map([], |row| row.get::<_, String>(1))?
574 .collect::<Result<Vec<_>, _>>()?;
575 let has_metadata_json = columns.iter().any(|column| column == "metadata_json");
576
577 if !has_metadata_json {
578 conn.execute(
579 "ALTER TABLE provenance_events ADD COLUMN metadata_json TEXT NOT NULL DEFAULT ''",
580 [],
581 )?;
582 }
583 Ok(())
584 }
585
586 fn ensure_operational_mutation_order(conn: &Connection) -> Result<(), SchemaError> {
587 let mut stmt = conn.prepare("PRAGMA table_info(operational_mutations)")?;
588 let columns = stmt
589 .query_map([], |row| row.get::<_, String>(1))?
590 .collect::<Result<Vec<_>, _>>()?;
591 let has_mutation_order = columns.iter().any(|column| column == "mutation_order");
592
593 if !has_mutation_order {
594 conn.execute(
595 "ALTER TABLE operational_mutations ADD COLUMN mutation_order INTEGER NOT NULL DEFAULT 0",
596 [],
597 )?;
598 }
599 conn.execute(
600 r"
601 UPDATE operational_mutations
602 SET mutation_order = rowid
603 WHERE mutation_order = 0
604 ",
605 [],
606 )?;
607 conn.execute(
608 r"
609 CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_order
610 ON operational_mutations(collection_name, record_key, mutation_order DESC)
611 ",
612 [],
613 )?;
614 Ok(())
615 }
616
617 fn ensure_node_access_metadata(conn: &Connection) -> Result<(), SchemaError> {
618 conn.execute_batch(
619 r"
620 CREATE TABLE IF NOT EXISTS node_access_metadata (
621 logical_id TEXT PRIMARY KEY,
622 last_accessed_at INTEGER NOT NULL,
623 updated_at INTEGER NOT NULL
624 );
625
626 CREATE INDEX IF NOT EXISTS idx_node_access_metadata_last_accessed
627 ON node_access_metadata(last_accessed_at DESC);
628 ",
629 )?;
630 Ok(())
631 }
632
633 fn ensure_operational_filter_contract(conn: &Connection) -> Result<(), SchemaError> {
634 let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
635 let columns = stmt
636 .query_map([], |row| row.get::<_, String>(1))?
637 .collect::<Result<Vec<_>, _>>()?;
638 let has_filter_fields_json = columns.iter().any(|column| column == "filter_fields_json");
639
640 if !has_filter_fields_json {
641 conn.execute(
642 "ALTER TABLE operational_collections ADD COLUMN filter_fields_json TEXT NOT NULL DEFAULT '[]'",
643 [],
644 )?;
645 }
646
647 conn.execute_batch(
648 r"
649 CREATE TABLE IF NOT EXISTS operational_filter_values (
650 mutation_id TEXT NOT NULL,
651 collection_name TEXT NOT NULL,
652 field_name TEXT NOT NULL,
653 string_value TEXT,
654 integer_value INTEGER,
655 PRIMARY KEY(mutation_id, field_name),
656 FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE,
657 FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
658 );
659
660 CREATE INDEX IF NOT EXISTS idx_operational_filter_values_text
661 ON operational_filter_values(collection_name, field_name, string_value, mutation_id);
662 CREATE INDEX IF NOT EXISTS idx_operational_filter_values_integer
663 ON operational_filter_values(collection_name, field_name, integer_value, mutation_id);
664 ",
665 )?;
666 Ok(())
667 }
668
669 fn ensure_operational_validation_contract(conn: &Connection) -> Result<(), SchemaError> {
670 let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
671 let columns = stmt
672 .query_map([], |row| row.get::<_, String>(1))?
673 .collect::<Result<Vec<_>, _>>()?;
674 let has_validation_json = columns.iter().any(|column| column == "validation_json");
675
676 if !has_validation_json {
677 conn.execute(
678 "ALTER TABLE operational_collections ADD COLUMN validation_json TEXT NOT NULL DEFAULT ''",
679 [],
680 )?;
681 }
682
683 Ok(())
684 }
685
686 fn ensure_operational_secondary_indexes(conn: &Connection) -> Result<(), SchemaError> {
687 let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
688 let columns = stmt
689 .query_map([], |row| row.get::<_, String>(1))?
690 .collect::<Result<Vec<_>, _>>()?;
691 let has_secondary_indexes_json = columns
692 .iter()
693 .any(|column| column == "secondary_indexes_json");
694
695 if !has_secondary_indexes_json {
696 conn.execute(
697 "ALTER TABLE operational_collections ADD COLUMN secondary_indexes_json TEXT NOT NULL DEFAULT '[]'",
698 [],
699 )?;
700 }
701
702 conn.execute_batch(
703 r"
704 CREATE TABLE IF NOT EXISTS operational_secondary_index_entries (
705 collection_name TEXT NOT NULL,
706 index_name TEXT NOT NULL,
707 subject_kind TEXT NOT NULL,
708 mutation_id TEXT NOT NULL DEFAULT '',
709 record_key TEXT NOT NULL DEFAULT '',
710 sort_timestamp INTEGER,
711 slot1_text TEXT,
712 slot1_integer INTEGER,
713 slot2_text TEXT,
714 slot2_integer INTEGER,
715 slot3_text TEXT,
716 slot3_integer INTEGER,
717 PRIMARY KEY(collection_name, index_name, subject_kind, mutation_id, record_key),
718 FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
719 FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE
720 );
721
722 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_text
723 ON operational_secondary_index_entries(
724 collection_name, index_name, subject_kind, slot1_text, sort_timestamp DESC, mutation_id, record_key
725 );
726 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_integer
727 ON operational_secondary_index_entries(
728 collection_name, index_name, subject_kind, slot1_integer, sort_timestamp DESC, mutation_id, record_key
729 );
730 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_text
731 ON operational_secondary_index_entries(
732 collection_name, index_name, subject_kind, slot1_text, slot2_text, slot3_text, sort_timestamp DESC, record_key
733 );
734 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_integer
735 ON operational_secondary_index_entries(
736 collection_name, index_name, subject_kind, slot1_integer, slot2_integer, slot3_integer, sort_timestamp DESC, record_key
737 );
738 ",
739 )?;
740
741 Ok(())
742 }
743
744 fn ensure_operational_retention_runs(conn: &Connection) -> Result<(), SchemaError> {
745 conn.execute_batch(
746 r"
747 CREATE TABLE IF NOT EXISTS operational_retention_runs (
748 id TEXT PRIMARY KEY,
749 collection_name TEXT NOT NULL,
750 executed_at INTEGER NOT NULL,
751 action_kind TEXT NOT NULL,
752 dry_run INTEGER NOT NULL DEFAULT 0,
753 deleted_mutations INTEGER NOT NULL,
754 rows_remaining INTEGER NOT NULL,
755 metadata_json TEXT NOT NULL DEFAULT '',
756 FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
757 );
758
759 CREATE INDEX IF NOT EXISTS idx_operational_retention_runs_collection_time
760 ON operational_retention_runs(collection_name, executed_at DESC);
761 ",
762 )?;
763 Ok(())
764 }
765
766 fn ensure_external_content_columns(conn: &Connection) -> Result<(), SchemaError> {
767 let node_columns = Self::column_names(conn, "nodes")?;
768 if !node_columns.iter().any(|c| c == "content_ref") {
769 conn.execute("ALTER TABLE nodes ADD COLUMN content_ref TEXT", [])?;
770 }
771 conn.execute_batch(
772 r"
773 CREATE INDEX IF NOT EXISTS idx_nodes_content_ref
774 ON nodes(content_ref)
775 WHERE content_ref IS NOT NULL AND superseded_at IS NULL;
776 ",
777 )?;
778
779 let chunk_columns = Self::column_names(conn, "chunks")?;
780 if !chunk_columns.iter().any(|c| c == "content_hash") {
781 conn.execute("ALTER TABLE chunks ADD COLUMN content_hash TEXT", [])?;
782 }
783 Ok(())
784 }
785
786 fn ensure_fts_property_schemas(conn: &Connection) -> Result<(), SchemaError> {
787 conn.execute_batch(
788 r"
789 CREATE TABLE IF NOT EXISTS fts_property_schemas (
790 kind TEXT PRIMARY KEY,
791 property_paths_json TEXT NOT NULL,
792 separator TEXT NOT NULL DEFAULT ' ',
793 format_version INTEGER NOT NULL DEFAULT 1,
794 created_at INTEGER NOT NULL DEFAULT (unixepoch())
795 );
796
797 CREATE VIRTUAL TABLE IF NOT EXISTS fts_node_properties USING fts5(
798 node_logical_id UNINDEXED,
799 kind UNINDEXED,
800 text_content
801 );
802 ",
803 )?;
804 Ok(())
805 }
806
807 fn column_names(conn: &Connection, table: &str) -> Result<Vec<String>, SchemaError> {
808 let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
809 let names = stmt
810 .query_map([], |row| row.get::<_, String>(1))?
811 .collect::<Result<Vec<_>, _>>()?;
812 Ok(names)
813 }
814
815 #[must_use]
816 pub fn current_version(&self) -> SchemaVersion {
817 self.migrations()
818 .last()
819 .map_or(SchemaVersion(0), |migration| migration.version)
820 }
821
822 #[must_use]
823 pub fn migrations(&self) -> &'static [Migration] {
824 MIGRATIONS
825 }
826
827 pub fn initialize_connection(&self, conn: &Connection) -> Result<(), SchemaError> {
833 conn.execute_batch(
834 r"
835 PRAGMA foreign_keys = ON;
836 PRAGMA journal_mode = WAL;
837 PRAGMA synchronous = NORMAL;
838 PRAGMA busy_timeout = 5000;
839 PRAGMA temp_store = MEMORY;
840 PRAGMA mmap_size = 3000000000;
841 PRAGMA journal_size_limit = 536870912;
842 ",
843 )?;
844 Ok(())
845 }
846
847 pub fn initialize_reader_connection(&self, conn: &Connection) -> Result<(), SchemaError> {
858 conn.execute_batch(
859 r"
860 PRAGMA foreign_keys = ON;
861 PRAGMA busy_timeout = 5000;
862 PRAGMA temp_store = MEMORY;
863 PRAGMA mmap_size = 3000000000;
864 ",
865 )?;
866 Ok(())
867 }
868
    /// Creates (if needed) a `vec0` virtual table for `profile` and marks the
    /// profile enabled in `vector_profiles`.
    ///
    /// `table_name` is interpolated directly into DDL, so it must be a
    /// trusted identifier. NOTE(review): confirm no user-supplied value ever
    /// reaches this parameter; if one can, this is a SQL-injection vector.
    ///
    /// # Errors
    /// Propagates SQLite errors (e.g. when the sqlite-vec extension is not
    /// loaded into `conn`).
    #[cfg(feature = "sqlite-vec")]
    pub fn ensure_vector_profile(
        &self,
        conn: &Connection,
        profile: &str,
        table_name: &str,
        dimension: usize,
    ) -> Result<(), SchemaError> {
        conn.execute_batch(&format!(
            "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING vec0(\
            chunk_id TEXT PRIMARY KEY,\
            embedding float[{dimension}]\
            )"
        ))?;
        // INSERT OR REPLACE makes re-registration of an existing profile
        // idempotent. `dimension as i64` assumes dimensions fit in i64
        // (always true for realistic embedding sizes).
        conn.execute(
            "INSERT OR REPLACE INTO vector_profiles \
            (profile, table_name, dimension, enabled) VALUES (?1, ?2, ?3, 1)",
            rusqlite::params![profile, table_name, dimension as i64],
        )?;
        Ok(())
    }
901
    /// Stub used when the `sqlite-vec` feature is compiled out: always fails
    /// with a capability error so callers get a clear diagnostic instead of a
    /// missing-function compile error.
    #[cfg(not(feature = "sqlite-vec"))]
    pub fn ensure_vector_profile(
        &self,
        _conn: &Connection,
        _profile: &str,
        _table_name: &str,
        _dimension: usize,
    ) -> Result<(), SchemaError> {
        Err(SchemaError::MissingCapability("sqlite-vec"))
    }
916
917 fn ensure_metadata_tables(conn: &Connection) -> Result<(), SchemaError> {
923 conn.execute_batch(
924 r"
925 CREATE TABLE IF NOT EXISTS fathom_schema_migrations (
926 version INTEGER PRIMARY KEY,
927 description TEXT NOT NULL,
928 applied_at INTEGER NOT NULL DEFAULT (unixepoch())
929 );
930 ",
931 )?;
932 Ok(())
933 }
934}
935
936#[cfg(test)]
937#[allow(clippy::expect_used)]
938mod tests {
939 use rusqlite::Connection;
940
941 use super::SchemaManager;
942
    // Fresh database: every compiled-in migration applies and the core
    // `nodes` table exists afterwards.
    #[test]
    fn bootstrap_applies_initial_schema() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();

        let report = manager.bootstrap(&conn).expect("bootstrap report");

        // One applied version per migration — versions are contiguous from 1.
        assert_eq!(
            report.applied_versions.len(),
            manager.current_version().0 as usize
        );
        assert!(report.sqlite_version.starts_with('3'));
        let table_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'nodes'",
                [],
                |row| row.get(0),
            )
            .expect("nodes table exists");
        assert_eq!(table_count, 1);
    }
964
    // Bootstrap alone never enables a vector profile; that requires an
    // explicit ensure_vector_profile call.
    #[test]
    fn vector_profile_not_enabled_without_feature() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        let report = manager.bootstrap(&conn).expect("bootstrap");
        assert!(
            !report.vector_profile_enabled,
            "vector_profile_enabled must be false on a fresh bootstrap"
        );
    }
977
    // Same invariant checked at the database level: `vector_profiles` has no
    // enabled rows after a plain bootstrap.
    #[test]
    fn vector_profile_skipped_when_dimension_absent() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager.bootstrap(&conn).expect("bootstrap");

        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            count, 0,
            "no enabled profile without calling ensure_vector_profile"
        );
    }
997
    // The report flag must agree with the query bootstrap derives it from.
    #[test]
    fn bootstrap_report_reflects_actual_vector_state() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        let report = manager.bootstrap(&conn).expect("bootstrap");

        let db_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            report.vector_profile_enabled,
            db_count > 0,
            "BootstrapReport.vector_profile_enabled must match actual DB state"
        );
    }
1018
    // Legacy-database scenario: tables are pre-created at the migration-4
    // layout (no format-version column, no metadata_json), with an existing
    // contract row. Bootstrap must apply v5+/v6+ through the idempotent
    // ensure_* paths without tripping over the already-present v4 columns.
    #[test]
    fn bootstrap_backfills_vector_contract_format_and_provenance_metadata_columns() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        conn.execute_batch(
            r#"
            CREATE TABLE provenance_events (
                id TEXT PRIMARY KEY,
                event_type TEXT NOT NULL,
                subject TEXT NOT NULL,
                source_ref TEXT,
                created_at INTEGER NOT NULL DEFAULT (unixepoch())
            );
            CREATE TABLE vector_embedding_contracts (
                profile TEXT PRIMARY KEY,
                table_name TEXT NOT NULL,
                model_identity TEXT NOT NULL,
                model_version TEXT NOT NULL,
                dimension INTEGER NOT NULL,
                normalization_policy TEXT NOT NULL,
                chunking_policy TEXT NOT NULL,
                preprocessing_policy TEXT NOT NULL,
                generator_command_json TEXT NOT NULL,
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                applied_at INTEGER NOT NULL DEFAULT 0,
                snapshot_hash TEXT NOT NULL DEFAULT ''
            );
            INSERT INTO vector_embedding_contracts (
                profile,
                table_name,
                model_identity,
                model_version,
                dimension,
                normalization_policy,
                chunking_policy,
                preprocessing_policy,
                generator_command_json,
                updated_at,
                applied_at,
                snapshot_hash
            ) VALUES (
                'default',
                'vec_nodes_active',
                'legacy-model',
                '0.9.0',
                4,
                'l2',
                'per_chunk',
                'trim',
                '["/bin/echo"]',
                100,
                100,
                'legacy'
            );
            "#,
        )
        .expect("seed legacy schema");
        let manager = SchemaManager::new();

        let report = manager.bootstrap(&conn).expect("bootstrap");

        assert!(
            report.applied_versions.iter().any(|version| version.0 >= 5),
            "bootstrap should apply hardening migrations"
        );
        // v5 backfill: the new column exists and the legacy row reads as 1.
        let format_version: i64 = conn
            .query_row(
                "SELECT contract_format_version FROM vector_embedding_contracts WHERE profile = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("contract_format_version");
        assert_eq!(format_version, 1);
        // v6 backfill: metadata_json column was added to provenance_events.
        let metadata_column_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM pragma_table_info('provenance_events') WHERE name = 'metadata_json'",
                [],
                |row| row.get(0),
            )
            .expect("metadata_json column count");
        assert_eq!(metadata_column_count, 1);
    }
1100
1101 #[test]
1102 fn bootstrap_creates_operational_store_tables() {
1103 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1104 let manager = SchemaManager::new();
1105
1106 manager.bootstrap(&conn).expect("bootstrap");
1107
1108 for table in [
1109 "operational_collections",
1110 "operational_mutations",
1111 "operational_current",
1112 ] {
1113 let count: i64 = conn
1114 .query_row(
1115 "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = ?1",
1116 [table],
1117 |row| row.get(0),
1118 )
1119 .expect("table existence");
1120 assert_eq!(count, 1, "{table} should exist after bootstrap");
1121 }
1122 let mutation_order_columns: i64 = conn
1123 .query_row(
1124 "SELECT count(*) FROM pragma_table_info('operational_mutations') WHERE name = 'mutation_order'",
1125 [],
1126 |row| row.get(0),
1127 )
1128 .expect("mutation_order column exists");
1129 assert_eq!(mutation_order_columns, 1);
1130 }
1131
1132 #[test]
1133 fn bootstrap_is_idempotent_with_operational_store_tables() {
1134 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1135 let manager = SchemaManager::new();
1136
1137 manager.bootstrap(&conn).expect("first bootstrap");
1138 let report = manager.bootstrap(&conn).expect("second bootstrap");
1139
1140 assert!(
1141 report.applied_versions.is_empty(),
1142 "second bootstrap should apply no new migrations"
1143 );
1144 let count: i64 = conn
1145 .query_row(
1146 "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'operational_collections'",
1147 [],
1148 |row| row.get(0),
1149 )
1150 .expect("operational_collections table exists");
1151 assert_eq!(count, 1);
1152 }
1153
    #[test]
    fn bootstrap_is_idempotent_for_recovered_operational_tables_without_migration_history() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        // Hand-create the operational tables as a disaster-recovery restore
        // would — WITHOUT any migration-history bookkeeping — including one
        // mutation row whose mutation_order is still an unbackfilled 0.
        conn.execute_batch(
            r#"
        CREATE TABLE operational_collections (
            name TEXT PRIMARY KEY,
            kind TEXT NOT NULL,
            schema_json TEXT NOT NULL,
            retention_json TEXT NOT NULL,
            format_version INTEGER NOT NULL DEFAULT 1,
            created_at INTEGER NOT NULL DEFAULT (unixepoch()),
            disabled_at INTEGER
        );

        CREATE TABLE operational_mutations (
            id TEXT PRIMARY KEY,
            collection_name TEXT NOT NULL,
            record_key TEXT NOT NULL,
            op_kind TEXT NOT NULL,
            payload_json TEXT NOT NULL,
            source_ref TEXT,
            created_at INTEGER NOT NULL DEFAULT (unixepoch()),
            mutation_order INTEGER NOT NULL DEFAULT 0,
            FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
        );

        CREATE TABLE operational_current (
            collection_name TEXT NOT NULL,
            record_key TEXT NOT NULL,
            payload_json TEXT NOT NULL,
            updated_at INTEGER NOT NULL,
            last_mutation_id TEXT NOT NULL,
            PRIMARY KEY(collection_name, record_key),
            FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
            FOREIGN KEY(last_mutation_id) REFERENCES operational_mutations(id)
        );

        INSERT INTO operational_collections (name, kind, schema_json, retention_json)
        VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}');
        INSERT INTO operational_mutations (
            id, collection_name, record_key, op_kind, payload_json, created_at, mutation_order
        ) VALUES (
            'mut-1', 'audit_log', 'entry-1', 'put', '{"ok":true}', 10, 0
        );
        "#,
        )
        .expect("seed recovered operational tables");

        let manager = SchemaManager::new();
        let report = manager
            .bootstrap(&conn)
            .expect("bootstrap recovered schema");

        // Migration 8 is the mutation-ordering hardening step.
        assert!(
            report.applied_versions.iter().any(|version| version.0 == 8),
            "bootstrap should record operational mutation ordering hardening"
        );
        // The seeded row's mutation_order of 0 must be rewritten by the
        // backfill; any non-zero value is accepted here.
        let mutation_order: i64 = conn
            .query_row(
                "SELECT mutation_order FROM operational_mutations WHERE id = 'mut-1'",
                [],
                |row| row.get(0),
            )
            .expect("mutation_order");
        assert_ne!(
            mutation_order, 0,
            "bootstrap should backfill recovered operational rows"
        );
        // The ordering index must exist even though the tables predate the
        // migration history (idempotent re-application over recovered DDL).
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type = 'index' AND name = 'idx_operational_mutations_collection_key_order'",
                [],
                |row| row.get(0),
            )
            .expect("ordering index exists");
        assert_eq!(count, 1);
    }
1232
    #[test]
    fn bootstrap_adds_operational_filter_contract_and_index_table() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        // Seed an operational schema that predates the filtered-read work:
        // operational_collections lacks the filter_fields_json and
        // validation_json columns, and no operational_filter_values table
        // exists yet.
        conn.execute_batch(
            r#"
        CREATE TABLE operational_collections (
            name TEXT PRIMARY KEY,
            kind TEXT NOT NULL,
            schema_json TEXT NOT NULL,
            retention_json TEXT NOT NULL,
            format_version INTEGER NOT NULL DEFAULT 1,
            created_at INTEGER NOT NULL DEFAULT (unixepoch()),
            disabled_at INTEGER
        );

        CREATE TABLE operational_mutations (
            id TEXT PRIMARY KEY,
            collection_name TEXT NOT NULL,
            record_key TEXT NOT NULL,
            op_kind TEXT NOT NULL,
            payload_json TEXT NOT NULL,
            source_ref TEXT,
            created_at INTEGER NOT NULL DEFAULT (unixepoch()),
            mutation_order INTEGER NOT NULL DEFAULT 1,
            FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
        );

        INSERT INTO operational_collections (name, kind, schema_json, retention_json)
        VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}');
        "#,
        )
        .expect("seed recovered operational schema");

        let manager = SchemaManager::new();
        let report = manager
            .bootstrap(&conn)
            .expect("bootstrap recovered schema");

        // Migration 10 adds the filtered-read columns/table; 11 adds the
        // validation contract column. Both must be recorded.
        assert!(
            report
                .applied_versions
                .iter()
                .any(|version| version.0 == 10),
            "bootstrap should record operational filtered read migration"
        );
        assert!(
            report
                .applied_versions
                .iter()
                .any(|version| version.0 == 11),
            "bootstrap should record operational validation migration"
        );
        // Pre-existing rows must receive the column defaults: an empty JSON
        // array for filter fields...
        let filter_fields_json: String = conn
            .query_row(
                "SELECT filter_fields_json FROM operational_collections WHERE name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("filter_fields_json added");
        assert_eq!(filter_fields_json, "[]");
        // ...and an empty string for the validation contract.
        let validation_json: String = conn
            .query_row(
                "SELECT validation_json FROM operational_collections WHERE name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("validation_json added");
        assert_eq!(validation_json, "");
        // The filter-value index table itself must have been created.
        let table_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'operational_filter_values'",
                [],
                |row| row.get(0),
            )
            .expect("filter table exists");
        assert_eq!(table_count, 1);
    }
1310
1311 #[test]
1312 fn bootstrap_reapplies_migration_history_without_readding_filter_contract_columns() {
1313 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1314 let manager = SchemaManager::new();
1315 manager.bootstrap(&conn).expect("initial bootstrap");
1316
1317 conn.execute("DROP TABLE fathom_schema_migrations", [])
1318 .expect("drop migration history");
1319 SchemaManager::ensure_metadata_tables(&conn).expect("recreate migration metadata");
1320
1321 let report = manager
1322 .bootstrap(&conn)
1323 .expect("rebootstrap existing schema");
1324
1325 assert!(
1326 report
1327 .applied_versions
1328 .iter()
1329 .any(|version| version.0 == 10),
1330 "rebootstrap should re-record migration 10"
1331 );
1332 assert!(
1333 report
1334 .applied_versions
1335 .iter()
1336 .any(|version| version.0 == 11),
1337 "rebootstrap should re-record migration 11"
1338 );
1339 let filter_fields_json: String = conn
1340 .query_row(
1341 "SELECT filter_fields_json FROM operational_collections LIMIT 1",
1342 [],
1343 |row| row.get(0),
1344 )
1345 .unwrap_or_else(|_| "[]".to_string());
1346 assert_eq!(filter_fields_json, "[]");
1347 let validation_json: String = conn
1348 .query_row(
1349 "SELECT validation_json FROM operational_collections LIMIT 1",
1350 [],
1351 |row| row.get(0),
1352 )
1353 .unwrap_or_default();
1354 assert_eq!(validation_json, "");
1355 }
1356
1357 #[test]
1358 fn downgrade_detected_returns_version_mismatch() {
1359 use crate::SchemaError;
1360
1361 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1362 let manager = SchemaManager::new();
1363 manager.bootstrap(&conn).expect("initial bootstrap");
1364
1365 conn.execute(
1366 "INSERT INTO fathom_schema_migrations (version, description) VALUES (?1, ?2)",
1367 (999_i64, "future migration"),
1368 )
1369 .expect("insert future version");
1370
1371 let err = manager
1372 .bootstrap(&conn)
1373 .expect_err("should fail on downgrade");
1374 assert!(
1375 matches!(
1376 err,
1377 SchemaError::VersionMismatch {
1378 database_version: 999,
1379 ..
1380 }
1381 ),
1382 "expected VersionMismatch with database_version 999, got: {err}"
1383 );
1384 }
1385
1386 #[test]
1387 fn journal_size_limit_is_set() {
1388 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1389 let manager = SchemaManager::new();
1390 manager
1391 .initialize_connection(&conn)
1392 .expect("initialize_connection");
1393
1394 let limit: i64 = conn
1395 .query_row("PRAGMA journal_size_limit", [], |row| row.get(0))
1396 .expect("journal_size_limit pragma");
1397 assert_eq!(limit, 536_870_912);
1398 }
1399
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn vector_profile_created_when_feature_enabled() {
        // SAFETY: sqlite3_auto_extension expects a pointer to an extension
        // entry point with the sqlite3 init signature; sqlite_vec's
        // sqlite3_vec_init is that entry point, so the transmute only adapts
        // the pointer type to the FFI declaration. Registration happens
        // before any connection is opened, as the SQLite API requires.
        unsafe {
            rusqlite::ffi::sqlite3_auto_extension(Some(std::mem::transmute(
                sqlite_vec::sqlite3_vec_init as *const (),
            )));
        }
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager.bootstrap(&conn).expect("bootstrap");

        manager
            .ensure_vector_profile(&conn, "default", "vec_nodes_active", 128)
            .expect("ensure_vector_profile");

        // The profile row must be flagged enabled...
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            count, 1,
            "vector profile must be enabled after ensure_vector_profile"
        );

        // ...and the backing vec table must be queryable; the SELECT itself
        // fails if ensure_vector_profile did not create it.
        let _: i64 = conn
            .query_row("SELECT count(*) FROM vec_nodes_active", [], |row| {
                row.get(0)
            })
            .expect("vec_nodes_active table must exist after ensure_vector_profile");
    }
1437}