1use rusqlite::{Connection, OptionalExtension};
2use sha2::{Digest, Sha256};
3
4use crate::{Migration, SchemaError, SchemaVersion};
5
/// Ordered, append-only migration ledger applied by `SchemaManager::bootstrap`.
///
/// Each entry runs inside its own transaction and is recorded in
/// `fathom_schema_migrations`; shipped entries must never be edited or
/// reordered — append a new `SchemaVersion` instead. Versions 16, 21, and 24
/// carry empty SQL because their work is performed procedurally by per-version
/// handlers in `bootstrap`.
static MIGRATIONS: &[Migration] = &[
    // v1: canonical graph storage (nodes/edges with logical-id versioning via
    // `superseded_at`), chunk storage, contents FTS, vector profile registry,
    // and the run/step/action runtime hierarchy.
    Migration::new(
        SchemaVersion(1),
        "initial canonical schema and runtime tables",
        r"
        CREATE TABLE IF NOT EXISTS nodes (
            row_id TEXT PRIMARY KEY,
            logical_id TEXT NOT NULL,
            kind TEXT NOT NULL,
            properties BLOB NOT NULL,
            created_at INTEGER NOT NULL,
            superseded_at INTEGER,
            source_ref TEXT,
            confidence REAL
        );

        CREATE UNIQUE INDEX IF NOT EXISTS idx_nodes_active_logical_id
            ON nodes(logical_id)
            WHERE superseded_at IS NULL;
        CREATE INDEX IF NOT EXISTS idx_nodes_kind_active
            ON nodes(kind, superseded_at);
        CREATE INDEX IF NOT EXISTS idx_nodes_source_ref
            ON nodes(source_ref);

        CREATE TABLE IF NOT EXISTS edges (
            row_id TEXT PRIMARY KEY,
            logical_id TEXT NOT NULL,
            source_logical_id TEXT NOT NULL,
            target_logical_id TEXT NOT NULL,
            kind TEXT NOT NULL,
            properties BLOB NOT NULL,
            created_at INTEGER NOT NULL,
            superseded_at INTEGER,
            source_ref TEXT,
            confidence REAL
        );

        CREATE UNIQUE INDEX IF NOT EXISTS idx_edges_active_logical_id
            ON edges(logical_id)
            WHERE superseded_at IS NULL;
        CREATE INDEX IF NOT EXISTS idx_edges_source_active
            ON edges(source_logical_id, kind, superseded_at);
        CREATE INDEX IF NOT EXISTS idx_edges_target_active
            ON edges(target_logical_id, kind, superseded_at);
        CREATE INDEX IF NOT EXISTS idx_edges_source_ref
            ON edges(source_ref);

        CREATE TABLE IF NOT EXISTS chunks (
            id TEXT PRIMARY KEY,
            node_logical_id TEXT NOT NULL,
            text_content TEXT NOT NULL,
            byte_start INTEGER,
            byte_end INTEGER,
            created_at INTEGER NOT NULL
        );

        CREATE INDEX IF NOT EXISTS idx_chunks_node_logical_id
            ON chunks(node_logical_id);

        CREATE VIRTUAL TABLE IF NOT EXISTS fts_nodes USING fts5(
            chunk_id UNINDEXED,
            node_logical_id UNINDEXED,
            kind UNINDEXED,
            text_content
        );

        CREATE TABLE IF NOT EXISTS vector_profiles (
            profile TEXT PRIMARY KEY,
            table_name TEXT NOT NULL,
            dimension INTEGER NOT NULL,
            enabled INTEGER NOT NULL DEFAULT 0
        );

        CREATE TABLE IF NOT EXISTS runs (
            id TEXT PRIMARY KEY,
            kind TEXT NOT NULL,
            status TEXT NOT NULL,
            properties BLOB NOT NULL,
            created_at INTEGER NOT NULL,
            completed_at INTEGER,
            superseded_at INTEGER,
            source_ref TEXT
        );

        CREATE TABLE IF NOT EXISTS steps (
            id TEXT PRIMARY KEY,
            run_id TEXT NOT NULL,
            kind TEXT NOT NULL,
            status TEXT NOT NULL,
            properties BLOB NOT NULL,
            created_at INTEGER NOT NULL,
            completed_at INTEGER,
            superseded_at INTEGER,
            source_ref TEXT,
            FOREIGN KEY(run_id) REFERENCES runs(id)
        );

        CREATE TABLE IF NOT EXISTS actions (
            id TEXT PRIMARY KEY,
            step_id TEXT NOT NULL,
            kind TEXT NOT NULL,
            status TEXT NOT NULL,
            properties BLOB NOT NULL,
            created_at INTEGER NOT NULL,
            completed_at INTEGER,
            superseded_at INTEGER,
            source_ref TEXT,
            FOREIGN KEY(step_id) REFERENCES steps(id)
        );

        CREATE INDEX IF NOT EXISTS idx_runs_source_ref
            ON runs(source_ref);
        CREATE INDEX IF NOT EXISTS idx_steps_source_ref
            ON steps(source_ref);
        CREATE INDEX IF NOT EXISTS idx_actions_source_ref
            ON actions(source_ref);
        ",
    ),
    // v2: durable audit log keyed by (subject, event_type).
    Migration::new(
        SchemaVersion(2),
        "durable audit trail: provenance_events table",
        r"
        CREATE TABLE IF NOT EXISTS provenance_events (
            id TEXT PRIMARY KEY,
            event_type TEXT NOT NULL,
            subject TEXT NOT NULL,
            source_ref TEXT,
            created_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER))
        );
        CREATE INDEX IF NOT EXISTS idx_provenance_events_subject
            ON provenance_events (subject, event_type);
        ",
    ),
    // v3: per-profile embedding regeneration contracts.
    Migration::new(
        SchemaVersion(3),
        "vector regeneration contracts",
        r"
        CREATE TABLE IF NOT EXISTS vector_embedding_contracts (
            profile TEXT PRIMARY KEY,
            table_name TEXT NOT NULL,
            model_identity TEXT NOT NULL,
            model_version TEXT NOT NULL,
            dimension INTEGER NOT NULL,
            normalization_policy TEXT NOT NULL,
            chunking_policy TEXT NOT NULL,
            preprocessing_policy TEXT NOT NULL,
            generator_command_json TEXT NOT NULL,
            updated_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER))
        );
        ",
    ),
    // v4: apply metadata columns; also handled procedurally in `bootstrap`
    // (ensure_vector_regeneration_apply_metadata) for databases that already
    // have the columns.
    Migration::new(
        SchemaVersion(4),
        "vector regeneration apply metadata",
        r"
        ALTER TABLE vector_embedding_contracts
            ADD COLUMN applied_at INTEGER NOT NULL DEFAULT 0;
        ALTER TABLE vector_embedding_contracts
            ADD COLUMN snapshot_hash TEXT NOT NULL DEFAULT '';
        UPDATE vector_embedding_contracts
        SET
            applied_at = CASE
                WHEN applied_at = 0 THEN updated_at
                ELSE applied_at
            END,
            snapshot_hash = CASE
                WHEN snapshot_hash = '' THEN 'legacy'
                ELSE snapshot_hash
            END;
        ",
    ),
    // v5: contract format versioning; procedural twin in `bootstrap`.
    Migration::new(
        SchemaVersion(5),
        "vector regeneration contract format version",
        r"
        ALTER TABLE vector_embedding_contracts
            ADD COLUMN contract_format_version INTEGER NOT NULL DEFAULT 1;
        UPDATE vector_embedding_contracts
        SET contract_format_version = 1
        WHERE contract_format_version = 0;
        ",
    ),
    // v6: JSON payload column on the audit log; procedural twin in `bootstrap`.
    Migration::new(
        SchemaVersion(6),
        "provenance metadata payloads",
        r"
        ALTER TABLE provenance_events
            ADD COLUMN metadata_json TEXT NOT NULL DEFAULT '';
        ",
    ),
    // v7: operational store — collection registry, append-only mutation log,
    // and the derived `operational_current` materialization.
    Migration::new(
        SchemaVersion(7),
        "operational store canonical and derived tables",
        r"
        CREATE TABLE IF NOT EXISTS operational_collections (
            name TEXT PRIMARY KEY,
            kind TEXT NOT NULL,
            schema_json TEXT NOT NULL,
            retention_json TEXT NOT NULL,
            format_version INTEGER NOT NULL DEFAULT 1,
            created_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER)),
            disabled_at INTEGER
        );

        CREATE INDEX IF NOT EXISTS idx_operational_collections_kind
            ON operational_collections(kind, disabled_at);

        CREATE TABLE IF NOT EXISTS operational_mutations (
            id TEXT PRIMARY KEY,
            collection_name TEXT NOT NULL,
            record_key TEXT NOT NULL,
            op_kind TEXT NOT NULL,
            payload_json TEXT NOT NULL,
            source_ref TEXT,
            created_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER)),
            FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
        );

        CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_created
            ON operational_mutations(collection_name, record_key, created_at DESC, id DESC);
        CREATE INDEX IF NOT EXISTS idx_operational_mutations_source_ref
            ON operational_mutations(source_ref);

        CREATE TABLE IF NOT EXISTS operational_current (
            collection_name TEXT NOT NULL,
            record_key TEXT NOT NULL,
            payload_json TEXT NOT NULL,
            updated_at INTEGER NOT NULL,
            last_mutation_id TEXT NOT NULL,
            PRIMARY KEY(collection_name, record_key),
            FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
            FOREIGN KEY(last_mutation_id) REFERENCES operational_mutations(id)
        );

        CREATE INDEX IF NOT EXISTS idx_operational_current_collection_updated
            ON operational_current(collection_name, updated_at DESC);
        ",
    ),
    // v8: monotonic ordering column backfilled from rowid; procedural twin in
    // `bootstrap`.
    Migration::new(
        SchemaVersion(8),
        "operational mutation ordering hardening",
        r"
        ALTER TABLE operational_mutations
            ADD COLUMN mutation_order INTEGER NOT NULL DEFAULT 0;
        UPDATE operational_mutations
        SET mutation_order = rowid
        WHERE mutation_order = 0;
        CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_order
            ON operational_mutations(collection_name, record_key, mutation_order DESC);
        ",
    ),
    // v9: per-node access recency tracking.
    Migration::new(
        SchemaVersion(9),
        "node last_accessed metadata",
        r"
        CREATE TABLE IF NOT EXISTS node_access_metadata (
            logical_id TEXT PRIMARY KEY,
            last_accessed_at INTEGER NOT NULL,
            updated_at INTEGER NOT NULL
        );

        CREATE INDEX IF NOT EXISTS idx_node_access_metadata_last_accessed
            ON node_access_metadata(last_accessed_at DESC);
        ",
    ),
    // v10: declared filter fields plus extracted typed values per mutation.
    Migration::new(
        SchemaVersion(10),
        "operational filtered read contracts and extracted values",
        r"
        ALTER TABLE operational_collections
            ADD COLUMN filter_fields_json TEXT NOT NULL DEFAULT '[]';

        CREATE TABLE IF NOT EXISTS operational_filter_values (
            mutation_id TEXT NOT NULL,
            collection_name TEXT NOT NULL,
            field_name TEXT NOT NULL,
            string_value TEXT,
            integer_value INTEGER,
            PRIMARY KEY(mutation_id, field_name),
            FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE,
            FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
        );

        CREATE INDEX IF NOT EXISTS idx_operational_filter_values_text
            ON operational_filter_values(collection_name, field_name, string_value, mutation_id);
        CREATE INDEX IF NOT EXISTS idx_operational_filter_values_integer
            ON operational_filter_values(collection_name, field_name, integer_value, mutation_id);
        ",
    ),
    // v11: validation contract column.
    Migration::new(
        SchemaVersion(11),
        "operational payload validation contracts",
        r"
        ALTER TABLE operational_collections
            ADD COLUMN validation_json TEXT NOT NULL DEFAULT '';
        ",
    ),
    // v12: declared secondary indexes plus slotted (up to 3 text/integer
    // columns) index-entry storage.
    Migration::new(
        SchemaVersion(12),
        "operational secondary indexes",
        r"
        ALTER TABLE operational_collections
            ADD COLUMN secondary_indexes_json TEXT NOT NULL DEFAULT '[]';

        CREATE TABLE IF NOT EXISTS operational_secondary_index_entries (
            collection_name TEXT NOT NULL,
            index_name TEXT NOT NULL,
            subject_kind TEXT NOT NULL,
            mutation_id TEXT NOT NULL DEFAULT '',
            record_key TEXT NOT NULL DEFAULT '',
            sort_timestamp INTEGER,
            slot1_text TEXT,
            slot1_integer INTEGER,
            slot2_text TEXT,
            slot2_integer INTEGER,
            slot3_text TEXT,
            slot3_integer INTEGER,
            PRIMARY KEY(collection_name, index_name, subject_kind, mutation_id, record_key),
            FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
            FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE
        );

        CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_text
            ON operational_secondary_index_entries(
                collection_name, index_name, subject_kind, slot1_text, sort_timestamp DESC, mutation_id, record_key
            );
        CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_integer
            ON operational_secondary_index_entries(
                collection_name, index_name, subject_kind, slot1_integer, sort_timestamp DESC, mutation_id, record_key
            );
        CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_text
            ON operational_secondary_index_entries(
                collection_name, index_name, subject_kind, slot1_text, slot2_text, slot3_text, sort_timestamp DESC, record_key
            );
        CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_integer
            ON operational_secondary_index_entries(
                collection_name, index_name, subject_kind, slot1_integer, slot2_integer, slot3_integer, sort_timestamp DESC, record_key
            );
        ",
    ),
    // v13: retention sweep bookkeeping.
    Migration::new(
        SchemaVersion(13),
        "operational retention run metadata",
        r"
        CREATE TABLE IF NOT EXISTS operational_retention_runs (
            id TEXT PRIMARY KEY,
            collection_name TEXT NOT NULL,
            executed_at INTEGER NOT NULL,
            action_kind TEXT NOT NULL,
            dry_run INTEGER NOT NULL DEFAULT 0,
            deleted_mutations INTEGER NOT NULL,
            rows_remaining INTEGER NOT NULL,
            metadata_json TEXT NOT NULL DEFAULT '',
            FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
        );

        CREATE INDEX IF NOT EXISTS idx_operational_retention_runs_collection_time
            ON operational_retention_runs(collection_name, executed_at DESC);
        ",
    ),
    // v14: external content-object references; procedural twin in `bootstrap`.
    Migration::new(
        SchemaVersion(14),
        "external content object columns",
        r"
        ALTER TABLE nodes ADD COLUMN content_ref TEXT;

        CREATE INDEX IF NOT EXISTS idx_nodes_content_ref
            ON nodes(content_ref)
            WHERE content_ref IS NOT NULL AND superseded_at IS NULL;

        ALTER TABLE chunks ADD COLUMN content_hash TEXT;
        ",
    ),
    // v15: per-kind property projection schemas and the (since removed, see
    // v23) global property-FTS table.
    Migration::new(
        SchemaVersion(15),
        "FTS property projection schemas",
        r"
        CREATE TABLE IF NOT EXISTS fts_property_schemas (
            kind TEXT PRIMARY KEY,
            property_paths_json TEXT NOT NULL,
            separator TEXT NOT NULL DEFAULT ' ',
            format_version INTEGER NOT NULL DEFAULT 1,
            created_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER))
        );

        CREATE VIRTUAL TABLE IF NOT EXISTS fts_node_properties USING fts5(
            node_logical_id UNINDEXED,
            kind UNINDEXED,
            text_content
        );
        ",
    ),
    // v16: no static SQL — handled by ensure_unicode_porter_fts_tokenizers.
    Migration::new(
        SchemaVersion(16),
        "rebuild fts_nodes and fts_node_properties on porter+unicode61 tokenizer",
        "",
    ),
    // v17: byte-offset sidecar mapping FTS hits back to property leaf paths.
    Migration::new(
        SchemaVersion(17),
        "fts property position-map sidecar for recursive extraction",
        r"
        CREATE TABLE IF NOT EXISTS fts_node_property_positions (
            node_logical_id TEXT NOT NULL,
            kind TEXT NOT NULL,
            start_offset INTEGER NOT NULL,
            end_offset INTEGER NOT NULL,
            leaf_path TEXT NOT NULL
        );

        CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_node
            ON fts_node_property_positions(node_logical_id, kind);

        CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_kind
            ON fts_node_property_positions(kind);
        ",
    ),
    // v18: destructive rebuild of the sidecar to add the uniqueness guarantee
    // (data is regenerable, so DROP is acceptable here).
    Migration::new(
        SchemaVersion(18),
        "add UNIQUE constraint on fts_node_property_positions (node_logical_id, kind, start_offset)",
        r"
        DROP TABLE IF EXISTS fts_node_property_positions;

        CREATE TABLE fts_node_property_positions (
            node_logical_id TEXT NOT NULL,
            kind TEXT NOT NULL,
            start_offset INTEGER NOT NULL,
            end_offset INTEGER NOT NULL,
            leaf_path TEXT NOT NULL,
            UNIQUE(node_logical_id, kind, start_offset)
        );

        CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_node
            ON fts_node_property_positions(node_logical_id, kind);

        CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_kind
            ON fts_node_property_positions(kind);
        ",
    ),
    // v19: staging rows and per-kind state machine for async property-FTS
    // rebuilds.
    Migration::new(
        SchemaVersion(19),
        "async property-FTS rebuild staging and state tables",
        r"
        CREATE TABLE IF NOT EXISTS fts_property_rebuild_staging (
            kind TEXT NOT NULL,
            node_logical_id TEXT NOT NULL,
            text_content TEXT NOT NULL,
            positions_blob BLOB,
            PRIMARY KEY (kind, node_logical_id)
        );

        CREATE TABLE IF NOT EXISTS fts_property_rebuild_state (
            kind TEXT PRIMARY KEY,
            schema_id INTEGER NOT NULL,
            state TEXT NOT NULL,
            rows_total INTEGER,
            rows_done INTEGER NOT NULL DEFAULT 0,
            started_at INTEGER NOT NULL,
            last_progress_at INTEGER,
            error_message TEXT,
            is_first_registration INTEGER NOT NULL DEFAULT 0
        );
        ",
    ),
    // v20: per-(kind, facet) projection configuration.
    Migration::new(
        SchemaVersion(20),
        "projection_profiles table for per-kind FTS tokenizer configuration",
        r"CREATE TABLE IF NOT EXISTS projection_profiles (
            kind TEXT NOT NULL,
            facet TEXT NOT NULL,
            config_json TEXT NOT NULL,
            active_at INTEGER,
            created_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER)),
            PRIMARY KEY (kind, facet)
        );",
    ),
    // v21: no static SQL — handled by ensure_per_kind_fts_tables.
    Migration::new(
        SchemaVersion(21),
        "per-kind FTS5 tables replacing fts_node_properties",
        "",
    ),
    // v22: staging column for multi-column rebuilds; procedural twin in
    // `bootstrap` (ensure_staging_columns_json).
    Migration::new(
        SchemaVersion(22),
        "add columns_json to fts_property_rebuild_staging for multi-column rebuild support",
        "ALTER TABLE fts_property_rebuild_staging ADD COLUMN columns_json TEXT;",
    ),
    // v23: remove the global property-FTS table introduced in v15.
    Migration::new(
        SchemaVersion(23),
        "drop global fts_node_properties table (replaced by per-kind fts_props_<kind> tables)",
        "DROP TABLE IF EXISTS fts_node_properties;",
    ),
    // v24: no static SQL — handled by ensure_projection_table_registry.
    Migration::new(
        SchemaVersion(24),
        "projection table registry and collision-proof per-kind table names",
        "",
    ),
    // v25: managed vector projections — embedding profile registry (at most
    // one active, enforced by the partial unique index), per-kind index
    // schemas, and the async embedding work queue.
    Migration::new(
        SchemaVersion(25),
        "managed vector projection tables: embedding profiles, per-kind index schemas, async work queue",
        r"
        CREATE TABLE IF NOT EXISTS vector_embedding_profiles (
            profile_id INTEGER PRIMARY KEY AUTOINCREMENT,
            profile_name TEXT NOT NULL,
            model_identity TEXT NOT NULL,
            model_version TEXT,
            dimensions INTEGER NOT NULL,
            normalization_policy TEXT,
            max_tokens INTEGER,
            active INTEGER NOT NULL DEFAULT 0,
            activated_at INTEGER,
            created_at INTEGER NOT NULL
        );

        CREATE UNIQUE INDEX IF NOT EXISTS idx_vep_singleton_active
            ON vector_embedding_profiles(active)
            WHERE active = 1;

        CREATE UNIQUE INDEX IF NOT EXISTS idx_vep_identity
            ON vector_embedding_profiles(model_identity, model_version, dimensions);

        CREATE TABLE IF NOT EXISTS vector_index_schemas (
            kind TEXT PRIMARY KEY,
            enabled INTEGER NOT NULL DEFAULT 1,
            source_mode TEXT NOT NULL,
            source_config_json TEXT,
            chunking_policy TEXT,
            preprocessing_policy TEXT,
            state TEXT NOT NULL DEFAULT 'fresh',
            last_error TEXT,
            last_completed_at INTEGER,
            created_at INTEGER NOT NULL,
            updated_at INTEGER NOT NULL
        );

        CREATE TABLE IF NOT EXISTS vector_projection_work (
            work_id INTEGER PRIMARY KEY AUTOINCREMENT,
            kind TEXT NOT NULL,
            node_logical_id TEXT,
            chunk_id TEXT NOT NULL,
            canonical_hash TEXT NOT NULL,
            priority INTEGER NOT NULL DEFAULT 0,
            embedding_profile_id INTEGER NOT NULL REFERENCES vector_embedding_profiles(profile_id),
            attempt_count INTEGER NOT NULL DEFAULT 0,
            last_error TEXT,
            state TEXT NOT NULL DEFAULT 'pending',
            created_at INTEGER NOT NULL,
            updated_at INTEGER NOT NULL
        );

        CREATE INDEX IF NOT EXISTS idx_vpw_schedule
            ON vector_projection_work(state, priority DESC, created_at ASC);

        CREATE INDEX IF NOT EXISTS idx_vpw_chunk
            ON vector_projection_work(chunk_id);
        ",
    ),
];
600
/// Outcome of a [`SchemaManager::bootstrap`] pass over a single connection.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BootstrapReport {
    /// Runtime SQLite version, as reported by `SELECT sqlite_version()`.
    pub sqlite_version: String,
    /// Migrations applied during this pass; empty when the database was
    /// already at the engine's schema version.
    pub applied_versions: Vec<SchemaVersion>,
    /// True when at least one `vector_profiles` row has `enabled = 1`.
    pub vector_profile_enabled: bool,
}
607
/// Stateless coordinator that applies the [`MIGRATIONS`] ledger to a SQLite
/// connection (see [`SchemaManager::bootstrap`]).
#[derive(Clone, Debug, Default)]
pub struct SchemaManager;
610
611impl SchemaManager {
612 #[must_use]
613 pub fn new() -> Self {
614 Self
615 }
616
    /// Brings `conn` up to the engine's schema version and reports what ran.
    ///
    /// Configures the connection, ensures the migration-ledger tables exist,
    /// refuses to touch a database written by a newer engine, then applies
    /// each unapplied migration inside its own transaction.
    ///
    /// # Errors
    /// Returns [`SchemaError::VersionMismatch`] when the database's recorded
    /// version exceeds the engine's, or propagates any underlying SQLite error.
    pub fn bootstrap(&self, conn: &Connection) -> Result<BootstrapReport, SchemaError> {
        self.initialize_connection(conn)?;
        Self::ensure_metadata_tables(conn)?;

        // Highest version recorded in the ledger; 0 for a fresh database.
        let max_applied: u32 = conn.query_row(
            "SELECT COALESCE(MAX(version), 0) FROM fathom_schema_migrations",
            [],
            |row| row.get(0),
        )?;
        let engine_version = self.current_version().0;
        trace_info!(
            current_version = max_applied,
            engine_version,
            "schema bootstrap: version check"
        );
        // A database ahead of the engine is not safe to modify — bail out
        // before applying anything.
        if max_applied > engine_version {
            trace_error!(
                database_version = max_applied,
                engine_version,
                "schema version mismatch: database is newer than engine"
            );
            return Err(SchemaError::VersionMismatch {
                database_version: max_applied,
                engine_version,
            });
        }

        let mut applied_versions = Vec::new();
        for migration in self.migrations() {
            // Skip versions already recorded; this makes bootstrap idempotent.
            let already_applied = conn
                .query_row(
                    "SELECT 1 FROM fathom_schema_migrations WHERE version = ?1",
                    [i64::from(migration.version.0)],
                    |row| row.get::<_, i64>(0),
                )
                .optional()?
                .is_some();

            if already_applied {
                continue;
            }

            // Each migration and its ledger entry commit atomically.
            let tx = conn.unchecked_transaction()?;
            // Some versions need procedural (Rust-side) handling — e.g.
            // column-existence probes or dynamically generated DDL — instead
            // of their static SQL; everything else runs as a plain batch.
            match migration.version {
                SchemaVersion(4) => Self::ensure_vector_regeneration_apply_metadata(&tx)?,
                SchemaVersion(5) => Self::ensure_vector_contract_format_version(&tx)?,
                SchemaVersion(6) => Self::ensure_provenance_metadata(&tx)?,
                SchemaVersion(8) => Self::ensure_operational_mutation_order(&tx)?,
                SchemaVersion(9) => Self::ensure_node_access_metadata(&tx)?,
                SchemaVersion(10) => Self::ensure_operational_filter_contract(&tx)?,
                SchemaVersion(11) => Self::ensure_operational_validation_contract(&tx)?,
                SchemaVersion(12) => Self::ensure_operational_secondary_indexes(&tx)?,
                SchemaVersion(13) => Self::ensure_operational_retention_runs(&tx)?,
                SchemaVersion(14) => Self::ensure_external_content_columns(&tx)?,
                SchemaVersion(15) => Self::ensure_fts_property_schemas(&tx)?,
                SchemaVersion(16) => Self::ensure_unicode_porter_fts_tokenizers(&tx)?,
                SchemaVersion(21) => Self::ensure_per_kind_fts_tables(&tx)?,
                SchemaVersion(22) => Self::ensure_staging_columns_json(&tx)?,
                SchemaVersion(24) => Self::ensure_projection_table_registry(&tx)?,
                _ => tx.execute_batch(migration.sql)?,
            }
            tx.execute(
                "INSERT INTO fathom_schema_migrations (version, description) VALUES (?1, ?2)",
                (i64::from(migration.version.0), migration.description),
            )?;
            tx.commit()?;
            trace_info!(
                version = migration.version.0,
                description = migration.description,
                "schema migration applied"
            );
            applied_versions.push(migration.version);
        }

        let sqlite_version = conn.query_row("SELECT sqlite_version()", [], |row| row.get(0))?;
        let vector_profile_count: i64 = conn.query_row(
            "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
            [],
            |row| row.get(0),
        )?;
        Ok(BootstrapReport {
            sqlite_version,
            applied_versions,
            vector_profile_enabled: vector_profile_count > 0,
        })
    }
711
712 fn ensure_vector_regeneration_apply_metadata(conn: &Connection) -> Result<(), SchemaError> {
713 let mut stmt = conn.prepare("PRAGMA table_info(vector_embedding_contracts)")?;
714 let columns = stmt
715 .query_map([], |row| row.get::<_, String>(1))?
716 .collect::<Result<Vec<_>, _>>()?;
717 let has_applied_at = columns.iter().any(|column| column == "applied_at");
718 let has_snapshot_hash = columns.iter().any(|column| column == "snapshot_hash");
719
720 if !has_applied_at {
721 conn.execute(
722 "ALTER TABLE vector_embedding_contracts ADD COLUMN applied_at INTEGER NOT NULL DEFAULT 0",
723 [],
724 )?;
725 }
726 if !has_snapshot_hash {
727 conn.execute(
728 "ALTER TABLE vector_embedding_contracts ADD COLUMN snapshot_hash TEXT NOT NULL DEFAULT ''",
729 [],
730 )?;
731 }
732 conn.execute(
733 r"
734 UPDATE vector_embedding_contracts
735 SET
736 applied_at = CASE
737 WHEN applied_at = 0 THEN updated_at
738 ELSE applied_at
739 END,
740 snapshot_hash = CASE
741 WHEN snapshot_hash = '' THEN 'legacy'
742 ELSE snapshot_hash
743 END
744 ",
745 [],
746 )?;
747 Ok(())
748 }
749
750 fn ensure_vector_contract_format_version(conn: &Connection) -> Result<(), SchemaError> {
751 let mut stmt = conn.prepare("PRAGMA table_info(vector_embedding_contracts)")?;
752 let columns = stmt
753 .query_map([], |row| row.get::<_, String>(1))?
754 .collect::<Result<Vec<_>, _>>()?;
755 let has_contract_format_version = columns
756 .iter()
757 .any(|column| column == "contract_format_version");
758
759 if !has_contract_format_version {
760 conn.execute(
761 "ALTER TABLE vector_embedding_contracts ADD COLUMN contract_format_version INTEGER NOT NULL DEFAULT 1",
762 [],
763 )?;
764 }
765 conn.execute(
766 r"
767 UPDATE vector_embedding_contracts
768 SET contract_format_version = 1
769 WHERE contract_format_version = 0
770 ",
771 [],
772 )?;
773 Ok(())
774 }
775
776 fn ensure_provenance_metadata(conn: &Connection) -> Result<(), SchemaError> {
777 let mut stmt = conn.prepare("PRAGMA table_info(provenance_events)")?;
778 let columns = stmt
779 .query_map([], |row| row.get::<_, String>(1))?
780 .collect::<Result<Vec<_>, _>>()?;
781 let has_metadata_json = columns.iter().any(|column| column == "metadata_json");
782
783 if !has_metadata_json {
784 conn.execute(
785 "ALTER TABLE provenance_events ADD COLUMN metadata_json TEXT NOT NULL DEFAULT ''",
786 [],
787 )?;
788 }
789 Ok(())
790 }
791
792 fn ensure_operational_mutation_order(conn: &Connection) -> Result<(), SchemaError> {
793 let mut stmt = conn.prepare("PRAGMA table_info(operational_mutations)")?;
794 let columns = stmt
795 .query_map([], |row| row.get::<_, String>(1))?
796 .collect::<Result<Vec<_>, _>>()?;
797 let has_mutation_order = columns.iter().any(|column| column == "mutation_order");
798
799 if !has_mutation_order {
800 conn.execute(
801 "ALTER TABLE operational_mutations ADD COLUMN mutation_order INTEGER NOT NULL DEFAULT 0",
802 [],
803 )?;
804 }
805 conn.execute(
806 r"
807 UPDATE operational_mutations
808 SET mutation_order = rowid
809 WHERE mutation_order = 0
810 ",
811 [],
812 )?;
813 conn.execute(
814 r"
815 CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_order
816 ON operational_mutations(collection_name, record_key, mutation_order DESC)
817 ",
818 [],
819 )?;
820 Ok(())
821 }
822
823 fn ensure_node_access_metadata(conn: &Connection) -> Result<(), SchemaError> {
824 conn.execute_batch(
825 r"
826 CREATE TABLE IF NOT EXISTS node_access_metadata (
827 logical_id TEXT PRIMARY KEY,
828 last_accessed_at INTEGER NOT NULL,
829 updated_at INTEGER NOT NULL
830 );
831
832 CREATE INDEX IF NOT EXISTS idx_node_access_metadata_last_accessed
833 ON node_access_metadata(last_accessed_at DESC);
834 ",
835 )?;
836 Ok(())
837 }
838
839 fn ensure_operational_filter_contract(conn: &Connection) -> Result<(), SchemaError> {
840 let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
841 let columns = stmt
842 .query_map([], |row| row.get::<_, String>(1))?
843 .collect::<Result<Vec<_>, _>>()?;
844 let has_filter_fields_json = columns.iter().any(|column| column == "filter_fields_json");
845
846 if !has_filter_fields_json {
847 conn.execute(
848 "ALTER TABLE operational_collections ADD COLUMN filter_fields_json TEXT NOT NULL DEFAULT '[]'",
849 [],
850 )?;
851 }
852
853 conn.execute_batch(
854 r"
855 CREATE TABLE IF NOT EXISTS operational_filter_values (
856 mutation_id TEXT NOT NULL,
857 collection_name TEXT NOT NULL,
858 field_name TEXT NOT NULL,
859 string_value TEXT,
860 integer_value INTEGER,
861 PRIMARY KEY(mutation_id, field_name),
862 FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE,
863 FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
864 );
865
866 CREATE INDEX IF NOT EXISTS idx_operational_filter_values_text
867 ON operational_filter_values(collection_name, field_name, string_value, mutation_id);
868 CREATE INDEX IF NOT EXISTS idx_operational_filter_values_integer
869 ON operational_filter_values(collection_name, field_name, integer_value, mutation_id);
870 ",
871 )?;
872 Ok(())
873 }
874
875 fn ensure_operational_validation_contract(conn: &Connection) -> Result<(), SchemaError> {
876 let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
877 let columns = stmt
878 .query_map([], |row| row.get::<_, String>(1))?
879 .collect::<Result<Vec<_>, _>>()?;
880 let has_validation_json = columns.iter().any(|column| column == "validation_json");
881
882 if !has_validation_json {
883 conn.execute(
884 "ALTER TABLE operational_collections ADD COLUMN validation_json TEXT NOT NULL DEFAULT ''",
885 [],
886 )?;
887 }
888
889 Ok(())
890 }
891
892 fn ensure_operational_secondary_indexes(conn: &Connection) -> Result<(), SchemaError> {
893 let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
894 let columns = stmt
895 .query_map([], |row| row.get::<_, String>(1))?
896 .collect::<Result<Vec<_>, _>>()?;
897 let has_secondary_indexes_json = columns
898 .iter()
899 .any(|column| column == "secondary_indexes_json");
900
901 if !has_secondary_indexes_json {
902 conn.execute(
903 "ALTER TABLE operational_collections ADD COLUMN secondary_indexes_json TEXT NOT NULL DEFAULT '[]'",
904 [],
905 )?;
906 }
907
908 conn.execute_batch(
909 r"
910 CREATE TABLE IF NOT EXISTS operational_secondary_index_entries (
911 collection_name TEXT NOT NULL,
912 index_name TEXT NOT NULL,
913 subject_kind TEXT NOT NULL,
914 mutation_id TEXT NOT NULL DEFAULT '',
915 record_key TEXT NOT NULL DEFAULT '',
916 sort_timestamp INTEGER,
917 slot1_text TEXT,
918 slot1_integer INTEGER,
919 slot2_text TEXT,
920 slot2_integer INTEGER,
921 slot3_text TEXT,
922 slot3_integer INTEGER,
923 PRIMARY KEY(collection_name, index_name, subject_kind, mutation_id, record_key),
924 FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
925 FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE
926 );
927
928 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_text
929 ON operational_secondary_index_entries(
930 collection_name, index_name, subject_kind, slot1_text, sort_timestamp DESC, mutation_id, record_key
931 );
932 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_integer
933 ON operational_secondary_index_entries(
934 collection_name, index_name, subject_kind, slot1_integer, sort_timestamp DESC, mutation_id, record_key
935 );
936 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_text
937 ON operational_secondary_index_entries(
938 collection_name, index_name, subject_kind, slot1_text, slot2_text, slot3_text, sort_timestamp DESC, record_key
939 );
940 CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_integer
941 ON operational_secondary_index_entries(
942 collection_name, index_name, subject_kind, slot1_integer, slot2_integer, slot3_integer, sort_timestamp DESC, record_key
943 );
944 ",
945 )?;
946
947 Ok(())
948 }
949
950 fn ensure_operational_retention_runs(conn: &Connection) -> Result<(), SchemaError> {
951 conn.execute_batch(
952 r"
953 CREATE TABLE IF NOT EXISTS operational_retention_runs (
954 id TEXT PRIMARY KEY,
955 collection_name TEXT NOT NULL,
956 executed_at INTEGER NOT NULL,
957 action_kind TEXT NOT NULL,
958 dry_run INTEGER NOT NULL DEFAULT 0,
959 deleted_mutations INTEGER NOT NULL,
960 rows_remaining INTEGER NOT NULL,
961 metadata_json TEXT NOT NULL DEFAULT '',
962 FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
963 );
964
965 CREATE INDEX IF NOT EXISTS idx_operational_retention_runs_collection_time
966 ON operational_retention_runs(collection_name, executed_at DESC);
967 ",
968 )?;
969 Ok(())
970 }
971
972 fn ensure_external_content_columns(conn: &Connection) -> Result<(), SchemaError> {
973 let node_columns = Self::column_names(conn, "nodes")?;
974 if !node_columns.iter().any(|c| c == "content_ref") {
975 conn.execute("ALTER TABLE nodes ADD COLUMN content_ref TEXT", [])?;
976 }
977 conn.execute_batch(
978 r"
979 CREATE INDEX IF NOT EXISTS idx_nodes_content_ref
980 ON nodes(content_ref)
981 WHERE content_ref IS NOT NULL AND superseded_at IS NULL;
982 ",
983 )?;
984
985 let chunk_columns = Self::column_names(conn, "chunks")?;
986 if !chunk_columns.iter().any(|c| c == "content_hash") {
987 conn.execute("ALTER TABLE chunks ADD COLUMN content_hash TEXT", [])?;
988 }
989 Ok(())
990 }
991
992 fn ensure_unicode_porter_fts_tokenizers(conn: &Connection) -> Result<(), SchemaError> {
1010 conn.execute_batch(
1011 r"
1012 DROP TABLE IF EXISTS fts_nodes;
1013 CREATE VIRTUAL TABLE fts_nodes USING fts5(
1014 chunk_id UNINDEXED,
1015 node_logical_id UNINDEXED,
1016 kind UNINDEXED,
1017 text_content,
1018 tokenize = 'porter unicode61 remove_diacritics 2'
1019 );
1020
1021 DROP TABLE IF EXISTS fts_node_properties;
1022 CREATE VIRTUAL TABLE fts_node_properties USING fts5(
1023 node_logical_id UNINDEXED,
1024 kind UNINDEXED,
1025 text_content,
1026 tokenize = 'porter unicode61 remove_diacritics 2'
1027 );
1028
1029 INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
1030 SELECT c.id, n.logical_id, n.kind, c.text_content
1031 FROM chunks c
1032 JOIN nodes n
1033 ON n.logical_id = c.node_logical_id
1034 AND n.superseded_at IS NULL;
1035 ",
1036 )?;
1037 Ok(())
1038 }
1039
    /// Create the per-kind property-schema registry and the shared
    /// `fts_node_properties` FTS5 table (default tokenizer).
    ///
    /// Both statements use `IF NOT EXISTS`, so the migration is re-runnable.
    ///
    /// # Errors
    /// Returns [`SchemaError`] if the batch fails.
    fn ensure_fts_property_schemas(conn: &Connection) -> Result<(), SchemaError> {
        conn.execute_batch(
            r"
            CREATE TABLE IF NOT EXISTS fts_property_schemas (
                kind TEXT PRIMARY KEY,
                property_paths_json TEXT NOT NULL,
                separator TEXT NOT NULL DEFAULT ' ',
                format_version INTEGER NOT NULL DEFAULT 1,
                created_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER))
            );

            CREATE VIRTUAL TABLE IF NOT EXISTS fts_node_properties USING fts5(
                node_logical_id UNINDEXED,
                kind UNINDEXED,
                text_content
            );
            ",
        )?;
        Ok(())
    }
