1use rusqlite::{Connection, OptionalExtension};
2use sha2::{Digest, Sha256};
3
4use crate::{Migration, SchemaError, SchemaVersion};
5
6static MIGRATIONS: &[Migration] = &[
7 Migration::new(
8 SchemaVersion(1),
9 "initial canonical schema and runtime tables",
10 r"
11 CREATE TABLE IF NOT EXISTS nodes (
12 row_id TEXT PRIMARY KEY,
13 logical_id TEXT NOT NULL,
14 kind TEXT NOT NULL,
15 properties BLOB NOT NULL,
16 created_at INTEGER NOT NULL,
17 superseded_at INTEGER,
18 source_ref TEXT,
19 confidence REAL
20 );
21
22 CREATE UNIQUE INDEX IF NOT EXISTS idx_nodes_active_logical_id
23 ON nodes(logical_id)
24 WHERE superseded_at IS NULL;
25 CREATE INDEX IF NOT EXISTS idx_nodes_kind_active
26 ON nodes(kind, superseded_at);
27 CREATE INDEX IF NOT EXISTS idx_nodes_source_ref
28 ON nodes(source_ref);
29
30 CREATE TABLE IF NOT EXISTS edges (
31 row_id TEXT PRIMARY KEY,
32 logical_id TEXT NOT NULL,
33 source_logical_id TEXT NOT NULL,
34 target_logical_id TEXT NOT NULL,
35 kind TEXT NOT NULL,
36 properties BLOB NOT NULL,
37 created_at INTEGER NOT NULL,
38 superseded_at INTEGER,
39 source_ref TEXT,
40 confidence REAL
41 );
42
43 CREATE UNIQUE INDEX IF NOT EXISTS idx_edges_active_logical_id
44 ON edges(logical_id)
45 WHERE superseded_at IS NULL;
46 CREATE INDEX IF NOT EXISTS idx_edges_source_active
47 ON edges(source_logical_id, kind, superseded_at);
48 CREATE INDEX IF NOT EXISTS idx_edges_target_active
49 ON edges(target_logical_id, kind, superseded_at);
50 CREATE INDEX IF NOT EXISTS idx_edges_source_ref
51 ON edges(source_ref);
52
53 CREATE TABLE IF NOT EXISTS chunks (
54 id TEXT PRIMARY KEY,
55 node_logical_id TEXT NOT NULL,
56 text_content TEXT NOT NULL,
57 byte_start INTEGER,
58 byte_end INTEGER,
59 created_at INTEGER NOT NULL
60 );
61
62 CREATE INDEX IF NOT EXISTS idx_chunks_node_logical_id
63 ON chunks(node_logical_id);
64
65 CREATE VIRTUAL TABLE IF NOT EXISTS fts_nodes USING fts5(
66 chunk_id UNINDEXED,
67 node_logical_id UNINDEXED,
68 kind UNINDEXED,
69 text_content
70 );
71
72 CREATE TABLE IF NOT EXISTS vector_profiles (
73 profile TEXT PRIMARY KEY,
74 table_name TEXT NOT NULL,
75 dimension INTEGER NOT NULL,
76 enabled INTEGER NOT NULL DEFAULT 0
77 );
78
79 CREATE TABLE IF NOT EXISTS runs (
80 id TEXT PRIMARY KEY,
81 kind TEXT NOT NULL,
82 status TEXT NOT NULL,
83 properties BLOB NOT NULL,
84 created_at INTEGER NOT NULL,
85 completed_at INTEGER,
86 superseded_at INTEGER,
87 source_ref TEXT
88 );
89
90 CREATE TABLE IF NOT EXISTS steps (
91 id TEXT PRIMARY KEY,
92 run_id TEXT NOT NULL,
93 kind TEXT NOT NULL,
94 status TEXT NOT NULL,
95 properties BLOB NOT NULL,
96 created_at INTEGER NOT NULL,
97 completed_at INTEGER,
98 superseded_at INTEGER,
99 source_ref TEXT,
100 FOREIGN KEY(run_id) REFERENCES runs(id)
101 );
102
103 CREATE TABLE IF NOT EXISTS actions (
104 id TEXT PRIMARY KEY,
105 step_id TEXT NOT NULL,
106 kind TEXT NOT NULL,
107 status TEXT NOT NULL,
108 properties BLOB NOT NULL,
109 created_at INTEGER NOT NULL,
110 completed_at INTEGER,
111 superseded_at INTEGER,
112 source_ref TEXT,
113 FOREIGN KEY(step_id) REFERENCES steps(id)
114 );
115
116 CREATE INDEX IF NOT EXISTS idx_runs_source_ref
117 ON runs(source_ref);
118 CREATE INDEX IF NOT EXISTS idx_steps_source_ref
119 ON steps(source_ref);
120 CREATE INDEX IF NOT EXISTS idx_actions_source_ref
121 ON actions(source_ref);
122 ",
123 ),
124 Migration::new(
125 SchemaVersion(2),
126 "durable audit trail: provenance_events table",
127 r"
128 CREATE TABLE IF NOT EXISTS provenance_events (
129 id TEXT PRIMARY KEY,
130 event_type TEXT NOT NULL,
131 subject TEXT NOT NULL,
132 source_ref TEXT,
133 created_at INTEGER NOT NULL DEFAULT (unixepoch())
134 );
135 CREATE INDEX IF NOT EXISTS idx_provenance_events_subject
136 ON provenance_events (subject, event_type);
137 ",
138 ),
139 Migration::new(
140 SchemaVersion(3),
141 "vector regeneration contracts",
142 r"
143 CREATE TABLE IF NOT EXISTS vector_embedding_contracts (
144 profile TEXT PRIMARY KEY,
145 table_name TEXT NOT NULL,
146 model_identity TEXT NOT NULL,
147 model_version TEXT NOT NULL,
148 dimension INTEGER NOT NULL,
149 normalization_policy TEXT NOT NULL,
150 chunking_policy TEXT NOT NULL,
151 preprocessing_policy TEXT NOT NULL,
152 generator_command_json TEXT NOT NULL,
153 updated_at INTEGER NOT NULL DEFAULT (unixepoch())
154 );
155 ",
156 ),
157 Migration::new(
158 SchemaVersion(4),
159 "vector regeneration apply metadata",
160 r"
161 ALTER TABLE vector_embedding_contracts
162 ADD COLUMN applied_at INTEGER NOT NULL DEFAULT 0;
163 ALTER TABLE vector_embedding_contracts
164 ADD COLUMN snapshot_hash TEXT NOT NULL DEFAULT '';
165 UPDATE vector_embedding_contracts
166 SET
167 applied_at = CASE
168 WHEN applied_at = 0 THEN updated_at
169 ELSE applied_at
170 END,
171 snapshot_hash = CASE
172 WHEN snapshot_hash = '' THEN 'legacy'
173 ELSE snapshot_hash
174 END;
175 ",
176 ),
177 Migration::new(
178 SchemaVersion(5),
179 "vector regeneration contract format version",
180 r"
181 ALTER TABLE vector_embedding_contracts
182 ADD COLUMN contract_format_version INTEGER NOT NULL DEFAULT 1;
183 UPDATE vector_embedding_contracts
184 SET contract_format_version = 1
185 WHERE contract_format_version = 0;
186 ",
187 ),
188 Migration::new(
189 SchemaVersion(6),
190 "provenance metadata payloads",
191 r"
192 ALTER TABLE provenance_events
193 ADD COLUMN metadata_json TEXT NOT NULL DEFAULT '';
194 ",
195 ),
196 Migration::new(
197 SchemaVersion(7),
198 "operational store canonical and derived tables",
199 r"
200 CREATE TABLE IF NOT EXISTS operational_collections (
201 name TEXT PRIMARY KEY,
202 kind TEXT NOT NULL,
203 schema_json TEXT NOT NULL,
204 retention_json TEXT NOT NULL,
205 format_version INTEGER NOT NULL DEFAULT 1,
206 created_at INTEGER NOT NULL DEFAULT (unixepoch()),
207 disabled_at INTEGER
208 );
209
210 CREATE INDEX IF NOT EXISTS idx_operational_collections_kind
211 ON operational_collections(kind, disabled_at);
212
213 CREATE TABLE IF NOT EXISTS operational_mutations (
214 id TEXT PRIMARY KEY,
215 collection_name TEXT NOT NULL,
216 record_key TEXT NOT NULL,
217 op_kind TEXT NOT NULL,
218 payload_json TEXT NOT NULL,
219 source_ref TEXT,
220 created_at INTEGER NOT NULL DEFAULT (unixepoch()),
221 FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
222 );
223
224 CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_created
225 ON operational_mutations(collection_name, record_key, created_at DESC, id DESC);
226 CREATE INDEX IF NOT EXISTS idx_operational_mutations_source_ref
227 ON operational_mutations(source_ref);
228
229 CREATE TABLE IF NOT EXISTS operational_current (
230 collection_name TEXT NOT NULL,
231 record_key TEXT NOT NULL,
232 payload_json TEXT NOT NULL,
233 updated_at INTEGER NOT NULL,
234 last_mutation_id TEXT NOT NULL,
235 PRIMARY KEY(collection_name, record_key),
236 FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
237 FOREIGN KEY(last_mutation_id) REFERENCES operational_mutations(id)
238 );
239
240 CREATE INDEX IF NOT EXISTS idx_operational_current_collection_updated
241 ON operational_current(collection_name, updated_at DESC);
242 ",
243 ),
244 Migration::new(
245 SchemaVersion(8),
246 "operational mutation ordering hardening",
247 r"
248 ALTER TABLE operational_mutations
249 ADD COLUMN mutation_order INTEGER NOT NULL DEFAULT 0;
250 UPDATE operational_mutations
251 SET mutation_order = rowid
252 WHERE mutation_order = 0;
253 CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_order
254 ON operational_mutations(collection_name, record_key, mutation_order DESC);
255 ",
256 ),
257 Migration::new(
258 SchemaVersion(9),
259 "node last_accessed metadata",
260 r"
261 CREATE TABLE IF NOT EXISTS node_access_metadata (
262 logical_id TEXT PRIMARY KEY,
263 last_accessed_at INTEGER NOT NULL,
264 updated_at INTEGER NOT NULL
265 );
266
267 CREATE INDEX IF NOT EXISTS idx_node_access_metadata_last_accessed
268 ON node_access_metadata(last_accessed_at DESC);
269 ",
270 ),
271 Migration::new(
272 SchemaVersion(10),
273 "operational filtered read contracts and extracted values",
274 r"
275 ALTER TABLE operational_collections
276 ADD COLUMN filter_fields_json TEXT NOT NULL DEFAULT '[]';
277
278 CREATE TABLE IF NOT EXISTS operational_filter_values (
279 mutation_id TEXT NOT NULL,
280 collection_name TEXT NOT NULL,
281 field_name TEXT NOT NULL,
282 string_value TEXT,
283 integer_value INTEGER,
284 PRIMARY KEY(mutation_id, field_name),
285 FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE,
286 FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
287 );
288
289 CREATE INDEX IF NOT EXISTS idx_operational_filter_values_text
290 ON operational_filter_values(collection_name, field_name, string_value, mutation_id);
291 CREATE INDEX IF NOT EXISTS idx_operational_filter_values_integer
292 ON operational_filter_values(collection_name, field_name, integer_value, mutation_id);
293 ",
294 ),
295 Migration::new(
296 SchemaVersion(11),
297 "operational payload validation contracts",
298 r"
299 ALTER TABLE operational_collections
300 ADD COLUMN validation_json TEXT NOT NULL DEFAULT '';
301 ",
302 ),
303 Migration::new(
304 SchemaVersion(12),
305 "operational secondary indexes",
306 r"
307 ALTER TABLE operational_collections
308 ADD COLUMN secondary_indexes_json TEXT NOT NULL DEFAULT '[]';
309
310 CREATE TABLE IF NOT EXISTS operational_secondary_index_entries (
311 collection_name TEXT NOT NULL,
312 index_name TEXT NOT NULL,
313 subject_kind TEXT NOT NULL,
314 mutation_id TEXT NOT NULL DEFAULT '',
315 record_key TEXT NOT NULL DEFAULT '',
316 sort_timestamp INTEGER,
317 slot1_text TEXT,
318 slot1_integer INTEGER,
319 slot2_text TEXT,
320 slot2_integer INTEGER,
321 slot3_text TEXT,
322 slot3_integer INTEGER,
323 PRIMARY KEY(collection_name, index_name, subject_kind, mutation_id, record_key),
324 FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
325 FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE
326 );
327
328 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_text
329 ON operational_secondary_index_entries(
330 collection_name, index_name, subject_kind, slot1_text, sort_timestamp DESC, mutation_id, record_key
331 );
332 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_integer
333 ON operational_secondary_index_entries(
334 collection_name, index_name, subject_kind, slot1_integer, sort_timestamp DESC, mutation_id, record_key
335 );
336 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_text
337 ON operational_secondary_index_entries(
338 collection_name, index_name, subject_kind, slot1_text, slot2_text, slot3_text, sort_timestamp DESC, record_key
339 );
340 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_integer
341 ON operational_secondary_index_entries(
342 collection_name, index_name, subject_kind, slot1_integer, slot2_integer, slot3_integer, sort_timestamp DESC, record_key
343 );
344 ",
345 ),
346 Migration::new(
347 SchemaVersion(13),
348 "operational retention run metadata",
349 r"
350 CREATE TABLE IF NOT EXISTS operational_retention_runs (
351 id TEXT PRIMARY KEY,
352 collection_name TEXT NOT NULL,
353 executed_at INTEGER NOT NULL,
354 action_kind TEXT NOT NULL,
355 dry_run INTEGER NOT NULL DEFAULT 0,
356 deleted_mutations INTEGER NOT NULL,
357 rows_remaining INTEGER NOT NULL,
358 metadata_json TEXT NOT NULL DEFAULT '',
359 FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
360 );
361
362 CREATE INDEX IF NOT EXISTS idx_operational_retention_runs_collection_time
363 ON operational_retention_runs(collection_name, executed_at DESC);
364 ",
365 ),
366 Migration::new(
367 SchemaVersion(14),
368 "external content object columns",
369 r"
370 ALTER TABLE nodes ADD COLUMN content_ref TEXT;
371
372 CREATE INDEX IF NOT EXISTS idx_nodes_content_ref
373 ON nodes(content_ref)
374 WHERE content_ref IS NOT NULL AND superseded_at IS NULL;
375
376 ALTER TABLE chunks ADD COLUMN content_hash TEXT;
377 ",
378 ),
379 Migration::new(
380 SchemaVersion(15),
381 "FTS property projection schemas",
382 r"
383 CREATE TABLE IF NOT EXISTS fts_property_schemas (
384 kind TEXT PRIMARY KEY,
385 property_paths_json TEXT NOT NULL,
386 separator TEXT NOT NULL DEFAULT ' ',
387 format_version INTEGER NOT NULL DEFAULT 1,
388 created_at INTEGER NOT NULL DEFAULT (unixepoch())
389 );
390
391 CREATE VIRTUAL TABLE IF NOT EXISTS fts_node_properties USING fts5(
392 node_logical_id UNINDEXED,
393 kind UNINDEXED,
394 text_content
395 );
396 ",
397 ),
398 Migration::new(
399 SchemaVersion(16),
400 "rebuild fts_nodes and fts_node_properties on porter+unicode61 tokenizer",
401 "",
407 ),
408 Migration::new(
409 SchemaVersion(17),
410 "fts property position-map sidecar for recursive extraction",
411 r"
426 CREATE TABLE IF NOT EXISTS fts_node_property_positions (
427 node_logical_id TEXT NOT NULL,
428 kind TEXT NOT NULL,
429 start_offset INTEGER NOT NULL,
430 end_offset INTEGER NOT NULL,
431 leaf_path TEXT NOT NULL
432 );
433
434 CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_node
435 ON fts_node_property_positions(node_logical_id, kind);
436
437 CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_kind
438 ON fts_node_property_positions(kind);
439 ",
440 ),
441 Migration::new(
442 SchemaVersion(18),
443 "add UNIQUE constraint on fts_node_property_positions (node_logical_id, kind, start_offset)",
444 r"
457 DROP TABLE IF EXISTS fts_node_property_positions;
458
459 CREATE TABLE fts_node_property_positions (
460 node_logical_id TEXT NOT NULL,
461 kind TEXT NOT NULL,
462 start_offset INTEGER NOT NULL,
463 end_offset INTEGER NOT NULL,
464 leaf_path TEXT NOT NULL,
465 UNIQUE(node_logical_id, kind, start_offset)
466 );
467
468 CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_node
469 ON fts_node_property_positions(node_logical_id, kind);
470
471 CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_kind
472 ON fts_node_property_positions(kind);
473 ",
474 ),
475 Migration::new(
476 SchemaVersion(19),
477 "async property-FTS rebuild staging and state tables",
478 r"
479 CREATE TABLE IF NOT EXISTS fts_property_rebuild_staging (
480 kind TEXT NOT NULL,
481 node_logical_id TEXT NOT NULL,
482 text_content TEXT NOT NULL,
483 positions_blob BLOB,
484 PRIMARY KEY (kind, node_logical_id)
485 );
486
487 CREATE TABLE IF NOT EXISTS fts_property_rebuild_state (
488 kind TEXT PRIMARY KEY,
489 schema_id INTEGER NOT NULL,
490 state TEXT NOT NULL,
491 rows_total INTEGER,
492 rows_done INTEGER NOT NULL DEFAULT 0,
493 started_at INTEGER NOT NULL,
494 last_progress_at INTEGER,
495 error_message TEXT,
496 is_first_registration INTEGER NOT NULL DEFAULT 0
497 );
498 ",
499 ),
500 Migration::new(
501 SchemaVersion(20),
502 "projection_profiles table for per-kind FTS tokenizer configuration",
503 r"CREATE TABLE IF NOT EXISTS projection_profiles (
504 kind TEXT NOT NULL,
505 facet TEXT NOT NULL,
506 config_json TEXT NOT NULL,
507 active_at INTEGER,
508 created_at INTEGER NOT NULL DEFAULT (unixepoch()),
509 PRIMARY KEY (kind, facet)
510 );",
511 ),
512 Migration::new(
513 SchemaVersion(21),
514 "per-kind FTS5 tables replacing fts_node_properties",
515 "",
516 ),
517 Migration::new(
518 SchemaVersion(22),
519 "add columns_json to fts_property_rebuild_staging for multi-column rebuild support",
520 "ALTER TABLE fts_property_rebuild_staging ADD COLUMN columns_json TEXT;",
521 ),
522 Migration::new(
523 SchemaVersion(23),
524 "drop global fts_node_properties table (replaced by per-kind fts_props_<kind> tables)",
525 "DROP TABLE IF EXISTS fts_node_properties;",
526 ),
527];
528
529#[derive(Clone, Debug, PartialEq, Eq)]
530pub struct BootstrapReport {
531 pub sqlite_version: String,
532 pub applied_versions: Vec<SchemaVersion>,
533 pub vector_profile_enabled: bool,
534}
535
536#[derive(Clone, Debug, Default)]
537pub struct SchemaManager;
538
539impl SchemaManager {
540 #[must_use]
541 pub fn new() -> Self {
542 Self
543 }
544
545 pub fn bootstrap(&self, conn: &Connection) -> Result<BootstrapReport, SchemaError> {
553 self.initialize_connection(conn)?;
554 Self::ensure_metadata_tables(conn)?;
555
556 let max_applied: u32 = conn.query_row(
558 "SELECT COALESCE(MAX(version), 0) FROM fathom_schema_migrations",
559 [],
560 |row| row.get(0),
561 )?;
562 let engine_version = self.current_version().0;
563 trace_info!(
564 current_version = max_applied,
565 engine_version,
566 "schema bootstrap: version check"
567 );
568 if max_applied > engine_version {
569 trace_error!(
570 database_version = max_applied,
571 engine_version,
572 "schema version mismatch: database is newer than engine"
573 );
574 return Err(SchemaError::VersionMismatch {
575 database_version: max_applied,
576 engine_version,
577 });
578 }
579
580 let mut applied_versions = Vec::new();
581 for migration in self.migrations() {
582 let already_applied = conn
583 .query_row(
584 "SELECT 1 FROM fathom_schema_migrations WHERE version = ?1",
585 [i64::from(migration.version.0)],
586 |row| row.get::<_, i64>(0),
587 )
588 .optional()?
589 .is_some();
590
591 if already_applied {
592 continue;
593 }
594
595 let tx = conn.unchecked_transaction()?;
596 match migration.version {
597 SchemaVersion(4) => Self::ensure_vector_regeneration_apply_metadata(&tx)?,
598 SchemaVersion(5) => Self::ensure_vector_contract_format_version(&tx)?,
599 SchemaVersion(6) => Self::ensure_provenance_metadata(&tx)?,
600 SchemaVersion(8) => Self::ensure_operational_mutation_order(&tx)?,
601 SchemaVersion(9) => Self::ensure_node_access_metadata(&tx)?,
602 SchemaVersion(10) => Self::ensure_operational_filter_contract(&tx)?,
603 SchemaVersion(11) => Self::ensure_operational_validation_contract(&tx)?,
604 SchemaVersion(12) => Self::ensure_operational_secondary_indexes(&tx)?,
605 SchemaVersion(13) => Self::ensure_operational_retention_runs(&tx)?,
606 SchemaVersion(14) => Self::ensure_external_content_columns(&tx)?,
607 SchemaVersion(15) => Self::ensure_fts_property_schemas(&tx)?,
608 SchemaVersion(16) => Self::ensure_unicode_porter_fts_tokenizers(&tx)?,
609 SchemaVersion(21) => Self::ensure_per_kind_fts_tables(&tx)?,
610 SchemaVersion(22) => Self::ensure_staging_columns_json(&tx)?,
611 _ => tx.execute_batch(migration.sql)?,
612 }
613 tx.execute(
614 "INSERT INTO fathom_schema_migrations (version, description) VALUES (?1, ?2)",
615 (i64::from(migration.version.0), migration.description),
616 )?;
617 tx.commit()?;
618 trace_info!(
619 version = migration.version.0,
620 description = migration.description,
621 "schema migration applied"
622 );
623 applied_versions.push(migration.version);
624 }
625
626 let sqlite_version = conn.query_row("SELECT sqlite_version()", [], |row| row.get(0))?;
627 let vector_profile_count: i64 = conn.query_row(
628 "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
629 [],
630 |row| row.get(0),
631 )?;
632 Ok(BootstrapReport {
633 sqlite_version,
634 applied_versions,
635 vector_profile_enabled: vector_profile_count > 0,
636 })
637 }
638
639 fn ensure_vector_regeneration_apply_metadata(conn: &Connection) -> Result<(), SchemaError> {
640 let mut stmt = conn.prepare("PRAGMA table_info(vector_embedding_contracts)")?;
641 let columns = stmt
642 .query_map([], |row| row.get::<_, String>(1))?
643 .collect::<Result<Vec<_>, _>>()?;
644 let has_applied_at = columns.iter().any(|column| column == "applied_at");
645 let has_snapshot_hash = columns.iter().any(|column| column == "snapshot_hash");
646
647 if !has_applied_at {
648 conn.execute(
649 "ALTER TABLE vector_embedding_contracts ADD COLUMN applied_at INTEGER NOT NULL DEFAULT 0",
650 [],
651 )?;
652 }
653 if !has_snapshot_hash {
654 conn.execute(
655 "ALTER TABLE vector_embedding_contracts ADD COLUMN snapshot_hash TEXT NOT NULL DEFAULT ''",
656 [],
657 )?;
658 }
659 conn.execute(
660 r"
661 UPDATE vector_embedding_contracts
662 SET
663 applied_at = CASE
664 WHEN applied_at = 0 THEN updated_at
665 ELSE applied_at
666 END,
667 snapshot_hash = CASE
668 WHEN snapshot_hash = '' THEN 'legacy'
669 ELSE snapshot_hash
670 END
671 ",
672 [],
673 )?;
674 Ok(())
675 }
676
677 fn ensure_vector_contract_format_version(conn: &Connection) -> Result<(), SchemaError> {
678 let mut stmt = conn.prepare("PRAGMA table_info(vector_embedding_contracts)")?;
679 let columns = stmt
680 .query_map([], |row| row.get::<_, String>(1))?
681 .collect::<Result<Vec<_>, _>>()?;
682 let has_contract_format_version = columns
683 .iter()
684 .any(|column| column == "contract_format_version");
685
686 if !has_contract_format_version {
687 conn.execute(
688 "ALTER TABLE vector_embedding_contracts ADD COLUMN contract_format_version INTEGER NOT NULL DEFAULT 1",
689 [],
690 )?;
691 }
692 conn.execute(
693 r"
694 UPDATE vector_embedding_contracts
695 SET contract_format_version = 1
696 WHERE contract_format_version = 0
697 ",
698 [],
699 )?;
700 Ok(())
701 }
702
703 fn ensure_provenance_metadata(conn: &Connection) -> Result<(), SchemaError> {
704 let mut stmt = conn.prepare("PRAGMA table_info(provenance_events)")?;
705 let columns = stmt
706 .query_map([], |row| row.get::<_, String>(1))?
707 .collect::<Result<Vec<_>, _>>()?;
708 let has_metadata_json = columns.iter().any(|column| column == "metadata_json");
709
710 if !has_metadata_json {
711 conn.execute(
712 "ALTER TABLE provenance_events ADD COLUMN metadata_json TEXT NOT NULL DEFAULT ''",
713 [],
714 )?;
715 }
716 Ok(())
717 }
718
719 fn ensure_operational_mutation_order(conn: &Connection) -> Result<(), SchemaError> {
720 let mut stmt = conn.prepare("PRAGMA table_info(operational_mutations)")?;
721 let columns = stmt
722 .query_map([], |row| row.get::<_, String>(1))?
723 .collect::<Result<Vec<_>, _>>()?;
724 let has_mutation_order = columns.iter().any(|column| column == "mutation_order");
725
726 if !has_mutation_order {
727 conn.execute(
728 "ALTER TABLE operational_mutations ADD COLUMN mutation_order INTEGER NOT NULL DEFAULT 0",
729 [],
730 )?;
731 }
732 conn.execute(
733 r"
734 UPDATE operational_mutations
735 SET mutation_order = rowid
736 WHERE mutation_order = 0
737 ",
738 [],
739 )?;
740 conn.execute(
741 r"
742 CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_order
743 ON operational_mutations(collection_name, record_key, mutation_order DESC)
744 ",
745 [],
746 )?;
747 Ok(())
748 }
749
750 fn ensure_node_access_metadata(conn: &Connection) -> Result<(), SchemaError> {
751 conn.execute_batch(
752 r"
753 CREATE TABLE IF NOT EXISTS node_access_metadata (
754 logical_id TEXT PRIMARY KEY,
755 last_accessed_at INTEGER NOT NULL,
756 updated_at INTEGER NOT NULL
757 );
758
759 CREATE INDEX IF NOT EXISTS idx_node_access_metadata_last_accessed
760 ON node_access_metadata(last_accessed_at DESC);
761 ",
762 )?;
763 Ok(())
764 }
765
766 fn ensure_operational_filter_contract(conn: &Connection) -> Result<(), SchemaError> {
767 let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
768 let columns = stmt
769 .query_map([], |row| row.get::<_, String>(1))?
770 .collect::<Result<Vec<_>, _>>()?;
771 let has_filter_fields_json = columns.iter().any(|column| column == "filter_fields_json");
772
773 if !has_filter_fields_json {
774 conn.execute(
775 "ALTER TABLE operational_collections ADD COLUMN filter_fields_json TEXT NOT NULL DEFAULT '[]'",
776 [],
777 )?;
778 }
779
780 conn.execute_batch(
781 r"
782 CREATE TABLE IF NOT EXISTS operational_filter_values (
783 mutation_id TEXT NOT NULL,
784 collection_name TEXT NOT NULL,
785 field_name TEXT NOT NULL,
786 string_value TEXT,
787 integer_value INTEGER,
788 PRIMARY KEY(mutation_id, field_name),
789 FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE,
790 FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
791 );
792
793 CREATE INDEX IF NOT EXISTS idx_operational_filter_values_text
794 ON operational_filter_values(collection_name, field_name, string_value, mutation_id);
795 CREATE INDEX IF NOT EXISTS idx_operational_filter_values_integer
796 ON operational_filter_values(collection_name, field_name, integer_value, mutation_id);
797 ",
798 )?;
799 Ok(())
800 }
801
802 fn ensure_operational_validation_contract(conn: &Connection) -> Result<(), SchemaError> {
803 let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
804 let columns = stmt
805 .query_map([], |row| row.get::<_, String>(1))?
806 .collect::<Result<Vec<_>, _>>()?;
807 let has_validation_json = columns.iter().any(|column| column == "validation_json");
808
809 if !has_validation_json {
810 conn.execute(
811 "ALTER TABLE operational_collections ADD COLUMN validation_json TEXT NOT NULL DEFAULT ''",
812 [],
813 )?;
814 }
815
816 Ok(())
817 }
818
819 fn ensure_operational_secondary_indexes(conn: &Connection) -> Result<(), SchemaError> {
820 let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
821 let columns = stmt
822 .query_map([], |row| row.get::<_, String>(1))?
823 .collect::<Result<Vec<_>, _>>()?;
824 let has_secondary_indexes_json = columns
825 .iter()
826 .any(|column| column == "secondary_indexes_json");
827
828 if !has_secondary_indexes_json {
829 conn.execute(
830 "ALTER TABLE operational_collections ADD COLUMN secondary_indexes_json TEXT NOT NULL DEFAULT '[]'",
831 [],
832 )?;
833 }
834
835 conn.execute_batch(
836 r"
837 CREATE TABLE IF NOT EXISTS operational_secondary_index_entries (
838 collection_name TEXT NOT NULL,
839 index_name TEXT NOT NULL,
840 subject_kind TEXT NOT NULL,
841 mutation_id TEXT NOT NULL DEFAULT '',
842 record_key TEXT NOT NULL DEFAULT '',
843 sort_timestamp INTEGER,
844 slot1_text TEXT,
845 slot1_integer INTEGER,
846 slot2_text TEXT,
847 slot2_integer INTEGER,
848 slot3_text TEXT,
849 slot3_integer INTEGER,
850 PRIMARY KEY(collection_name, index_name, subject_kind, mutation_id, record_key),
851 FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
852 FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE
853 );
854
855 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_text
856 ON operational_secondary_index_entries(
857 collection_name, index_name, subject_kind, slot1_text, sort_timestamp DESC, mutation_id, record_key
858 );
859 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_integer
860 ON operational_secondary_index_entries(
861 collection_name, index_name, subject_kind, slot1_integer, sort_timestamp DESC, mutation_id, record_key
862 );
863 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_text
864 ON operational_secondary_index_entries(
865 collection_name, index_name, subject_kind, slot1_text, slot2_text, slot3_text, sort_timestamp DESC, record_key
866 );
867 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_integer
868 ON operational_secondary_index_entries(
869 collection_name, index_name, subject_kind, slot1_integer, slot2_integer, slot3_integer, sort_timestamp DESC, record_key
870 );
871 ",
872 )?;
873
874 Ok(())
875 }
876
877 fn ensure_operational_retention_runs(conn: &Connection) -> Result<(), SchemaError> {
878 conn.execute_batch(
879 r"
880 CREATE TABLE IF NOT EXISTS operational_retention_runs (
881 id TEXT PRIMARY KEY,
882 collection_name TEXT NOT NULL,
883 executed_at INTEGER NOT NULL,
884 action_kind TEXT NOT NULL,
885 dry_run INTEGER NOT NULL DEFAULT 0,
886 deleted_mutations INTEGER NOT NULL,
887 rows_remaining INTEGER NOT NULL,
888 metadata_json TEXT NOT NULL DEFAULT '',
889 FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
890 );
891
892 CREATE INDEX IF NOT EXISTS idx_operational_retention_runs_collection_time
893 ON operational_retention_runs(collection_name, executed_at DESC);
894 ",
895 )?;
896 Ok(())
897 }
898
899 fn ensure_external_content_columns(conn: &Connection) -> Result<(), SchemaError> {
900 let node_columns = Self::column_names(conn, "nodes")?;
901 if !node_columns.iter().any(|c| c == "content_ref") {
902 conn.execute("ALTER TABLE nodes ADD COLUMN content_ref TEXT", [])?;
903 }
904 conn.execute_batch(
905 r"
906 CREATE INDEX IF NOT EXISTS idx_nodes_content_ref
907 ON nodes(content_ref)
908 WHERE content_ref IS NOT NULL AND superseded_at IS NULL;
909 ",
910 )?;
911
912 let chunk_columns = Self::column_names(conn, "chunks")?;
913 if !chunk_columns.iter().any(|c| c == "content_hash") {
914 conn.execute("ALTER TABLE chunks ADD COLUMN content_hash TEXT", [])?;
915 }
916 Ok(())
917 }
918
919 fn ensure_unicode_porter_fts_tokenizers(conn: &Connection) -> Result<(), SchemaError> {
937 conn.execute_batch(
938 r"
939 DROP TABLE IF EXISTS fts_nodes;
940 CREATE VIRTUAL TABLE fts_nodes USING fts5(
941 chunk_id UNINDEXED,
942 node_logical_id UNINDEXED,
943 kind UNINDEXED,
944 text_content,
945 tokenize = 'porter unicode61 remove_diacritics 2'
946 );
947
948 DROP TABLE IF EXISTS fts_node_properties;
949 CREATE VIRTUAL TABLE fts_node_properties USING fts5(
950 node_logical_id UNINDEXED,
951 kind UNINDEXED,
952 text_content,
953 tokenize = 'porter unicode61 remove_diacritics 2'
954 );
955
956 INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
957 SELECT c.id, n.logical_id, n.kind, c.text_content
958 FROM chunks c
959 JOIN nodes n
960 ON n.logical_id = c.node_logical_id
961 AND n.superseded_at IS NULL;
962 ",
963 )?;
964 Ok(())
965 }
966
967 fn ensure_fts_property_schemas(conn: &Connection) -> Result<(), SchemaError> {
968 conn.execute_batch(
969 r"
970 CREATE TABLE IF NOT EXISTS fts_property_schemas (
971 kind TEXT PRIMARY KEY,
972 property_paths_json TEXT NOT NULL,
973 separator TEXT NOT NULL DEFAULT ' ',
974 format_version INTEGER NOT NULL DEFAULT 1,
975 created_at INTEGER NOT NULL DEFAULT (unixepoch())
976 );
977
978 CREATE VIRTUAL TABLE IF NOT EXISTS fts_node_properties USING fts5(
979 node_logical_id UNINDEXED,
980 kind UNINDEXED,
981 text_content
982 );
983 ",
984 )?;
985 Ok(())
986 }
987
988 fn ensure_per_kind_fts_tables(conn: &Connection) -> Result<(), SchemaError> {
989 let kinds: Vec<String> = {
991 let mut stmt = conn.prepare("SELECT kind FROM fts_property_schemas")?;
992 stmt.query_map([], |r| r.get::<_, String>(0))?
993 .collect::<Result<Vec<_>, _>>()?
994 };
995
996 for kind in &kinds {
997 let table_name = fts_kind_table_name(kind);
998 let ddl = format!(
999 "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING fts5(\
1000 node_logical_id UNINDEXED, \
1001 text_content, \
1002 tokenize = '{DEFAULT_FTS_TOKENIZER}'\
1003 )"
1004 );
1005 conn.execute_batch(&ddl)?;
1006
1007 conn.execute(
1009 "INSERT OR IGNORE INTO fts_property_rebuild_state \
1010 (kind, schema_id, state, rows_done, started_at, is_first_registration) \
1011 SELECT ?1, rowid, 'PENDING', 0, unixepoch('now') * 1000, 0 \
1012 FROM fts_property_schemas WHERE kind = ?1",
1013 rusqlite::params![kind],
1014 )?;
1015 }
1016
1017 Ok(())
1020 }
1021
1022 fn ensure_staging_columns_json(conn: &Connection) -> Result<(), SchemaError> {
1023 let column_exists: bool = {
1025 let mut stmt = conn.prepare("PRAGMA table_info(fts_property_rebuild_staging)")?;
1026 let names: Vec<String> = stmt
1027 .query_map([], |r| r.get::<_, String>(1))?
1028 .collect::<Result<Vec<_>, _>>()?;
1029 names.iter().any(|n| n == "columns_json")
1030 };
1031
1032 if !column_exists {
1033 conn.execute_batch(
1034 "ALTER TABLE fts_property_rebuild_staging ADD COLUMN columns_json TEXT;",
1035 )?;
1036 }
1037
1038 Ok(())
1039 }
1040
1041 fn column_names(conn: &Connection, table: &str) -> Result<Vec<String>, SchemaError> {
1042 let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
1043 let names = stmt
1044 .query_map([], |row| row.get::<_, String>(1))?
1045 .collect::<Result<Vec<_>, _>>()?;
1046 Ok(names)
1047 }
1048
1049 #[must_use]
1050 pub fn current_version(&self) -> SchemaVersion {
1051 self.migrations()
1052 .last()
1053 .map_or(SchemaVersion(0), |migration| migration.version)
1054 }
1055
1056 #[must_use]
1057 pub fn migrations(&self) -> &'static [Migration] {
1058 MIGRATIONS
1059 }
1060
1061 pub fn initialize_connection(&self, conn: &Connection) -> Result<(), SchemaError> {
1067 conn.execute_batch(
1068 r"
1069 PRAGMA foreign_keys = ON;
1070 PRAGMA journal_mode = WAL;
1071 PRAGMA synchronous = NORMAL;
1072 PRAGMA busy_timeout = 5000;
1073 PRAGMA temp_store = MEMORY;
1074 PRAGMA mmap_size = 3000000000;
1075 PRAGMA journal_size_limit = 536870912;
1076 ",
1077 )?;
1078 Ok(())
1079 }
1080
1081 pub fn initialize_reader_connection(&self, conn: &Connection) -> Result<(), SchemaError> {
1092 conn.execute_batch(
1093 r"
1094 PRAGMA foreign_keys = ON;
1095 PRAGMA busy_timeout = 5000;
1096 PRAGMA temp_store = MEMORY;
1097 PRAGMA mmap_size = 3000000000;
1098 ",
1099 )?;
1100 Ok(())
1101 }
1102
1103 #[cfg(feature = "sqlite-vec")]
1115 pub fn ensure_vector_profile(
1116 &self,
1117 conn: &Connection,
1118 profile: &str,
1119 table_name: &str,
1120 dimension: usize,
1121 ) -> Result<(), SchemaError> {
1122 conn.execute_batch(&format!(
1123 "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING vec0(\
1124 chunk_id TEXT PRIMARY KEY,\
1125 embedding float[{dimension}]\
1126 )"
1127 ))?;
1128 let dimension_i64 = i64::try_from(dimension).map_err(|_| {
1131 SchemaError::Sqlite(rusqlite::Error::ToSqlConversionFailure(
1132 format!("vector dimension {dimension} does not fit in i64").into(),
1133 ))
1134 })?;
1135 conn.execute(
1136 "INSERT OR REPLACE INTO vector_profiles \
1137 (profile, table_name, dimension, enabled) VALUES (?1, ?2, ?3, 1)",
1138 rusqlite::params![profile, table_name, dimension_i64],
1139 )?;
1140 Ok(())
1141 }
1142
1143 #[cfg(not(feature = "sqlite-vec"))]
1148 pub fn ensure_vector_profile(
1149 &self,
1150 _conn: &Connection,
1151 _profile: &str,
1152 _table_name: &str,
1153 _dimension: usize,
1154 ) -> Result<(), SchemaError> {
1155 Err(SchemaError::MissingCapability("sqlite-vec"))
1156 }
1157
1158 #[cfg(feature = "sqlite-vec")]
1170 pub fn ensure_vec_kind_profile(
1171 &self,
1172 conn: &Connection,
1173 kind: &str,
1174 dimension: usize,
1175 ) -> Result<(), SchemaError> {
1176 let table_name = vec_kind_table_name(kind);
1177 conn.execute_batch(&format!(
1178 "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING vec0(\
1179 chunk_id TEXT PRIMARY KEY,\
1180 embedding float[{dimension}]\
1181 )"
1182 ))?;
1183 let dimension_i64 = i64::try_from(dimension).map_err(|_| {
1184 SchemaError::Sqlite(rusqlite::Error::ToSqlConversionFailure(
1185 format!("vector dimension {dimension} does not fit in i64").into(),
1186 ))
1187 })?;
1188 conn.execute(
1190 "INSERT OR REPLACE INTO vector_profiles \
1191 (profile, table_name, dimension, enabled) VALUES (?1, ?2, ?3, 1)",
1192 rusqlite::params![kind, table_name, dimension_i64],
1193 )?;
1194 let config_json =
1197 format!(r#"{{"table_name":"{table_name}","dimensions":{dimension_i64}}}"#);
1198 conn.execute(
1199 "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at) \
1200 VALUES (?1, 'vec', ?2, unixepoch(), unixepoch()) \
1201 ON CONFLICT(kind, facet) DO UPDATE SET \
1202 config_json = ?2, \
1203 active_at = unixepoch()",
1204 rusqlite::params![kind, config_json],
1205 )?;
1206 Ok(())
1207 }
1208
1209 #[cfg(not(feature = "sqlite-vec"))]
1214 pub fn ensure_vec_kind_profile(
1215 &self,
1216 _conn: &Connection,
1217 _kind: &str,
1218 _dimension: usize,
1219 ) -> Result<(), SchemaError> {
1220 Err(SchemaError::MissingCapability("sqlite-vec"))
1221 }
1222
1223 fn ensure_metadata_tables(conn: &Connection) -> Result<(), SchemaError> {
1229 conn.execute_batch(
1230 r"
1231 CREATE TABLE IF NOT EXISTS fathom_schema_migrations (
1232 version INTEGER PRIMARY KEY,
1233 description TEXT NOT NULL,
1234 applied_at INTEGER NOT NULL DEFAULT (unixepoch())
1235 );
1236 ",
1237 )?;
1238 Ok(())
1239 }
1240}
1241
1242pub const DEFAULT_FTS_TOKENIZER: &str = "porter unicode61 remove_diacritics 2";
1244
1245#[must_use]
1255pub fn fts_kind_table_name(kind: &str) -> String {
1256 let lowered = kind.to_lowercase();
1258 let mut slug = String::with_capacity(lowered.len());
1259 let mut prev_was_underscore = false;
1260 for ch in lowered.chars() {
1261 if ch.is_ascii_alphanumeric() {
1262 slug.push(ch);
1263 prev_was_underscore = false;
1264 } else {
1265 if !prev_was_underscore {
1266 slug.push('_');
1267 }
1268 prev_was_underscore = true;
1269 }
1270 }
1271
1272 let prefixed = format!("fts_props_{slug}");
1274
1275 if prefixed.len() <= 63 {
1277 prefixed
1278 } else {
1279 let hash = Sha256::digest(kind.as_bytes());
1281 let mut hex = String::with_capacity(hash.len() * 2);
1282 for b in &hash {
1283 use std::fmt::Write as _;
1284 let _ = write!(hex, "{b:02x}");
1285 }
1286 let hex_suffix = &hex[..7];
1287 let slug_truncated = if slug.len() > 45 { &slug[..45] } else { &slug };
1290 format!("fts_props_{slug_truncated}_{hex_suffix}")
1291 }
1292}
1293
1294#[must_use]
1302pub fn vec_kind_table_name(kind: &str) -> String {
1303 let lowered = kind.to_lowercase();
1304 let mut slug = String::with_capacity(lowered.len());
1305 let mut prev_was_underscore = false;
1306 for ch in lowered.chars() {
1307 if ch.is_ascii_alphanumeric() {
1308 slug.push(ch);
1309 prev_was_underscore = false;
1310 } else {
1311 if !prev_was_underscore {
1312 slug.push('_');
1313 }
1314 prev_was_underscore = true;
1315 }
1316 }
1317 format!("vec_{slug}")
1318}
1319
1320#[must_use]
1328pub fn fts_column_name(path: &str, is_recursive: bool) -> String {
1329 let stripped = if let Some(rest) = path.strip_prefix("$.") {
1331 rest
1332 } else if let Some(rest) = path.strip_prefix('$') {
1333 rest
1334 } else {
1335 path
1336 };
1337
1338 let lowered = stripped.to_lowercase();
1340 let mut col = String::with_capacity(lowered.len());
1341 let mut prev_was_underscore = false;
1342 for ch in lowered.chars() {
1343 if ch.is_ascii_alphanumeric() || ch == '_' {
1344 col.push(ch);
1345 prev_was_underscore = ch == '_';
1346 } else {
1347 if !prev_was_underscore {
1348 col.push('_');
1349 }
1350 prev_was_underscore = true;
1351 }
1352 }
1353
1354 let col = col.trim_end_matches('_').to_owned();
1356
1357 if is_recursive {
1359 format!("{col}_all")
1360 } else {
1361 col
1362 }
1363}
1364
1365pub fn resolve_fts_tokenizer(conn: &Connection, kind: &str) -> Result<String, SchemaError> {
1376 let result = conn
1377 .query_row(
1378 "SELECT json_extract(config_json, '$.tokenizer') FROM projection_profiles WHERE kind = ?1 AND facet = 'fts'",
1379 [kind],
1380 |row| row.get::<_, Option<String>>(0),
1381 )
1382 .optional();
1383
1384 match result {
1385 Ok(Some(Some(tok))) if !tok.is_empty() => Ok(tok),
1386 Ok(_) => Ok(DEFAULT_FTS_TOKENIZER.to_owned()),
1387 Err(rusqlite::Error::SqliteFailure(_, _)) => {
1388 Ok(DEFAULT_FTS_TOKENIZER.to_owned())
1390 }
1391 Err(e) => Err(SchemaError::Sqlite(e)),
1392 }
1393}
1394
1395#[cfg(test)]
1396#[allow(clippy::expect_used)]
1397mod tests {
1398 use rusqlite::Connection;
1399
1400 use super::SchemaManager;
1401
1402 #[test]
1403 fn bootstrap_applies_initial_schema() {
1404 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1405 let manager = SchemaManager::new();
1406
1407 let report = manager.bootstrap(&conn).expect("bootstrap report");
1408
1409 assert_eq!(
1410 report.applied_versions.len(),
1411 manager.current_version().0 as usize
1412 );
1413 assert!(report.sqlite_version.starts_with('3'));
1414 let table_count: i64 = conn
1415 .query_row(
1416 "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'nodes'",
1417 [],
1418 |row| row.get(0),
1419 )
1420 .expect("nodes table exists");
1421 assert_eq!(table_count, 1);
1422 }
1423
1424 #[test]
1427 fn vector_profile_not_enabled_without_feature() {
1428 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1429 let manager = SchemaManager::new();
1430 let report = manager.bootstrap(&conn).expect("bootstrap");
1431 assert!(
1432 !report.vector_profile_enabled,
1433 "vector_profile_enabled must be false on a fresh bootstrap"
1434 );
1435 }
1436
1437 #[test]
1438 fn vector_profile_skipped_when_dimension_absent() {
1439 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1441 let manager = SchemaManager::new();
1442 manager.bootstrap(&conn).expect("bootstrap");
1443
1444 let count: i64 = conn
1445 .query_row(
1446 "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
1447 [],
1448 |row| row.get(0),
1449 )
1450 .expect("count");
1451 assert_eq!(
1452 count, 0,
1453 "no enabled profile without calling ensure_vector_profile"
1454 );
1455 }
1456
1457 #[test]
1458 fn bootstrap_report_reflects_actual_vector_state() {
1459 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1461 let manager = SchemaManager::new();
1462 let report = manager.bootstrap(&conn).expect("bootstrap");
1463
1464 let db_count: i64 = conn
1465 .query_row(
1466 "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
1467 [],
1468 |row| row.get(0),
1469 )
1470 .expect("count");
1471 assert_eq!(
1472 report.vector_profile_enabled,
1473 db_count > 0,
1474 "BootstrapReport.vector_profile_enabled must match actual DB state"
1475 );
1476 }
1477
1478 #[test]
1479 fn bootstrap_backfills_vector_contract_format_and_provenance_metadata_columns() {
1480 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1481 conn.execute_batch(
1482 r#"
1483 CREATE TABLE provenance_events (
1484 id TEXT PRIMARY KEY,
1485 event_type TEXT NOT NULL,
1486 subject TEXT NOT NULL,
1487 source_ref TEXT,
1488 created_at INTEGER NOT NULL DEFAULT (unixepoch())
1489 );
1490 CREATE TABLE vector_embedding_contracts (
1491 profile TEXT PRIMARY KEY,
1492 table_name TEXT NOT NULL,
1493 model_identity TEXT NOT NULL,
1494 model_version TEXT NOT NULL,
1495 dimension INTEGER NOT NULL,
1496 normalization_policy TEXT NOT NULL,
1497 chunking_policy TEXT NOT NULL,
1498 preprocessing_policy TEXT NOT NULL,
1499 generator_command_json TEXT NOT NULL,
1500 updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
1501 applied_at INTEGER NOT NULL DEFAULT 0,
1502 snapshot_hash TEXT NOT NULL DEFAULT ''
1503 );
1504 INSERT INTO vector_embedding_contracts (
1505 profile,
1506 table_name,
1507 model_identity,
1508 model_version,
1509 dimension,
1510 normalization_policy,
1511 chunking_policy,
1512 preprocessing_policy,
1513 generator_command_json,
1514 updated_at,
1515 applied_at,
1516 snapshot_hash
1517 ) VALUES (
1518 'default',
1519 'vec_nodes_active',
1520 'legacy-model',
1521 '0.9.0',
1522 4,
1523 'l2',
1524 'per_chunk',
1525 'trim',
1526 '["/bin/echo"]',
1527 100,
1528 100,
1529 'legacy'
1530 );
1531 "#,
1532 )
1533 .expect("seed legacy schema");
1534 let manager = SchemaManager::new();
1535
1536 let report = manager.bootstrap(&conn).expect("bootstrap");
1537
1538 assert!(
1539 report.applied_versions.iter().any(|version| version.0 >= 5),
1540 "bootstrap should apply hardening migrations"
1541 );
1542 let format_version: i64 = conn
1543 .query_row(
1544 "SELECT contract_format_version FROM vector_embedding_contracts WHERE profile = 'default'",
1545 [],
1546 |row| row.get(0),
1547 )
1548 .expect("contract_format_version");
1549 assert_eq!(format_version, 1);
1550 let metadata_column_count: i64 = conn
1551 .query_row(
1552 "SELECT count(*) FROM pragma_table_info('provenance_events') WHERE name = 'metadata_json'",
1553 [],
1554 |row| row.get(0),
1555 )
1556 .expect("metadata_json column count");
1557 assert_eq!(metadata_column_count, 1);
1558 }
1559
1560 #[test]
1561 fn bootstrap_creates_operational_store_tables() {
1562 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1563 let manager = SchemaManager::new();
1564
1565 manager.bootstrap(&conn).expect("bootstrap");
1566
1567 for table in [
1568 "operational_collections",
1569 "operational_mutations",
1570 "operational_current",
1571 ] {
1572 let count: i64 = conn
1573 .query_row(
1574 "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = ?1",
1575 [table],
1576 |row| row.get(0),
1577 )
1578 .expect("table existence");
1579 assert_eq!(count, 1, "{table} should exist after bootstrap");
1580 }
1581 let mutation_order_columns: i64 = conn
1582 .query_row(
1583 "SELECT count(*) FROM pragma_table_info('operational_mutations') WHERE name = 'mutation_order'",
1584 [],
1585 |row| row.get(0),
1586 )
1587 .expect("mutation_order column exists");
1588 assert_eq!(mutation_order_columns, 1);
1589 }
1590
1591 #[test]
1592 fn bootstrap_is_idempotent_with_operational_store_tables() {
1593 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1594 let manager = SchemaManager::new();
1595
1596 manager.bootstrap(&conn).expect("first bootstrap");
1597 let report = manager.bootstrap(&conn).expect("second bootstrap");
1598
1599 assert!(
1600 report.applied_versions.is_empty(),
1601 "second bootstrap should apply no new migrations"
1602 );
1603 let count: i64 = conn
1604 .query_row(
1605 "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'operational_collections'",
1606 [],
1607 |row| row.get(0),
1608 )
1609 .expect("operational_collections table exists");
1610 assert_eq!(count, 1);
1611 }
1612
1613 #[test]
1614 fn bootstrap_is_idempotent_for_recovered_operational_tables_without_migration_history() {
1615 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1616 conn.execute_batch(
1617 r#"
1618 CREATE TABLE operational_collections (
1619 name TEXT PRIMARY KEY,
1620 kind TEXT NOT NULL,
1621 schema_json TEXT NOT NULL,
1622 retention_json TEXT NOT NULL,
1623 format_version INTEGER NOT NULL DEFAULT 1,
1624 created_at INTEGER NOT NULL DEFAULT (unixepoch()),
1625 disabled_at INTEGER
1626 );
1627
1628 CREATE TABLE operational_mutations (
1629 id TEXT PRIMARY KEY,
1630 collection_name TEXT NOT NULL,
1631 record_key TEXT NOT NULL,
1632 op_kind TEXT NOT NULL,
1633 payload_json TEXT NOT NULL,
1634 source_ref TEXT,
1635 created_at INTEGER NOT NULL DEFAULT (unixepoch()),
1636 mutation_order INTEGER NOT NULL DEFAULT 0,
1637 FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
1638 );
1639
1640 CREATE TABLE operational_current (
1641 collection_name TEXT NOT NULL,
1642 record_key TEXT NOT NULL,
1643 payload_json TEXT NOT NULL,
1644 updated_at INTEGER NOT NULL,
1645 last_mutation_id TEXT NOT NULL,
1646 PRIMARY KEY(collection_name, record_key),
1647 FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
1648 FOREIGN KEY(last_mutation_id) REFERENCES operational_mutations(id)
1649 );
1650
1651 INSERT INTO operational_collections (name, kind, schema_json, retention_json)
1652 VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}');
1653 INSERT INTO operational_mutations (
1654 id, collection_name, record_key, op_kind, payload_json, created_at, mutation_order
1655 ) VALUES (
1656 'mut-1', 'audit_log', 'entry-1', 'put', '{"ok":true}', 10, 0
1657 );
1658 "#,
1659 )
1660 .expect("seed recovered operational tables");
1661
1662 let manager = SchemaManager::new();
1663 let report = manager
1664 .bootstrap(&conn)
1665 .expect("bootstrap recovered schema");
1666
1667 assert!(
1668 report.applied_versions.iter().any(|version| version.0 == 8),
1669 "bootstrap should record operational mutation ordering hardening"
1670 );
1671 let mutation_order: i64 = conn
1672 .query_row(
1673 "SELECT mutation_order FROM operational_mutations WHERE id = 'mut-1'",
1674 [],
1675 |row| row.get(0),
1676 )
1677 .expect("mutation_order");
1678 assert_ne!(
1679 mutation_order, 0,
1680 "bootstrap should backfill recovered operational rows"
1681 );
1682 let count: i64 = conn
1683 .query_row(
1684 "SELECT count(*) FROM sqlite_master WHERE type = 'index' AND name = 'idx_operational_mutations_collection_key_order'",
1685 [],
1686 |row| row.get(0),
1687 )
1688 .expect("ordering index exists");
1689 assert_eq!(count, 1);
1690 }
1691
1692 #[test]
1693 fn bootstrap_adds_operational_filter_contract_and_index_table() {
1694 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1695 conn.execute_batch(
1696 r#"
1697 CREATE TABLE operational_collections (
1698 name TEXT PRIMARY KEY,
1699 kind TEXT NOT NULL,
1700 schema_json TEXT NOT NULL,
1701 retention_json TEXT NOT NULL,
1702 format_version INTEGER NOT NULL DEFAULT 1,
1703 created_at INTEGER NOT NULL DEFAULT (unixepoch()),
1704 disabled_at INTEGER
1705 );
1706
1707 CREATE TABLE operational_mutations (
1708 id TEXT PRIMARY KEY,
1709 collection_name TEXT NOT NULL,
1710 record_key TEXT NOT NULL,
1711 op_kind TEXT NOT NULL,
1712 payload_json TEXT NOT NULL,
1713 source_ref TEXT,
1714 created_at INTEGER NOT NULL DEFAULT (unixepoch()),
1715 mutation_order INTEGER NOT NULL DEFAULT 1,
1716 FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
1717 );
1718
1719 INSERT INTO operational_collections (name, kind, schema_json, retention_json)
1720 VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}');
1721 "#,
1722 )
1723 .expect("seed recovered operational schema");
1724
1725 let manager = SchemaManager::new();
1726 let report = manager
1727 .bootstrap(&conn)
1728 .expect("bootstrap recovered schema");
1729
1730 assert!(
1731 report
1732 .applied_versions
1733 .iter()
1734 .any(|version| version.0 == 10),
1735 "bootstrap should record operational filtered read migration"
1736 );
1737 assert!(
1738 report
1739 .applied_versions
1740 .iter()
1741 .any(|version| version.0 == 11),
1742 "bootstrap should record operational validation migration"
1743 );
1744 let filter_fields_json: String = conn
1745 .query_row(
1746 "SELECT filter_fields_json FROM operational_collections WHERE name = 'audit_log'",
1747 [],
1748 |row| row.get(0),
1749 )
1750 .expect("filter_fields_json added");
1751 assert_eq!(filter_fields_json, "[]");
1752 let validation_json: String = conn
1753 .query_row(
1754 "SELECT validation_json FROM operational_collections WHERE name = 'audit_log'",
1755 [],
1756 |row| row.get(0),
1757 )
1758 .expect("validation_json added");
1759 assert_eq!(validation_json, "");
1760 let table_count: i64 = conn
1761 .query_row(
1762 "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'operational_filter_values'",
1763 [],
1764 |row| row.get(0),
1765 )
1766 .expect("filter table exists");
1767 assert_eq!(table_count, 1);
1768 }
1769
1770 #[test]
1771 fn bootstrap_reapplies_migration_history_without_readding_filter_contract_columns() {
1772 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1773 let manager = SchemaManager::new();
1774 manager.bootstrap(&conn).expect("initial bootstrap");
1775
1776 conn.execute("DROP TABLE fathom_schema_migrations", [])
1777 .expect("drop migration history");
1778 SchemaManager::ensure_metadata_tables(&conn).expect("recreate migration metadata");
1779
1780 let report = manager
1781 .bootstrap(&conn)
1782 .expect("rebootstrap existing schema");
1783
1784 assert!(
1785 report
1786 .applied_versions
1787 .iter()
1788 .any(|version| version.0 == 10),
1789 "rebootstrap should re-record migration 10"
1790 );
1791 assert!(
1792 report
1793 .applied_versions
1794 .iter()
1795 .any(|version| version.0 == 11),
1796 "rebootstrap should re-record migration 11"
1797 );
1798 let filter_fields_json: String = conn
1799 .query_row(
1800 "SELECT filter_fields_json FROM operational_collections LIMIT 1",
1801 [],
1802 |row| row.get(0),
1803 )
1804 .unwrap_or_else(|_| "[]".to_string());
1805 assert_eq!(filter_fields_json, "[]");
1806 let validation_json: String = conn
1807 .query_row(
1808 "SELECT validation_json FROM operational_collections LIMIT 1",
1809 [],
1810 |row| row.get(0),
1811 )
1812 .unwrap_or_default();
1813 assert_eq!(validation_json, "");
1814 }
1815
1816 #[test]
1817 fn downgrade_detected_returns_version_mismatch() {
1818 use crate::SchemaError;
1819
1820 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1821 let manager = SchemaManager::new();
1822 manager.bootstrap(&conn).expect("initial bootstrap");
1823
1824 conn.execute(
1825 "INSERT INTO fathom_schema_migrations (version, description) VALUES (?1, ?2)",
1826 (999_i64, "future migration"),
1827 )
1828 .expect("insert future version");
1829
1830 let err = manager
1831 .bootstrap(&conn)
1832 .expect_err("should fail on downgrade");
1833 assert!(
1834 matches!(
1835 err,
1836 SchemaError::VersionMismatch {
1837 database_version: 999,
1838 ..
1839 }
1840 ),
1841 "expected VersionMismatch with database_version 999, got: {err}"
1842 );
1843 }
1844
1845 #[test]
1846 fn journal_size_limit_is_set() {
1847 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1848 let manager = SchemaManager::new();
1849 manager
1850 .initialize_connection(&conn)
1851 .expect("initialize_connection");
1852
1853 let limit: i64 = conn
1854 .query_row("PRAGMA journal_size_limit", [], |row| row.get(0))
1855 .expect("journal_size_limit pragma");
1856 assert_eq!(limit, 536_870_912);
1857 }
1858
1859 #[cfg(feature = "sqlite-vec")]
1860 #[test]
1861 fn vector_profile_created_when_feature_enabled() {
1862 unsafe {
1865 rusqlite::ffi::sqlite3_auto_extension(Some(std::mem::transmute(
1866 sqlite_vec::sqlite3_vec_init as *const (),
1867 )));
1868 }
1869 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1870 let manager = SchemaManager::new();
1871 manager.bootstrap(&conn).expect("bootstrap");
1872
1873 manager
1874 .ensure_vector_profile(&conn, "default", "vec_nodes_active", 128)
1875 .expect("ensure_vector_profile");
1876
1877 let count: i64 = conn
1878 .query_row(
1879 "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
1880 [],
1881 |row| row.get(0),
1882 )
1883 .expect("count");
1884 assert_eq!(
1885 count, 1,
1886 "vector profile must be enabled after ensure_vector_profile"
1887 );
1888
1889 let _: i64 = conn
1891 .query_row("SELECT count(*) FROM vec_nodes_active", [], |row| {
1892 row.get(0)
1893 })
1894 .expect("vec_nodes_active table must exist after ensure_vector_profile");
1895 }
1896
1897 #[test]
1900 fn fts_kind_table_name_simple_kind() {
1901 assert_eq!(
1902 super::fts_kind_table_name("WMKnowledgeObject"),
1903 "fts_props_wmknowledgeobject"
1904 );
1905 }
1906
1907 #[test]
1908 fn fts_kind_table_name_another_simple_kind() {
1909 assert_eq!(
1910 super::fts_kind_table_name("WMExecutionRecord"),
1911 "fts_props_wmexecutionrecord"
1912 );
1913 }
1914
1915 #[test]
1916 fn fts_kind_table_name_with_separator_chars() {
1917 assert_eq!(
1918 super::fts_kind_table_name("MyKind-With.Dots"),
1919 "fts_props_mykind_with_dots"
1920 );
1921 }
1922
1923 #[test]
1924 fn fts_kind_table_name_collapses_consecutive_underscores() {
1925 assert_eq!(
1926 super::fts_kind_table_name("Kind__Double__Underscores"),
1927 "fts_props_kind_double_underscores"
1928 );
1929 }
1930
1931 #[test]
1932 fn fts_kind_table_name_long_kind_truncates_with_hash() {
1933 let long_kind = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA";
1935 let result = super::fts_kind_table_name(long_kind);
1936 assert_eq!(result.len(), 63, "result must be exactly 63 chars");
1937 assert!(
1938 result.starts_with("fts_props_"),
1939 "result must start with fts_props_"
1940 );
1941 let last_underscore = result.rfind('_').expect("must contain underscore");
1943 let hex_suffix = &result[last_underscore + 1..];
1944 assert_eq!(hex_suffix.len(), 7, "hex suffix must be 7 chars");
1945 assert!(
1946 hex_suffix.chars().all(|c| c.is_ascii_hexdigit()),
1947 "hex suffix must be hex digits"
1948 );
1949 }
1950
1951 #[test]
1952 fn fts_kind_table_name_testkind() {
1953 assert_eq!(super::fts_kind_table_name("TestKind"), "fts_props_testkind");
1954 }
1955
1956 #[test]
1959 fn fts_column_name_simple_field() {
1960 assert_eq!(super::fts_column_name("$.title", false), "title");
1961 }
1962
1963 #[test]
1964 fn fts_column_name_nested_path() {
1965 assert_eq!(
1966 super::fts_column_name("$.payload.content", false),
1967 "payload_content"
1968 );
1969 }
1970
1971 #[test]
1972 fn fts_column_name_recursive() {
1973 assert_eq!(super::fts_column_name("$.payload", true), "payload_all");
1974 }
1975
1976 #[test]
1977 fn fts_column_name_special_chars() {
1978 assert_eq!(
1979 super::fts_column_name("$.some-field[0]", false),
1980 "some_field_0"
1981 );
1982 }
1983
1984 #[test]
1987 fn resolve_fts_tokenizer_returns_default_when_no_table() {
1988 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1989 let result = super::resolve_fts_tokenizer(&conn, "MyKind").expect("should not error");
1991 assert_eq!(result, super::DEFAULT_FTS_TOKENIZER);
1992 }
1993
1994 #[test]
1995 fn resolve_fts_tokenizer_returns_configured_value() {
1996 let conn = Connection::open_in_memory().expect("in-memory sqlite");
1997 conn.execute_batch(
1998 "CREATE TABLE projection_profiles (
1999 kind TEXT NOT NULL,
2000 facet TEXT NOT NULL,
2001 config_json TEXT NOT NULL,
2002 active_at INTEGER,
2003 created_at INTEGER NOT NULL DEFAULT (unixepoch()),
2004 PRIMARY KEY (kind, facet)
2005 );
2006 INSERT INTO projection_profiles (kind, facet, config_json)
2007 VALUES ('MyKind', 'fts', '{\"tokenizer\":\"trigram\"}');",
2008 )
2009 .expect("setup table");
2010
2011 let result = super::resolve_fts_tokenizer(&conn, "MyKind").expect("should not error");
2012 assert_eq!(result, "trigram");
2013
2014 let default_result =
2015 super::resolve_fts_tokenizer(&conn, "OtherKind").expect("should not error");
2016 assert_eq!(default_result, super::DEFAULT_FTS_TOKENIZER);
2017 }
2018
2019 #[test]
2022 fn migration_20_creates_projection_profiles_table() {
2023 let conn = Connection::open_in_memory().expect("in-memory sqlite");
2024 let manager = SchemaManager::new();
2025 manager.bootstrap(&conn).expect("bootstrap");
2026
2027 let table_exists: i64 = conn
2028 .query_row(
2029 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='projection_profiles'",
2030 [],
2031 |row| row.get(0),
2032 )
2033 .expect("query sqlite_master");
2034 assert_eq!(table_exists, 1, "projection_profiles table must exist");
2035
2036 conn.execute_batch(
2038 "INSERT INTO projection_profiles (kind, facet, config_json)
2039 VALUES ('TestKind', 'fts', '{}')",
2040 )
2041 .expect("insert row to verify columns");
2042 let (kind, facet, config_json): (String, String, String) = conn
2043 .query_row(
2044 "SELECT kind, facet, config_json FROM projection_profiles WHERE kind='TestKind'",
2045 [],
2046 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2047 )
2048 .expect("select columns");
2049 assert_eq!(kind, "TestKind");
2050 assert_eq!(facet, "fts");
2051 assert_eq!(config_json, "{}");
2052 }
2053
2054 #[test]
2055 fn migration_20_primary_key_is_kind_facet() {
2056 let conn = Connection::open_in_memory().expect("in-memory sqlite");
2057 let manager = SchemaManager::new();
2058 manager.bootstrap(&conn).expect("bootstrap");
2059
2060 conn.execute_batch(
2061 "INSERT INTO projection_profiles (kind, facet, config_json)
2062 VALUES ('MyKind', 'fts', '{\"tokenizer\":\"porter\"}');",
2063 )
2064 .expect("first insert");
2065
2066 let result = conn.execute_batch(
2068 "INSERT INTO projection_profiles (kind, facet, config_json)
2069 VALUES ('MyKind', 'fts', '{\"tokenizer\":\"trigram\"}');",
2070 );
2071 assert!(
2072 result.is_err(),
2073 "duplicate (kind, facet) must violate PRIMARY KEY"
2074 );
2075 }
2076
2077 #[test]
2078 fn migration_20_resolve_fts_tokenizer_end_to_end() {
2079 let conn = Connection::open_in_memory().expect("in-memory sqlite");
2080 let manager = SchemaManager::new();
2081 manager.bootstrap(&conn).expect("bootstrap");
2082
2083 conn.execute_batch(
2084 "INSERT INTO projection_profiles (kind, facet, config_json)
2085 VALUES ('MyKind', 'fts', '{\"tokenizer\":\"trigram\"}');",
2086 )
2087 .expect("insert profile");
2088
2089 let result = super::resolve_fts_tokenizer(&conn, "MyKind").expect("should not error");
2090 assert_eq!(result, "trigram");
2091
2092 let default_result =
2093 super::resolve_fts_tokenizer(&conn, "UnknownKind").expect("should not error");
2094 assert_eq!(default_result, super::DEFAULT_FTS_TOKENIZER);
2095 }
2096
2097 #[test]
2098 fn migration_21_creates_per_kind_fts_table_and_pending_row() {
2099 let conn = Connection::open_in_memory().expect("in-memory sqlite");
2100 let manager = SchemaManager::new();
2101
2102 manager.bootstrap(&conn).expect("first bootstrap");
2110
2111 conn.execute_batch(
2112 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator, format_version) \
2113 VALUES ('TestKind', '[]', ',', 1)",
2114 )
2115 .expect("insert kind");
2116
2117 SchemaManager::ensure_per_kind_fts_tables(&conn).expect("ensure_per_kind_fts_tables");
2120
2121 let count: i64 = conn
2123 .query_row(
2124 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='fts_props_testkind'",
2125 [],
2126 |r| r.get(0),
2127 )
2128 .expect("count fts table");
2129 assert_eq!(
2130 count, 1,
2131 "fts_props_testkind virtual table should be created"
2132 );
2133
2134 let state: String = conn
2136 .query_row(
2137 "SELECT state FROM fts_property_rebuild_state WHERE kind='TestKind'",
2138 [],
2139 |r| r.get(0),
2140 )
2141 .expect("rebuild state row");
2142 assert_eq!(state, "PENDING");
2143 }
2144
2145 #[test]
2146 fn migration_22_adds_columns_json_to_staging_table() {
2147 let conn = Connection::open_in_memory().expect("in-memory sqlite");
2148 let manager = SchemaManager::new();
2149 manager.bootstrap(&conn).expect("bootstrap");
2150
2151 let col_count: i64 = conn
2152 .query_row(
2153 "SELECT count(*) FROM pragma_table_info('fts_property_rebuild_staging') WHERE name='columns_json'",
2154 [],
2155 |r| r.get(0),
2156 )
2157 .expect("pragma_table_info");
2158 assert_eq!(
2159 col_count, 1,
2160 "columns_json column must exist after migration 22"
2161 );
2162 }
2163
2164 #[test]
2167 fn vec_kind_table_name_simple_kind() {
2168 assert_eq!(
2169 super::vec_kind_table_name("WMKnowledgeObject"),
2170 "vec_wmknowledgeobject"
2171 );
2172 }
2173
2174 #[test]
2175 fn vec_kind_table_name_another_kind() {
2176 assert_eq!(super::vec_kind_table_name("MyKind"), "vec_mykind");
2177 }
2178
2179 #[test]
2180 fn vec_kind_table_name_with_separator_chars() {
2181 assert_eq!(
2182 super::vec_kind_table_name("MyKind-With.Dots"),
2183 "vec_mykind_with_dots"
2184 );
2185 }
2186
2187 #[test]
2188 fn vec_kind_table_name_collapses_consecutive_underscores() {
2189 assert_eq!(
2190 super::vec_kind_table_name("Kind__Double__Underscores"),
2191 "vec_kind_double_underscores"
2192 );
2193 }
2194
2195 #[cfg(feature = "sqlite-vec")]
2198 #[test]
2199 fn per_kind_vec_table_created_when_vec_profile_registered() {
2200 unsafe {
2203 rusqlite::ffi::sqlite3_auto_extension(Some(std::mem::transmute(
2204 sqlite_vec::sqlite3_vec_init as *const (),
2205 )));
2206 }
2207 let conn = Connection::open_in_memory().expect("in-memory sqlite");
2208 let manager = SchemaManager::new();
2209 manager.bootstrap(&conn).expect("bootstrap");
2210
2211 manager
2213 .ensure_vec_kind_profile(&conn, "MyKind", 128)
2214 .expect("ensure_vec_kind_profile");
2215
2216 let count: i64 = conn
2218 .query_row(
2219 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='vec_mykind'",
2220 [],
2221 |r| r.get(0),
2222 )
2223 .expect("query sqlite_master");
2224 assert_eq!(count, 1, "vec_mykind virtual table must be created");
2225
2226 let pp_count: i64 = conn
2228 .query_row(
2229 "SELECT count(*) FROM projection_profiles WHERE kind='MyKind' AND facet='vec'",
2230 [],
2231 |r| r.get(0),
2232 )
2233 .expect("query projection_profiles");
2234 assert_eq!(
2235 pp_count, 1,
2236 "projection_profiles row must exist for (MyKind, vec)"
2237 );
2238
2239 let old_count: i64 = conn
2241 .query_row(
2242 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='vec_nodes_active'",
2243 [],
2244 |r| r.get(0),
2245 )
2246 .expect("query sqlite_master");
2247 assert_eq!(
2248 old_count, 0,
2249 "vec_nodes_active must NOT be created for per-kind registration"
2250 );
2251 }
2252
2253 #[test]
2255 fn migration_23_drops_global_fts_node_properties_table() {
2256 let conn = Connection::open_in_memory().expect("in-memory sqlite");
2257 let manager = SchemaManager::new();
2258 manager.bootstrap(&conn).expect("bootstrap");
2259
2260 let count: i64 = conn
2262 .query_row(
2263 "SELECT count(*) FROM sqlite_master \
2264 WHERE type='table' AND name='fts_node_properties'",
2265 [],
2266 |r| r.get(0),
2267 )
2268 .expect("check sqlite_master");
2269 assert_eq!(
2270 count, 0,
2271 "fts_node_properties must be dropped by migration 23"
2272 );
2273 }
2274}