1060
1061 fn ensure_per_kind_fts_tables(conn: &Connection) -> Result<(), SchemaError> {
1062 let kinds: Vec<String> = {
1064 let mut stmt = conn.prepare("SELECT kind FROM fts_property_schemas")?;
1065 stmt.query_map([], |r| r.get::<_, String>(0))?
1066 .collect::<Result<Vec<_>, _>>()?
1067 };
1068
1069 for kind in &kinds {
1070 let table_name = fts_kind_table_name(kind);
1071 let ddl = format!(
1072 "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING fts5(\
1073 node_logical_id UNINDEXED, \
1074 text_content, \
1075 tokenize = '{DEFAULT_FTS_TOKENIZER}'\
1076 )"
1077 );
1078 conn.execute_batch(&ddl)?;
1079
1080 conn.execute(
1082 "INSERT OR IGNORE INTO fts_property_rebuild_state \
1083 (kind, schema_id, state, rows_done, started_at, is_first_registration) \
1084 SELECT ?1, rowid, 'PENDING', 0, CAST(strftime('%s','now') AS INTEGER) * 1000, 0 \
1085 FROM fts_property_schemas WHERE kind = ?1",
1086 rusqlite::params![kind],
1087 )?;
1088 }
1089
1090 Ok(())
1093 }
1094
1095 fn ensure_staging_columns_json(conn: &Connection) -> Result<(), SchemaError> {
1096 let column_exists: bool = {
1098 let mut stmt = conn.prepare("PRAGMA table_info(fts_property_rebuild_staging)")?;
1099 let names: Vec<String> = stmt
1100 .query_map([], |r| r.get::<_, String>(1))?
1101 .collect::<Result<Vec<_>, _>>()?;
1102 names.iter().any(|n| n == "columns_json")
1103 };
1104
1105 if !column_exists {
1106 conn.execute_batch(
1107 "ALTER TABLE fts_property_rebuild_staging ADD COLUMN columns_json TEXT;",
1108 )?;
1109 }
1110
1111 Ok(())
1112 }
1113
    /// Create `projection_table_registry` and (re)register every known FTS and
    /// vec projection table under the version-2 naming scheme, recording the
    /// legacy (version-1) name each table may have migrated from. FTS kinds
    /// additionally get their rebuild-state row upserted back to PENDING
    /// unless a rebuild is already in flight.
    ///
    /// # Errors
    /// Returns [`SchemaError`] if any statement fails.
    fn ensure_projection_table_registry(conn: &Connection) -> Result<(), SchemaError> {
        conn.execute_batch(
            r"
            CREATE TABLE IF NOT EXISTS projection_table_registry (
                kind TEXT NOT NULL,
                facet TEXT NOT NULL CHECK (facet IN ('fts', 'vec')),
                table_name TEXT NOT NULL UNIQUE,
                naming_version INTEGER NOT NULL,
                created_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER)),
                migrated_from TEXT,
                PRIMARY KEY (kind, facet)
            );
            ",
        )?;

        // NOTE(review): `.optional()` only converts QueryReturnedNoRows to
        // None; if `fts_property_schemas` did not exist, `prepare` would fail
        // with SqliteFailure and still propagate as Err. Confirm earlier
        // migrations guarantee the table exists before this runs.
        let fts_kinds: Vec<String> = conn
            .prepare("SELECT kind FROM fts_property_schemas")
            .optional()?
            .map(|mut stmt| {
                stmt.query_map([], |r| r.get::<_, String>(0))?
                    .collect::<Result<Vec<_>, _>>()
            })
            .transpose()?
            .unwrap_or_default();

        for kind in &fts_kinds {
            let table_name = fts_kind_table_name(kind);
            let legacy = legacy_fts_kind_table_name(kind);
            // Upsert the version-2 name, remembering the legacy name it
            // replaces so migration code can locate old tables.
            conn.execute(
                "INSERT INTO projection_table_registry \
                 (kind, facet, table_name, naming_version, migrated_from) \
                 VALUES (?1, 'fts', ?2, 2, ?3) \
                 ON CONFLICT(kind, facet) DO UPDATE SET \
                     table_name = excluded.table_name, \
                     naming_version = excluded.naming_version, \
                     migrated_from = excluded.migrated_from",
                rusqlite::params![kind, table_name, legacy],
            )?;
            // Reset rebuild bookkeeping to PENDING (fresh millisecond start
            // timestamp, cleared progress/error) — but never clobber a rebuild
            // that is currently BUILDING or SWAPPING.
            conn.execute(
                "INSERT INTO fts_property_rebuild_state \
                 (kind, schema_id, state, rows_done, started_at, is_first_registration) \
                 SELECT ?1, rowid, 'PENDING', 0, CAST(strftime('%s','now') AS INTEGER) * 1000, 0 \
                 FROM fts_property_schemas WHERE kind = ?1 \
                 ON CONFLICT(kind) DO UPDATE SET \
                     schema_id = excluded.schema_id, \
                     state = CASE \
                         WHEN fts_property_rebuild_state.state IN ('BUILDING', 'SWAPPING') \
                         THEN fts_property_rebuild_state.state \
                         ELSE 'PENDING' \
                     END, \
                     rows_total = NULL, \
                     rows_done = 0, \
                     started_at = excluded.started_at, \
                     last_progress_at = NULL, \
                     error_message = NULL",
                rusqlite::params![kind],
            )?;
        }

        // Same registration pass for vec projections; no rebuild-state table
        // is involved for the 'vec' facet.
        let vec_kinds: Vec<String> = conn
            .prepare("SELECT kind FROM projection_profiles WHERE facet = 'vec'")
            .optional()?
            .map(|mut stmt| {
                stmt.query_map([], |r| r.get::<_, String>(0))?
                    .collect::<Result<Vec<_>, _>>()
            })
            .transpose()?
            .unwrap_or_default();

        for kind in &vec_kinds {
            let table_name = vec_kind_table_name(kind);
            let legacy = legacy_vec_kind_table_name(kind);
            conn.execute(
                "INSERT INTO projection_table_registry \
                 (kind, facet, table_name, naming_version, migrated_from) \
                 VALUES (?1, 'vec', ?2, 2, ?3) \
                 ON CONFLICT(kind, facet) DO UPDATE SET \
                     table_name = excluded.table_name, \
                     naming_version = excluded.naming_version, \
                     migrated_from = excluded.migrated_from",
                rusqlite::params![kind, table_name, legacy],
            )?;
        }

        Ok(())
    }
1200
1201 fn column_names(conn: &Connection, table: &str) -> Result<Vec<String>, SchemaError> {
1202 let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
1203 let names = stmt
1204 .query_map([], |row| row.get::<_, String>(1))?
1205 .collect::<Result<Vec<_>, _>>()?;
1206 Ok(names)
1207 }
1208
1209 #[must_use]
1210 pub fn current_version(&self) -> SchemaVersion {
1211 self.migrations()
1212 .last()
1213 .map_or(SchemaVersion(0), |migration| migration.version)
1214 }
1215
    /// The full, ordered list of migrations compiled into this build.
    #[must_use]
    pub fn migrations(&self) -> &'static [Migration] {
        MIGRATIONS
    }
1220
    /// Apply per-connection pragmas for writer connections: enforced foreign
    /// keys, WAL journaling with `synchronous = NORMAL`, a 5 s busy timeout,
    /// in-memory temp storage, a ~3 GB mmap window, and the WAL capped at
    /// 512 MiB (536870912 bytes; asserted by the `journal_size_limit_is_set`
    /// test).
    ///
    /// # Errors
    /// Returns [`SchemaError`] if any pragma fails to execute.
    pub fn initialize_connection(&self, conn: &Connection) -> Result<(), SchemaError> {
        conn.execute_batch(
            r"
            PRAGMA foreign_keys = ON;
            PRAGMA journal_mode = WAL;
            PRAGMA synchronous = NORMAL;
            PRAGMA busy_timeout = 5000;
            PRAGMA temp_store = MEMORY;
            PRAGMA mmap_size = 3000000000;
            PRAGMA journal_size_limit = 536870912;
            ",
        )?;
        Ok(())
    }
1240
    /// Apply per-connection pragmas for read-only connections.
    ///
    /// Same foreign-key, timeout, temp-store, and mmap settings as
    /// [`Self::initialize_connection`], but without touching journal mode,
    /// synchronous level, or journal size — those are left to the writer.
    ///
    /// # Errors
    /// Returns [`SchemaError`] if any pragma fails to execute.
    pub fn initialize_reader_connection(&self, conn: &Connection) -> Result<(), SchemaError> {
        conn.execute_batch(
            r"
            PRAGMA foreign_keys = ON;
            PRAGMA busy_timeout = 5000;
            PRAGMA temp_store = MEMORY;
            PRAGMA mmap_size = 3000000000;
            ",
        )?;
        Ok(())
    }
1262
    /// Create (or reuse) a `vec0` virtual table named `table_name` with an
    /// embedding column of `dimension` floats, then upsert the profile row in
    /// `vector_profiles` with `enabled = 1`.
    ///
    /// NOTE(review): `table_name` is caller-supplied and interpolated straight
    /// into DDL — callers must pass trusted identifiers only.
    ///
    /// # Errors
    /// Returns [`SchemaError`] if the DDL or registration fails, or if
    /// `dimension` does not fit in an `i64`.
    #[cfg(feature = "sqlite-vec")]
    pub fn ensure_vector_profile(
        &self,
        conn: &Connection,
        profile: &str,
        table_name: &str,
        dimension: usize,
    ) -> Result<(), SchemaError> {
        conn.execute_batch(&format!(
            "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING vec0(\
             chunk_id TEXT PRIMARY KEY,\
             embedding float[{dimension}]\
             )"
        ))?;
        // NOTE(review): reuses rusqlite's ToSqlConversionFailure to report an
        // out-of-range dimension; a dedicated SchemaError variant would be
        // clearer.
        let dimension_i64 = i64::try_from(dimension).map_err(|_| {
            SchemaError::Sqlite(rusqlite::Error::ToSqlConversionFailure(
                format!("vector dimension {dimension} does not fit in i64").into(),
            ))
        })?;
        conn.execute(
            "INSERT OR REPLACE INTO vector_profiles \
             (profile, table_name, dimension, enabled) VALUES (?1, ?2, ?3, 1)",
            rusqlite::params![profile, table_name, dimension_i64],
        )?;
        Ok(())
    }
1302
    /// Stub used when the `sqlite-vec` feature is disabled: reports the
    /// missing capability without touching the database.
    ///
    /// # Errors
    /// Always returns [`SchemaError::MissingCapability`].
    #[cfg(not(feature = "sqlite-vec"))]
    pub fn ensure_vector_profile(
        &self,
        _conn: &Connection,
        _profile: &str,
        _table_name: &str,
        _dimension: usize,
    ) -> Result<(), SchemaError> {
        Err(SchemaError::MissingCapability("sqlite-vec"))
    }
1317
    /// Create (or reuse) the per-kind `vec0` table for `kind`, then register
    /// it both in `vector_profiles` (enabled) and as a `'vec'` facet in
    /// `projection_profiles`, refreshing `config_json` and `active_at` on
    /// conflict.
    ///
    /// # Errors
    /// Returns [`SchemaError`] if the DDL or either registration fails, or if
    /// `dimension` does not fit in an `i64`.
    #[cfg(feature = "sqlite-vec")]
    pub fn ensure_vec_kind_profile(
        &self,
        conn: &Connection,
        kind: &str,
        dimension: usize,
    ) -> Result<(), SchemaError> {
        // Derived name (sanitized slug + hash) — safe to interpolate into DDL.
        let table_name = vec_kind_table_name(kind);
        conn.execute_batch(&format!(
            "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING vec0(\
             chunk_id TEXT PRIMARY KEY,\
             embedding float[{dimension}]\
             )"
        ))?;
        // NOTE(review): reuses ToSqlConversionFailure for the overflow case;
        // see the same pattern in `ensure_vector_profile`.
        let dimension_i64 = i64::try_from(dimension).map_err(|_| {
            SchemaError::Sqlite(rusqlite::Error::ToSqlConversionFailure(
                format!("vector dimension {dimension} does not fit in i64").into(),
            ))
        })?;
        conn.execute(
            "INSERT OR REPLACE INTO vector_profiles \
             (profile, table_name, dimension, enabled) VALUES (?1, ?2, ?3, 1)",
            rusqlite::params![kind, table_name, dimension_i64],
        )?;
        // Hand-built JSON stays valid because table_name is restricted to
        // [a-z0-9_] plus a hex suffix by the naming helpers.
        let config_json =
            format!(r#"{{"table_name":"{table_name}","dimensions":{dimension_i64}}}"#);
        conn.execute(
            "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at) \
             VALUES (?1, 'vec', ?2, CAST(strftime('%s','now') AS INTEGER), CAST(strftime('%s','now') AS INTEGER)) \
             ON CONFLICT(kind, facet) DO UPDATE SET \
                 config_json = ?2, \
                 active_at = CAST(strftime('%s','now') AS INTEGER)",
            rusqlite::params![kind, config_json],
        )?;
        Ok(())
    }
1368
    /// Stub used when the `sqlite-vec` feature is disabled: reports the
    /// missing capability without touching the database.
    ///
    /// # Errors
    /// Always returns [`SchemaError::MissingCapability`].
    #[cfg(not(feature = "sqlite-vec"))]
    pub fn ensure_vec_kind_profile(
        &self,
        _conn: &Connection,
        _kind: &str,
        _dimension: usize,
    ) -> Result<(), SchemaError> {
        Err(SchemaError::MissingCapability("sqlite-vec"))
    }
1382
    /// Create the migration-history table if it does not exist.
    ///
    /// Bootstrap records one row per applied migration here (see the
    /// downgrade test, which inserts a future version into this table).
    ///
    /// # Errors
    /// Returns [`SchemaError`] if the DDL fails.
    fn ensure_metadata_tables(conn: &Connection) -> Result<(), SchemaError> {
        conn.execute_batch(
            r"
            CREATE TABLE IF NOT EXISTS fathom_schema_migrations (
                version INTEGER PRIMARY KEY,
                description TEXT NOT NULL,
                applied_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER))
            );
            ",
        )?;
        Ok(())
    }
1400}
1401
1402pub const DEFAULT_FTS_TOKENIZER: &str = "porter unicode61 remove_diacritics 2";
1404
/// Current (naming version 2) FTS5 projection table name for `kind`:
/// `fts_props_<slug>_<hash10>`, always within SQLite's identifier budget.
#[must_use]
pub fn fts_kind_table_name(kind: &str) -> String {
    projection_table_name("fts_props", kind)
}
1413
1414#[must_use]
1416pub fn legacy_fts_kind_table_name(kind: &str) -> String {
1417 let slug = sanitize_kind_slug(kind);
1418 let prefixed = format!("fts_props_{slug}");
1419 if prefixed.len() <= 63 {
1420 prefixed
1421 } else {
1422 let hex = sha256_hex(kind);
1423 let hex_suffix = &hex[..7];
1424 let slug_truncated = if slug.len() > 45 { &slug[..45] } else { &slug };
1425 format!("fts_props_{slug_truncated}_{hex_suffix}")
1426 }
1427}
1428
/// Current (naming version 2) vector table name for `kind`:
/// `vec_<slug>_<hash10>`, always within SQLite's identifier budget.
#[must_use]
pub fn vec_kind_table_name(kind: &str) -> String {
    projection_table_name("vec", kind)
}
1437
1438#[must_use]
1440pub fn legacy_vec_kind_table_name(kind: &str) -> String {
1441 let slug = sanitize_kind_slug(kind);
1442 format!("vec_{slug}")
1443}
1444
1445fn projection_table_name(prefix: &str, kind: &str) -> String {
1446 const MAX_IDENTIFIER_LEN: usize = 63;
1447 const HASH_LEN: usize = 10;
1448 let slug = sanitize_kind_slug(kind);
1449 let hex = sha256_hex(kind);
1450 let suffix = &hex[..HASH_LEN];
1451 let budget = MAX_IDENTIFIER_LEN
1452 .saturating_sub(prefix.len())
1453 .saturating_sub(2)
1454 .saturating_sub(HASH_LEN)
1455 .max(1);
1456 let slug = if slug.len() > budget {
1457 &slug[..budget]
1458 } else {
1459 &slug
1460 };
1461 format!("{prefix}_{slug}_{suffix}")
1462}
1463
/// Lowercase `kind` and reduce it to `[a-z0-9_]`: every run of other
/// characters collapses to a single underscore, leading/trailing underscores
/// are trimmed, and an empty result falls back to `"kind"`.
fn sanitize_kind_slug(kind: &str) -> String {
    let mut out = String::with_capacity(kind.len());
    for ch in kind.to_lowercase().chars() {
        if ch.is_ascii_alphanumeric() {
            out.push(ch);
        } else if !out.ends_with('_') {
            // First char of a non-alphanumeric run: emit one separator.
            out.push('_');
        }
    }
    let trimmed = out.trim_matches('_');
    if trimmed.is_empty() {
        "kind".to_owned()
    } else {
        trimmed.to_owned()
    }
}
1486
1487fn sha256_hex(kind: &str) -> String {
1488 let hash = Sha256::digest(kind.as_bytes());
1489 let mut hex = String::with_capacity(hash.len() * 2);
1490 for b in &hash {
1491 use std::fmt::Write as _;
1492 let _ = write!(hex, "{b:02x}");
1493 }
1494 hex
1495}
1496
/// Derive an FTS5 column name from a JSON path.
///
/// Strips a leading `$.` (or bare `$`), lowercases, keeps `[a-z0-9_]`
/// verbatim (explicit underscores are preserved, not collapsed), collapses
/// each run of other characters to a single underscore, and trims trailing
/// underscores. Recursive paths get an `_all` suffix.
#[must_use]
pub fn fts_column_name(path: &str, is_recursive: bool) -> String {
    let body = path
        .strip_prefix("$.")
        .or_else(|| path.strip_prefix('$'))
        .unwrap_or(path);

    let lowered = body.to_lowercase();
    let mut name = String::with_capacity(lowered.len());
    let mut last_was_separator = false;
    for ch in lowered.chars() {
        if ch.is_ascii_alphanumeric() || ch == '_' {
            name.push(ch);
            last_was_separator = ch == '_';
        } else if !last_was_separator {
            // First char of a run of disallowed chars: emit one separator.
            name.push('_');
            last_was_separator = true;
        }
    }

    let base = name.trim_end_matches('_');
    if is_recursive {
        format!("{base}_all")
    } else {
        base.to_owned()
    }
}
1541
/// Look up the configured FTS tokenizer for `kind` in `projection_profiles`,
/// falling back to [`DEFAULT_FTS_TOKENIZER`].
///
/// The fallback covers: no profile row for the kind, a NULL or empty
/// `tokenizer` key in `config_json`, and any `SqliteFailure` from the query
/// (e.g. the `projection_profiles` table not existing yet).
///
/// # Errors
/// Propagates any other rusqlite error as [`SchemaError::Sqlite`].
pub fn resolve_fts_tokenizer(conn: &Connection, kind: &str) -> Result<String, SchemaError> {
    let result = conn
        .query_row(
            "SELECT json_extract(config_json, '$.tokenizer') FROM projection_profiles WHERE kind = ?1 AND facet = 'fts'",
            [kind],
            |row| row.get::<_, Option<String>>(0),
        )
        .optional();

    match result {
        // Profile row present with a non-empty tokenizer string.
        Ok(Some(Some(tok))) if !tok.is_empty() => Ok(tok),
        // No row, or a NULL/empty tokenizer value.
        Ok(_) => Ok(DEFAULT_FTS_TOKENIZER.to_owned()),
        // NOTE(review): this arm swallows *every* SqliteFailure (I/O errors,
        // corruption, ...), not just "no such table" — consider narrowing the
        // match if silent fallback is not intended for those cases.
        Err(rusqlite::Error::SqliteFailure(_, _)) => {
            Ok(DEFAULT_FTS_TOKENIZER.to_owned())
        }
        Err(e) => Err(SchemaError::Sqlite(e)),
    }
}
1571
1572#[cfg(test)]
1573#[allow(clippy::expect_used)]
1574mod tests {
1575 use rusqlite::Connection;
1576
1577 use super::SchemaManager;
1578
    // Fresh DB: every compiled migration applies and the canonical `nodes`
    // table exists afterwards.
    #[test]
    fn bootstrap_applies_initial_schema() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();

        let report = manager.bootstrap(&conn).expect("bootstrap report");

        assert_eq!(
            report.applied_versions.len(),
            manager.current_version().0 as usize
        );
        assert!(report.sqlite_version.starts_with('3'));
        let table_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'nodes'",
                [],
                |row| row.get(0),
            )
            .expect("nodes table exists");
        assert_eq!(table_count, 1);
    }
1600
    // Bootstrap alone must not flip the vector-profile flag in the report.
    #[test]
    fn vector_profile_not_enabled_without_feature() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        let report = manager.bootstrap(&conn).expect("bootstrap");
        assert!(
            !report.vector_profile_enabled,
            "vector_profile_enabled must be false on a fresh bootstrap"
        );
    }
1613
    // No enabled row appears in vector_profiles until ensure_vector_profile runs.
    #[test]
    fn vector_profile_skipped_when_dimension_absent() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager.bootstrap(&conn).expect("bootstrap");

        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            count, 0,
            "no enabled profile without calling ensure_vector_profile"
        );
    }
1633
    // The report flag must mirror the enabled-row count in the database, not a
    // hard-coded value.
    #[test]
    fn bootstrap_report_reflects_actual_vector_state() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        let report = manager.bootstrap(&conn).expect("bootstrap");

        let db_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            report.vector_profile_enabled,
            db_count > 0,
            "BootstrapReport.vector_profile_enabled must match actual DB state"
        );
    }
1654
    // Seeds pre-migration versions of provenance_events and
    // vector_embedding_contracts by hand, then checks that bootstrap's
    // hardening migrations backfill contract_format_version and add the
    // metadata_json column.
    #[test]
    fn bootstrap_backfills_vector_contract_format_and_provenance_metadata_columns() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        conn.execute_batch(
            r#"
            CREATE TABLE provenance_events (
                id TEXT PRIMARY KEY,
                event_type TEXT NOT NULL,
                subject TEXT NOT NULL,
                source_ref TEXT,
                created_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER))
            );
            CREATE TABLE vector_embedding_contracts (
                profile TEXT PRIMARY KEY,
                table_name TEXT NOT NULL,
                model_identity TEXT NOT NULL,
                model_version TEXT NOT NULL,
                dimension INTEGER NOT NULL,
                normalization_policy TEXT NOT NULL,
                chunking_policy TEXT NOT NULL,
                preprocessing_policy TEXT NOT NULL,
                generator_command_json TEXT NOT NULL,
                updated_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER)),
                applied_at INTEGER NOT NULL DEFAULT 0,
                snapshot_hash TEXT NOT NULL DEFAULT ''
            );
            INSERT INTO vector_embedding_contracts (
                profile,
                table_name,
                model_identity,
                model_version,
                dimension,
                normalization_policy,
                chunking_policy,
                preprocessing_policy,
                generator_command_json,
                updated_at,
                applied_at,
                snapshot_hash
            ) VALUES (
                'default',
                'vec_nodes_active',
                'legacy-model',
                '0.9.0',
                4,
                'l2',
                'per_chunk',
                'trim',
                '["/bin/echo"]',
                100,
                100,
                'legacy'
            );
            "#,
        )
        .expect("seed legacy schema");
        let manager = SchemaManager::new();

        let report = manager.bootstrap(&conn).expect("bootstrap");

        assert!(
            report.applied_versions.iter().any(|version| version.0 >= 5),
            "bootstrap should apply hardening migrations"
        );
        let format_version: i64 = conn
            .query_row(
                "SELECT contract_format_version FROM vector_embedding_contracts WHERE profile = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("contract_format_version");
        assert_eq!(format_version, 1);
        let metadata_column_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM pragma_table_info('provenance_events') WHERE name = 'metadata_json'",
                [],
                |row| row.get(0),
            )
            .expect("metadata_json column count");
        assert_eq!(metadata_column_count, 1);
    }
1736
    // Bootstrap must create all three operational-store tables, including the
    // mutation_order column on operational_mutations.
    #[test]
    fn bootstrap_creates_operational_store_tables() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();

        manager.bootstrap(&conn).expect("bootstrap");

        for table in [
            "operational_collections",
            "operational_mutations",
            "operational_current",
        ] {
            let count: i64 = conn
                .query_row(
                    "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = ?1",
                    [table],
                    |row| row.get(0),
                )
                .expect("table existence");
            assert_eq!(count, 1, "{table} should exist after bootstrap");
        }
        let mutation_order_columns: i64 = conn
            .query_row(
                "SELECT count(*) FROM pragma_table_info('operational_mutations') WHERE name = 'mutation_order'",
                [],
                |row| row.get(0),
            )
            .expect("mutation_order column exists");
        assert_eq!(mutation_order_columns, 1);
    }
1767
    // A second bootstrap applies nothing new and leaves the tables in place.
    #[test]
    fn bootstrap_is_idempotent_with_operational_store_tables() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();

        manager.bootstrap(&conn).expect("first bootstrap");
        let report = manager.bootstrap(&conn).expect("second bootstrap");

        assert!(
            report.applied_versions.is_empty(),
            "second bootstrap should apply no new migrations"
        );
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'operational_collections'",
                [],
                |row| row.get(0),
            )
            .expect("operational_collections table exists");
        assert_eq!(count, 1);
    }
1789
    // Simulates a "recovered" database: operational tables exist but the
    // migration-history table does not. Bootstrap must still record the
    // ordering-hardening migration (version 8), backfill mutation_order, and
    // create the ordering index.
    #[test]
    fn bootstrap_is_idempotent_for_recovered_operational_tables_without_migration_history() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        conn.execute_batch(
            r#"
            CREATE TABLE operational_collections (
                name TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                schema_json TEXT NOT NULL,
                retention_json TEXT NOT NULL,
                format_version INTEGER NOT NULL DEFAULT 1,
                created_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER)),
                disabled_at INTEGER
            );

            CREATE TABLE operational_mutations (
                id TEXT PRIMARY KEY,
                collection_name TEXT NOT NULL,
                record_key TEXT NOT NULL,
                op_kind TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                source_ref TEXT,
                created_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER)),
                mutation_order INTEGER NOT NULL DEFAULT 0,
                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
            );

            CREATE TABLE operational_current (
                collection_name TEXT NOT NULL,
                record_key TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                updated_at INTEGER NOT NULL,
                last_mutation_id TEXT NOT NULL,
                PRIMARY KEY(collection_name, record_key),
                FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
                FOREIGN KEY(last_mutation_id) REFERENCES operational_mutations(id)
            );

            INSERT INTO operational_collections (name, kind, schema_json, retention_json)
            VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}');
            INSERT INTO operational_mutations (
                id, collection_name, record_key, op_kind, payload_json, created_at, mutation_order
            ) VALUES (
                'mut-1', 'audit_log', 'entry-1', 'put', '{"ok":true}', 10, 0
            );
            "#,
        )
        .expect("seed recovered operational tables");

        let manager = SchemaManager::new();
        let report = manager
            .bootstrap(&conn)
            .expect("bootstrap recovered schema");

        assert!(
            report.applied_versions.iter().any(|version| version.0 == 8),
            "bootstrap should record operational mutation ordering hardening"
        );
        let mutation_order: i64 = conn
            .query_row(
                "SELECT mutation_order FROM operational_mutations WHERE id = 'mut-1'",
                [],
                |row| row.get(0),
            )
            .expect("mutation_order");
        assert_ne!(
            mutation_order, 0,
            "bootstrap should backfill recovered operational rows"
        );
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type = 'index' AND name = 'idx_operational_mutations_collection_key_order'",
                [],
                |row| row.get(0),
            )
            .expect("ordering index exists");
        assert_eq!(count, 1);
    }
1868
    // Recovered schema without filter-contract columns: migrations 10 and 11
    // must add filter_fields_json (default "[]"), validation_json (default "")
    // and the operational_filter_values table.
    #[test]
    fn bootstrap_adds_operational_filter_contract_and_index_table() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        conn.execute_batch(
            r#"
            CREATE TABLE operational_collections (
                name TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                schema_json TEXT NOT NULL,
                retention_json TEXT NOT NULL,
                format_version INTEGER NOT NULL DEFAULT 1,
                created_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER)),
                disabled_at INTEGER
            );

            CREATE TABLE operational_mutations (
                id TEXT PRIMARY KEY,
                collection_name TEXT NOT NULL,
                record_key TEXT NOT NULL,
                op_kind TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                source_ref TEXT,
                created_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER)),
                mutation_order INTEGER NOT NULL DEFAULT 1,
                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
            );

            INSERT INTO operational_collections (name, kind, schema_json, retention_json)
            VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}');
            "#,
        )
        .expect("seed recovered operational schema");

        let manager = SchemaManager::new();
        let report = manager
            .bootstrap(&conn)
            .expect("bootstrap recovered schema");

        assert!(
            report
                .applied_versions
                .iter()
                .any(|version| version.0 == 10),
            "bootstrap should record operational filtered read migration"
        );
        assert!(
            report
                .applied_versions
                .iter()
                .any(|version| version.0 == 11),
            "bootstrap should record operational validation migration"
        );
        let filter_fields_json: String = conn
            .query_row(
                "SELECT filter_fields_json FROM operational_collections WHERE name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("filter_fields_json added");
        assert_eq!(filter_fields_json, "[]");
        let validation_json: String = conn
            .query_row(
                "SELECT validation_json FROM operational_collections WHERE name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("validation_json added");
        assert_eq!(validation_json, "");
        let table_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'operational_filter_values'",
                [],
                |row| row.get(0),
            )
            .expect("filter table exists");
        assert_eq!(table_count, 1);
    }
1946
    // Drops only the migration-history table after a full bootstrap, then
    // rebootstraps: history rows 10/11 are re-recorded, but the already-present
    // filter/validation columns must not be added twice (which would fail).
    #[test]
    fn bootstrap_reapplies_migration_history_without_readding_filter_contract_columns() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager.bootstrap(&conn).expect("initial bootstrap");

        conn.execute("DROP TABLE fathom_schema_migrations", [])
            .expect("drop migration history");
        SchemaManager::ensure_metadata_tables(&conn).expect("recreate migration metadata");

        let report = manager
            .bootstrap(&conn)
            .expect("rebootstrap existing schema");

        assert!(
            report
                .applied_versions
                .iter()
                .any(|version| version.0 == 10),
            "rebootstrap should re-record migration 10"
        );
        assert!(
            report
                .applied_versions
                .iter()
                .any(|version| version.0 == 11),
            "rebootstrap should re-record migration 11"
        );
        let filter_fields_json: String = conn
            .query_row(
                "SELECT filter_fields_json FROM operational_collections LIMIT 1",
                [],
                |row| row.get(0),
            )
            .unwrap_or_else(|_| "[]".to_string());
        assert_eq!(filter_fields_json, "[]");
        let validation_json: String = conn
            .query_row(
                "SELECT validation_json FROM operational_collections LIMIT 1",
                [],
                |row| row.get(0),
            )
            .unwrap_or_default();
        assert_eq!(validation_json, "");
    }
1992
    // A recorded version newer than the compiled migrations must abort
    // bootstrap with VersionMismatch instead of silently downgrading.
    #[test]
    fn downgrade_detected_returns_version_mismatch() {
        use crate::SchemaError;

        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager.bootstrap(&conn).expect("initial bootstrap");

        conn.execute(
            "INSERT INTO fathom_schema_migrations (version, description) VALUES (?1, ?2)",
            (999_i64, "future migration"),
        )
        .expect("insert future version");

        let err = manager
            .bootstrap(&conn)
            .expect_err("should fail on downgrade");
        assert!(
            matches!(
                err,
                SchemaError::VersionMismatch {
                    database_version: 999,
                    ..
                }
            ),
            "expected VersionMismatch with database_version 999, got: {err}"
        );
    }
2021
    // initialize_connection must cap the journal at 512 MiB (536870912 bytes).
    #[test]
    fn journal_size_limit_is_set() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager
            .initialize_connection(&conn)
            .expect("initialize_connection");

        let limit: i64 = conn
            .query_row("PRAGMA journal_size_limit", [], |row| row.get(0))
            .expect("journal_size_limit pragma");
        assert_eq!(limit, 536_870_912);
    }
2035
    // With sqlite-vec compiled in, ensure_vector_profile must create the vec0
    // table and enable exactly one profile row.
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn vector_profile_created_when_feature_enabled() {
        // SAFETY: registers sqlite-vec as an auto-extension before the
        // connection below is opened; the transmute casts the extension's init
        // function pointer to the C callback type sqlite3_auto_extension
        // expects.
        unsafe {
            #[allow(clippy::missing_transmute_annotations)]
            rusqlite::ffi::sqlite3_auto_extension(Some(std::mem::transmute(
                sqlite_vec::sqlite3_vec_init as *const (),
            )));
        }
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager.bootstrap(&conn).expect("bootstrap");

        manager
            .ensure_vector_profile(&conn, "default", "vec_nodes_active", 128)
            .expect("ensure_vector_profile");

        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            count, 1,
            "vector profile must be enabled after ensure_vector_profile"
        );

        // Counting rows proves the virtual table itself was created.
        let _: i64 = conn
            .query_row("SELECT count(*) FROM vec_nodes_active", [], |row| {
                row.get(0)
            })
            .expect("vec_nodes_active table must exist after ensure_vector_profile");
    }
2074
    // Simple kind: lowercased slug plus hash suffix, within the 63-byte budget.
    #[test]
    fn fts_kind_table_name_simple_kind() {
        let result = super::fts_kind_table_name("WMKnowledgeObject");
        assert!(result.starts_with("fts_props_wmknowledgeobject_"));
        assert!(result.len() <= 63);
    }
2083
    // Second simple kind exercising the same slug + hash shape.
    #[test]
    fn fts_kind_table_name_another_simple_kind() {
        let result = super::fts_kind_table_name("WMExecutionRecord");
        assert!(result.starts_with("fts_props_wmexecutionrecord_"));
        assert!(result.len() <= 63);
    }
2090
    // Dashes and dots in the kind become single underscores in the slug.
    #[test]
    fn fts_kind_table_name_with_separator_chars() {
        let result = super::fts_kind_table_name("MyKind-With.Dots");
        assert!(result.starts_with("fts_props_mykind_with_dots_"));
        assert!(result.len() <= 63);
    }
2097
    // Runs of non-alphanumeric characters collapse to one underscore.
    #[test]
    fn fts_kind_table_name_collapses_consecutive_underscores() {
        let result = super::fts_kind_table_name("Kind__Double__Underscores");
        assert!(result.starts_with("fts_props_kind_double_underscores_"));
        assert!(result.len() <= 63);
    }
2104
    // Overlong kinds: name stays within the 63-byte identifier budget and
    // ends in a 10-hex-digit hash fragment.
    #[test]
    fn fts_kind_table_name_long_kind_truncates_with_hash() {
        let long_kind = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA";
        let result = super::fts_kind_table_name(long_kind);
        assert!(
            result.len() <= 63,
            "result must fit sqlite identifier budget"
        );
        assert!(
            result.starts_with("fts_props_"),
            "result must start with fts_props_"
        );
        let last_underscore = result.rfind('_').expect("must contain underscore");
        let hex_suffix = &result[last_underscore + 1..];
        assert_eq!(hex_suffix.len(), 10, "hex suffix must be 10 chars");
        assert!(
            hex_suffix.chars().all(|c| c.is_ascii_hexdigit()),
            "hex suffix must be hex digits"
        );
    }
2127
    // Minimal prefix check for a plain one-word kind.
    #[test]
    fn fts_kind_table_name_testkind() {
        let result = super::fts_kind_table_name("TestKind");
        assert!(result.starts_with("fts_props_testkind_"));
    }
2133
    // Version-2 names disambiguate sanitization collisions via the hash
    // suffix; the legacy scheme (slug only) collides by design.
    #[test]
    fn projection_table_names_do_not_collide_after_sanitization() {
        assert_ne!(
            super::fts_kind_table_name("Foo-Bar"),
            super::fts_kind_table_name("Foo_Bar")
        );
        assert_ne!(
            super::vec_kind_table_name("Foo-Bar"),
            super::vec_kind_table_name("Foo_Bar")
        );
        assert_eq!(
            super::legacy_fts_kind_table_name("Foo-Bar"),
            super::legacy_fts_kind_table_name("Foo_Bar")
        );
    }
2149
    // `$.` prefix is stripped from a plain field path.
    #[test]
    fn fts_column_name_simple_field() {
        assert_eq!(super::fts_column_name("$.title", false), "title");
    }
2156
    // Path separators become underscores in nested paths.
    #[test]
    fn fts_column_name_nested_path() {
        assert_eq!(
            super::fts_column_name("$.payload.content", false),
            "payload_content"
        );
    }
2164
    // Recursive paths get the `_all` suffix.
    #[test]
    fn fts_column_name_recursive() {
        assert_eq!(super::fts_column_name("$.payload", true), "payload_all");
    }
2169
    // Dashes and brackets collapse to underscores; trailing ones are trimmed.
    #[test]
    fn fts_column_name_special_chars() {
        assert_eq!(
            super::fts_column_name("$.some-field[0]", false),
            "some_field_0"
        );
    }
2177
    // Missing projection_profiles table (SqliteFailure on prepare) falls back
    // to the default tokenizer instead of erroring.
    #[test]
    fn resolve_fts_tokenizer_returns_default_when_no_table() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let result = super::resolve_fts_tokenizer(&conn, "MyKind").expect("should not error");
        assert_eq!(result, super::DEFAULT_FTS_TOKENIZER);
    }
2187
    // A configured tokenizer in config_json wins for its kind; unknown kinds
    // still get the default.
    #[test]
    fn resolve_fts_tokenizer_returns_configured_value() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        conn.execute_batch(
            "CREATE TABLE projection_profiles (
                kind TEXT NOT NULL,
                facet TEXT NOT NULL,
                config_json TEXT NOT NULL,
                active_at INTEGER,
                created_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER)),
                PRIMARY KEY (kind, facet)
            );
            INSERT INTO projection_profiles (kind, facet, config_json)
            VALUES ('MyKind', 'fts', '{\"tokenizer\":\"trigram\"}');",
        )
        .expect("setup table");

        let result = super::resolve_fts_tokenizer(&conn, "MyKind").expect("should not error");
        assert_eq!(result, "trigram");

        let default_result =
            super::resolve_fts_tokenizer(&conn, "OtherKind").expect("should not error");
        assert_eq!(default_result, super::DEFAULT_FTS_TOKENIZER);
    }
2212
2213 #[test]
2216 fn migration_20_creates_projection_profiles_table() {
2217 let conn = Connection::open_in_memory().expect("in-memory sqlite");
2218 let manager = SchemaManager::new();
2219 manager.bootstrap(&conn).expect("bootstrap");
2220
2221 let table_exists: i64 = conn
2222 .query_row(
2223 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='projection_profiles'",
2224 [],
2225 |row| row.get(0),
2226 )
2227 .expect("query sqlite_master");
2228 assert_eq!(table_exists, 1, "projection_profiles table must exist");
2229
2230 conn.execute_batch(
2232 "INSERT INTO projection_profiles (kind, facet, config_json)
2233 VALUES ('TestKind', 'fts', '{}')",
2234 )
2235 .expect("insert row to verify columns");
2236 let (kind, facet, config_json): (String, String, String) = conn
2237 .query_row(
2238 "SELECT kind, facet, config_json FROM projection_profiles WHERE kind='TestKind'",
2239 [],
2240 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2241 )
2242 .expect("select columns");
2243 assert_eq!(kind, "TestKind");
2244 assert_eq!(facet, "fts");
2245 assert_eq!(config_json, "{}");
2246 }
2247
2248 #[test]
2249 fn migration_20_primary_key_is_kind_facet() {
2250 let conn = Connection::open_in_memory().expect("in-memory sqlite");
2251 let manager = SchemaManager::new();
2252 manager.bootstrap(&conn).expect("bootstrap");
2253
2254 conn.execute_batch(
2255 "INSERT INTO projection_profiles (kind, facet, config_json)
2256 VALUES ('MyKind', 'fts', '{\"tokenizer\":\"porter\"}');",
2257 )
2258 .expect("first insert");
2259
2260 let result = conn.execute_batch(
2262 "INSERT INTO projection_profiles (kind, facet, config_json)
2263 VALUES ('MyKind', 'fts', '{\"tokenizer\":\"trigram\"}');",
2264 );
2265 assert!(
2266 result.is_err(),
2267 "duplicate (kind, facet) must violate PRIMARY KEY"
2268 );
2269 }
2270
2271 #[test]
2272 fn migration_20_resolve_fts_tokenizer_end_to_end() {
2273 let conn = Connection::open_in_memory().expect("in-memory sqlite");
2274 let manager = SchemaManager::new();
2275 manager.bootstrap(&conn).expect("bootstrap");
2276
2277 conn.execute_batch(
2278 "INSERT INTO projection_profiles (kind, facet, config_json)
2279 VALUES ('MyKind', 'fts', '{\"tokenizer\":\"trigram\"}');",
2280 )
2281 .expect("insert profile");
2282
2283 let result = super::resolve_fts_tokenizer(&conn, "MyKind").expect("should not error");
2284 assert_eq!(result, "trigram");
2285
2286 let default_result =
2287 super::resolve_fts_tokenizer(&conn, "UnknownKind").expect("should not error");
2288 assert_eq!(default_result, super::DEFAULT_FTS_TOKENIZER);
2289 }
2290
2291 #[test]
2292 fn migration_21_creates_per_kind_fts_table_and_pending_row() {
2293 let conn = Connection::open_in_memory().expect("in-memory sqlite");
2294 let manager = SchemaManager::new();
2295
2296 manager.bootstrap(&conn).expect("first bootstrap");
2304
2305 conn.execute_batch(
2306 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator, format_version) \
2307 VALUES ('TestKind', '[]', ',', 1)",
2308 )
2309 .expect("insert kind");
2310
2311 SchemaManager::ensure_per_kind_fts_tables(&conn).expect("ensure_per_kind_fts_tables");
2314
2315 let table = super::fts_kind_table_name("TestKind");
2316 let count: i64 = conn
2317 .query_row(
2318 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1",
2319 [table.as_str()],
2320 |r| r.get(0),
2321 )
2322 .expect("count fts table");
2323 assert_eq!(count, 1, "{table} virtual table should be created");
2324
2325 let state: String = conn
2327 .query_row(
2328 "SELECT state FROM fts_property_rebuild_state WHERE kind='TestKind'",
2329 [],
2330 |r| r.get(0),
2331 )
2332 .expect("rebuild state row");
2333 assert_eq!(state, "PENDING");
2334 }
2335
2336 #[test]
2337 fn migration_22_adds_columns_json_to_staging_table() {
2338 let conn = Connection::open_in_memory().expect("in-memory sqlite");
2339 let manager = SchemaManager::new();
2340 manager.bootstrap(&conn).expect("bootstrap");
2341
2342 let col_count: i64 = conn
2343 .query_row(
2344 "SELECT count(*) FROM pragma_table_info('fts_property_rebuild_staging') WHERE name='columns_json'",
2345 [],
2346 |r| r.get(0),
2347 )
2348 .expect("pragma_table_info");
2349 assert_eq!(
2350 col_count, 1,
2351 "columns_json column must exist after migration 22"
2352 );
2353 }
2354
2355 #[test]
2358 fn vec_kind_table_name_simple_kind() {
2359 let result = super::vec_kind_table_name("WMKnowledgeObject");
2360 assert!(result.starts_with("vec_wmknowledgeobject_"));
2361 assert!(result.len() <= 63);
2362 }
2363
2364 #[test]
2365 fn vec_kind_table_name_another_kind() {
2366 let result = super::vec_kind_table_name("MyKind");
2367 assert!(result.starts_with("vec_mykind_"));
2368 assert!(result.len() <= 63);
2369 }
2370
2371 #[test]
2372 fn vec_kind_table_name_with_separator_chars() {
2373 let result = super::vec_kind_table_name("MyKind-With.Dots");
2374 assert!(result.starts_with("vec_mykind_with_dots_"));
2375 assert!(result.len() <= 63);
2376 }
2377
2378 #[test]
2379 fn vec_kind_table_name_collapses_consecutive_underscores() {
2380 let result = super::vec_kind_table_name("Kind__Double__Underscores");
2381 assert!(result.starts_with("vec_kind_double_underscores_"));
2382 assert!(result.len() <= 63);
2383 }
2384
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn per_kind_vec_table_created_when_vec_profile_registered() {
        // Register sqlite-vec as an auto-extension so every connection opened
        // afterwards (including the in-memory one below) has the vec0 module.
        // SAFETY: sqlite3_auto_extension expects an extension entry point;
        // the transmute reinterprets sqlite_vec's init function pointer to
        // that signature. NOTE(review): this registration is process-global
        // and is not unregistered — assumed acceptable for tests; confirm it
        // cannot interfere with other tests in the same process.
        unsafe {
            #[allow(clippy::missing_transmute_annotations)]
            rusqlite::ffi::sqlite3_auto_extension(Some(std::mem::transmute(
                sqlite_vec::sqlite3_vec_init as *const (),
            )));
        }
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager.bootstrap(&conn).expect("bootstrap");

        // Registering a vec profile for a kind with a 128-dim embedding.
        manager
            .ensure_vec_kind_profile(&conn, "MyKind", 128)
            .expect("ensure_vec_kind_profile");

        // The per-kind virtual table must have been created.
        let expected_table = super::vec_kind_table_name("MyKind");
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name=?1",
                rusqlite::params![&expected_table],
                |r| r.get(0),
            )
            .expect("query sqlite_master");
        assert_eq!(count, 1, "{expected_table} virtual table must be created");

        // The registration must also be recorded in projection_profiles.
        let pp_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM projection_profiles WHERE kind='MyKind' AND facet='vec'",
                [],
                |r| r.get(0),
            )
            .expect("query projection_profiles");
        assert_eq!(
            pp_count, 1,
            "projection_profiles row must exist for (MyKind, vec)"
        );

        // The legacy global table must not be created as a side effect.
        let old_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='vec_nodes_active'",
                [],
                |r| r.get(0),
            )
            .expect("query sqlite_master");
        assert_eq!(
            old_count, 0,
            "vec_nodes_active must NOT be created for per-kind registration"
        );
    }
2445
2446 #[test]
2448 fn migration_23_drops_global_fts_node_properties_table() {
2449 let conn = Connection::open_in_memory().expect("in-memory sqlite");
2450 let manager = SchemaManager::new();
2451 manager.bootstrap(&conn).expect("bootstrap");
2452
2453 let count: i64 = conn
2455 .query_row(
2456 "SELECT count(*) FROM sqlite_master \
2457 WHERE type='table' AND name='fts_node_properties'",
2458 [],
2459 |r| r.get(0),
2460 )
2461 .expect("check sqlite_master");
2462 assert_eq!(
2463 count, 0,
2464 "fts_node_properties must be dropped by migration 23"
2465 );
2466 }
2467}