//! `fathomdb_schema/bootstrap.rs`
//!
//! Schema bootstrap: the ordered migration list and the [`SchemaManager`]
//! that applies pending migrations to a SQLite connection.
use rusqlite::{Connection, OptionalExtension};
use sha2::{Digest, Sha256};

use crate::{Migration, SchemaError, SchemaVersion};
5
6static MIGRATIONS: &[Migration] = &[
7    Migration::new(
8        SchemaVersion(1),
9        "initial canonical schema and runtime tables",
10        r"
11                CREATE TABLE IF NOT EXISTS nodes (
12                    row_id TEXT PRIMARY KEY,
13                    logical_id TEXT NOT NULL,
14                    kind TEXT NOT NULL,
15                    properties BLOB NOT NULL,
16                    created_at INTEGER NOT NULL,
17                    superseded_at INTEGER,
18                    source_ref TEXT,
19                    confidence REAL
20                );
21
22                CREATE UNIQUE INDEX IF NOT EXISTS idx_nodes_active_logical_id
23                    ON nodes(logical_id)
24                    WHERE superseded_at IS NULL;
25                CREATE INDEX IF NOT EXISTS idx_nodes_kind_active
26                    ON nodes(kind, superseded_at);
27                CREATE INDEX IF NOT EXISTS idx_nodes_source_ref
28                    ON nodes(source_ref);
29
30                CREATE TABLE IF NOT EXISTS edges (
31                    row_id TEXT PRIMARY KEY,
32                    logical_id TEXT NOT NULL,
33                    source_logical_id TEXT NOT NULL,
34                    target_logical_id TEXT NOT NULL,
35                    kind TEXT NOT NULL,
36                    properties BLOB NOT NULL,
37                    created_at INTEGER NOT NULL,
38                    superseded_at INTEGER,
39                    source_ref TEXT,
40                    confidence REAL
41                );
42
43                CREATE UNIQUE INDEX IF NOT EXISTS idx_edges_active_logical_id
44                    ON edges(logical_id)
45                    WHERE superseded_at IS NULL;
46                CREATE INDEX IF NOT EXISTS idx_edges_source_active
47                    ON edges(source_logical_id, kind, superseded_at);
48                CREATE INDEX IF NOT EXISTS idx_edges_target_active
49                    ON edges(target_logical_id, kind, superseded_at);
50                CREATE INDEX IF NOT EXISTS idx_edges_source_ref
51                    ON edges(source_ref);
52
53                CREATE TABLE IF NOT EXISTS chunks (
54                    id TEXT PRIMARY KEY,
55                    node_logical_id TEXT NOT NULL,
56                    text_content TEXT NOT NULL,
57                    byte_start INTEGER,
58                    byte_end INTEGER,
59                    created_at INTEGER NOT NULL
60                );
61
62                CREATE INDEX IF NOT EXISTS idx_chunks_node_logical_id
63                    ON chunks(node_logical_id);
64
65                CREATE VIRTUAL TABLE IF NOT EXISTS fts_nodes USING fts5(
66                    chunk_id UNINDEXED,
67                    node_logical_id UNINDEXED,
68                    kind UNINDEXED,
69                    text_content
70                );
71
72                CREATE TABLE IF NOT EXISTS vector_profiles (
73                    profile TEXT PRIMARY KEY,
74                    table_name TEXT NOT NULL,
75                    dimension INTEGER NOT NULL,
76                    enabled INTEGER NOT NULL DEFAULT 0
77                );
78
79                CREATE TABLE IF NOT EXISTS runs (
80                    id TEXT PRIMARY KEY,
81                    kind TEXT NOT NULL,
82                    status TEXT NOT NULL,
83                    properties BLOB NOT NULL,
84                    created_at INTEGER NOT NULL,
85                    completed_at INTEGER,
86                    superseded_at INTEGER,
87                    source_ref TEXT
88                );
89
90                CREATE TABLE IF NOT EXISTS steps (
91                    id TEXT PRIMARY KEY,
92                    run_id TEXT NOT NULL,
93                    kind TEXT NOT NULL,
94                    status TEXT NOT NULL,
95                    properties BLOB NOT NULL,
96                    created_at INTEGER NOT NULL,
97                    completed_at INTEGER,
98                    superseded_at INTEGER,
99                    source_ref TEXT,
100                    FOREIGN KEY(run_id) REFERENCES runs(id)
101                );
102
103                CREATE TABLE IF NOT EXISTS actions (
104                    id TEXT PRIMARY KEY,
105                    step_id TEXT NOT NULL,
106                    kind TEXT NOT NULL,
107                    status TEXT NOT NULL,
108                    properties BLOB NOT NULL,
109                    created_at INTEGER NOT NULL,
110                    completed_at INTEGER,
111                    superseded_at INTEGER,
112                    source_ref TEXT,
113                    FOREIGN KEY(step_id) REFERENCES steps(id)
114                );
115
116                CREATE INDEX IF NOT EXISTS idx_runs_source_ref
117                    ON runs(source_ref);
118                CREATE INDEX IF NOT EXISTS idx_steps_source_ref
119                    ON steps(source_ref);
120                CREATE INDEX IF NOT EXISTS idx_actions_source_ref
121                    ON actions(source_ref);
122                ",
123    ),
124    Migration::new(
125        SchemaVersion(2),
126        "durable audit trail: provenance_events table",
127        r"
128                CREATE TABLE IF NOT EXISTS provenance_events (
129                    id         TEXT PRIMARY KEY,
130                    event_type TEXT NOT NULL,
131                    subject    TEXT NOT NULL,
132                    source_ref TEXT,
133                    created_at INTEGER NOT NULL DEFAULT (unixepoch())
134                );
135                CREATE INDEX IF NOT EXISTS idx_provenance_events_subject
136                    ON provenance_events (subject, event_type);
137                ",
138    ),
139    Migration::new(
140        SchemaVersion(3),
141        "vector regeneration contracts",
142        r"
143                CREATE TABLE IF NOT EXISTS vector_embedding_contracts (
144                    profile TEXT PRIMARY KEY,
145                    table_name TEXT NOT NULL,
146                    model_identity TEXT NOT NULL,
147                    model_version TEXT NOT NULL,
148                    dimension INTEGER NOT NULL,
149                    normalization_policy TEXT NOT NULL,
150                    chunking_policy TEXT NOT NULL,
151                    preprocessing_policy TEXT NOT NULL,
152                    generator_command_json TEXT NOT NULL,
153                    updated_at INTEGER NOT NULL DEFAULT (unixepoch())
154                );
155                ",
156    ),
157    Migration::new(
158        SchemaVersion(4),
159        "vector regeneration apply metadata",
160        r"
161                ALTER TABLE vector_embedding_contracts
162                    ADD COLUMN applied_at INTEGER NOT NULL DEFAULT 0;
163                ALTER TABLE vector_embedding_contracts
164                    ADD COLUMN snapshot_hash TEXT NOT NULL DEFAULT '';
165                UPDATE vector_embedding_contracts
166                SET
167                    applied_at = CASE
168                        WHEN applied_at = 0 THEN updated_at
169                        ELSE applied_at
170                    END,
171                    snapshot_hash = CASE
172                        WHEN snapshot_hash = '' THEN 'legacy'
173                        ELSE snapshot_hash
174                    END;
175                ",
176    ),
177    Migration::new(
178        SchemaVersion(5),
179        "vector regeneration contract format version",
180        r"
181                ALTER TABLE vector_embedding_contracts
182                    ADD COLUMN contract_format_version INTEGER NOT NULL DEFAULT 1;
183                UPDATE vector_embedding_contracts
184                SET contract_format_version = 1
185                WHERE contract_format_version = 0;
186                ",
187    ),
188    Migration::new(
189        SchemaVersion(6),
190        "provenance metadata payloads",
191        r"
192                ALTER TABLE provenance_events
193                    ADD COLUMN metadata_json TEXT NOT NULL DEFAULT '';
194                ",
195    ),
196    Migration::new(
197        SchemaVersion(7),
198        "operational store canonical and derived tables",
199        r"
200                CREATE TABLE IF NOT EXISTS operational_collections (
201                    name TEXT PRIMARY KEY,
202                    kind TEXT NOT NULL,
203                    schema_json TEXT NOT NULL,
204                    retention_json TEXT NOT NULL,
205                    format_version INTEGER NOT NULL DEFAULT 1,
206                    created_at INTEGER NOT NULL DEFAULT (unixepoch()),
207                    disabled_at INTEGER
208                );
209
210                CREATE INDEX IF NOT EXISTS idx_operational_collections_kind
211                    ON operational_collections(kind, disabled_at);
212
213                CREATE TABLE IF NOT EXISTS operational_mutations (
214                    id TEXT PRIMARY KEY,
215                    collection_name TEXT NOT NULL,
216                    record_key TEXT NOT NULL,
217                    op_kind TEXT NOT NULL,
218                    payload_json TEXT NOT NULL,
219                    source_ref TEXT,
220                    created_at INTEGER NOT NULL DEFAULT (unixepoch()),
221                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
222                );
223
224                CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_created
225                    ON operational_mutations(collection_name, record_key, created_at DESC, id DESC);
226                CREATE INDEX IF NOT EXISTS idx_operational_mutations_source_ref
227                    ON operational_mutations(source_ref);
228
229                CREATE TABLE IF NOT EXISTS operational_current (
230                    collection_name TEXT NOT NULL,
231                    record_key TEXT NOT NULL,
232                    payload_json TEXT NOT NULL,
233                    updated_at INTEGER NOT NULL,
234                    last_mutation_id TEXT NOT NULL,
235                    PRIMARY KEY(collection_name, record_key),
236                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
237                    FOREIGN KEY(last_mutation_id) REFERENCES operational_mutations(id)
238                );
239
240                CREATE INDEX IF NOT EXISTS idx_operational_current_collection_updated
241                    ON operational_current(collection_name, updated_at DESC);
242                ",
243    ),
244    Migration::new(
245        SchemaVersion(8),
246        "operational mutation ordering hardening",
247        r"
248                ALTER TABLE operational_mutations
249                    ADD COLUMN mutation_order INTEGER NOT NULL DEFAULT 0;
250                UPDATE operational_mutations
251                SET mutation_order = rowid
252                WHERE mutation_order = 0;
253                CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_order
254                    ON operational_mutations(collection_name, record_key, mutation_order DESC);
255                ",
256    ),
257    Migration::new(
258        SchemaVersion(9),
259        "node last_accessed metadata",
260        r"
261                CREATE TABLE IF NOT EXISTS node_access_metadata (
262                    logical_id TEXT PRIMARY KEY,
263                    last_accessed_at INTEGER NOT NULL,
264                    updated_at INTEGER NOT NULL
265                );
266
267                CREATE INDEX IF NOT EXISTS idx_node_access_metadata_last_accessed
268                    ON node_access_metadata(last_accessed_at DESC);
269                ",
270    ),
271    Migration::new(
272        SchemaVersion(10),
273        "operational filtered read contracts and extracted values",
274        r"
275                ALTER TABLE operational_collections
276                    ADD COLUMN filter_fields_json TEXT NOT NULL DEFAULT '[]';
277
278                CREATE TABLE IF NOT EXISTS operational_filter_values (
279                    mutation_id TEXT NOT NULL,
280                    collection_name TEXT NOT NULL,
281                    field_name TEXT NOT NULL,
282                    string_value TEXT,
283                    integer_value INTEGER,
284                    PRIMARY KEY(mutation_id, field_name),
285                    FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE,
286                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
287                );
288
289                CREATE INDEX IF NOT EXISTS idx_operational_filter_values_text
290                    ON operational_filter_values(collection_name, field_name, string_value, mutation_id);
291                CREATE INDEX IF NOT EXISTS idx_operational_filter_values_integer
292                    ON operational_filter_values(collection_name, field_name, integer_value, mutation_id);
293                ",
294    ),
295    Migration::new(
296        SchemaVersion(11),
297        "operational payload validation contracts",
298        r"
299                ALTER TABLE operational_collections
300                    ADD COLUMN validation_json TEXT NOT NULL DEFAULT '';
301                ",
302    ),
303    Migration::new(
304        SchemaVersion(12),
305        "operational secondary indexes",
306        r"
307                ALTER TABLE operational_collections
308                    ADD COLUMN secondary_indexes_json TEXT NOT NULL DEFAULT '[]';
309
310                CREATE TABLE IF NOT EXISTS operational_secondary_index_entries (
311                    collection_name TEXT NOT NULL,
312                    index_name TEXT NOT NULL,
313                    subject_kind TEXT NOT NULL,
314                    mutation_id TEXT NOT NULL DEFAULT '',
315                    record_key TEXT NOT NULL DEFAULT '',
316                    sort_timestamp INTEGER,
317                    slot1_text TEXT,
318                    slot1_integer INTEGER,
319                    slot2_text TEXT,
320                    slot2_integer INTEGER,
321                    slot3_text TEXT,
322                    slot3_integer INTEGER,
323                    PRIMARY KEY(collection_name, index_name, subject_kind, mutation_id, record_key),
324                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
325                    FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE
326                );
327
328                CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_text
329                    ON operational_secondary_index_entries(
330                        collection_name, index_name, subject_kind, slot1_text, sort_timestamp DESC, mutation_id, record_key
331                    );
332                CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_integer
333                    ON operational_secondary_index_entries(
334                        collection_name, index_name, subject_kind, slot1_integer, sort_timestamp DESC, mutation_id, record_key
335                    );
336                CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_text
337                    ON operational_secondary_index_entries(
338                        collection_name, index_name, subject_kind, slot1_text, slot2_text, slot3_text, sort_timestamp DESC, record_key
339                    );
340                CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_integer
341                    ON operational_secondary_index_entries(
342                        collection_name, index_name, subject_kind, slot1_integer, slot2_integer, slot3_integer, sort_timestamp DESC, record_key
343                );
344                ",
345    ),
346    Migration::new(
347        SchemaVersion(13),
348        "operational retention run metadata",
349        r"
350                CREATE TABLE IF NOT EXISTS operational_retention_runs (
351                    id TEXT PRIMARY KEY,
352                    collection_name TEXT NOT NULL,
353                    executed_at INTEGER NOT NULL,
354                    action_kind TEXT NOT NULL,
355                    dry_run INTEGER NOT NULL DEFAULT 0,
356                    deleted_mutations INTEGER NOT NULL,
357                    rows_remaining INTEGER NOT NULL,
358                    metadata_json TEXT NOT NULL DEFAULT '',
359                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
360                );
361
362                CREATE INDEX IF NOT EXISTS idx_operational_retention_runs_collection_time
363                    ON operational_retention_runs(collection_name, executed_at DESC);
364                ",
365    ),
366    Migration::new(
367        SchemaVersion(14),
368        "external content object columns",
369        r"
370                ALTER TABLE nodes ADD COLUMN content_ref TEXT;
371
372                CREATE INDEX IF NOT EXISTS idx_nodes_content_ref
373                    ON nodes(content_ref)
374                    WHERE content_ref IS NOT NULL AND superseded_at IS NULL;
375
376                ALTER TABLE chunks ADD COLUMN content_hash TEXT;
377                ",
378    ),
379    Migration::new(
380        SchemaVersion(15),
381        "FTS property projection schemas",
382        r"
383                CREATE TABLE IF NOT EXISTS fts_property_schemas (
384                    kind TEXT PRIMARY KEY,
385                    property_paths_json TEXT NOT NULL,
386                    separator TEXT NOT NULL DEFAULT ' ',
387                    format_version INTEGER NOT NULL DEFAULT 1,
388                    created_at INTEGER NOT NULL DEFAULT (unixepoch())
389                );
390
391                CREATE VIRTUAL TABLE IF NOT EXISTS fts_node_properties USING fts5(
392                    node_logical_id UNINDEXED,
393                    kind UNINDEXED,
394                    text_content
395                );
396                ",
397    ),
398    Migration::new(
399        SchemaVersion(16),
400        "rebuild fts_nodes and fts_node_properties on porter+unicode61 tokenizer",
401        // DDL applied by `ensure_unicode_porter_fts_tokenizers`; the hook
402        // drops, recreates, and rebuilds both tables from canonical state.
403        // The FTS5 chained tokenizer syntax requires the wrapper (porter)
404        // before the base tokenizer, so the applied `tokenize=` clause is
405        // `'porter unicode61 remove_diacritics 2'`.
406        "",
407    ),
408    Migration::new(
409        SchemaVersion(17),
410        "fts property position-map sidecar for recursive extraction",
411        // Sidecar position map for property FTS. The recursive extraction
412        // walk in the engine emits one row per scalar leaf contributing to a
413        // given node's property FTS blob, carrying the half-open byte range
414        // `[start_offset, end_offset)` within the blob and the JSON-path of
415        // the originating leaf. Phase 5 uses this to attribute tokens back
416        // to their source leaves.
417        //
418        // The existing `fts_property_schemas.property_paths_json` column is
419        // reused unchanged at the DDL level; the engine-side JSON decoder
420        // tolerates both the legacy shape (bare strings = scalar) and the
421        // new shape (objects with `mode` = `scalar`|`recursive`, optional
422        // `exclude_paths`). Backwards compatibility is guaranteed because a
423        // bare JSON array of strings still deserialises into scalar-mode
424        // entries.
425        r"
426                CREATE TABLE IF NOT EXISTS fts_node_property_positions (
427                    node_logical_id TEXT NOT NULL,
428                    kind TEXT NOT NULL,
429                    start_offset INTEGER NOT NULL,
430                    end_offset INTEGER NOT NULL,
431                    leaf_path TEXT NOT NULL
432                );
433
434                CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_node
435                    ON fts_node_property_positions(node_logical_id, kind);
436
437                CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_kind
438                    ON fts_node_property_positions(kind);
439                ",
440    ),
441    Migration::new(
442        SchemaVersion(18),
443        "add UNIQUE constraint on fts_node_property_positions (node_logical_id, kind, start_offset)",
444        // P4-P2-4: the v17 sidecar DDL did not enforce uniqueness of the
445        // `(node_logical_id, kind, start_offset)` tuple, so a buggy rebuild
446        // path could silently double-insert a leaf and break attribution
447        // lookups. Drop and recreate the table with the UNIQUE constraint,
448        // preserving the existing indexes. The DDL leaves the table empty,
449        // which the open-time rebuild guard in `ExecutionCoordinator::open`
450        // detects (empty positions + recursive schemas present) and
451        // repopulates from canonical state on the next open. The rebuild
452        // path is idempotent and safe to run unconditionally.
453        //
454        // `fts_node_property_positions` is a regular SQLite table, not an
455        // FTS5 virtual table, so UNIQUE constraints are supported.
456        r"
457                DROP TABLE IF EXISTS fts_node_property_positions;
458
459                CREATE TABLE fts_node_property_positions (
460                    node_logical_id TEXT NOT NULL,
461                    kind TEXT NOT NULL,
462                    start_offset INTEGER NOT NULL,
463                    end_offset INTEGER NOT NULL,
464                    leaf_path TEXT NOT NULL,
465                    UNIQUE(node_logical_id, kind, start_offset)
466                );
467
468                CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_node
469                    ON fts_node_property_positions(node_logical_id, kind);
470
471                CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_kind
472                    ON fts_node_property_positions(kind);
473                ",
474    ),
475    Migration::new(
476        SchemaVersion(19),
477        "async property-FTS rebuild staging and state tables",
478        r"
479                CREATE TABLE IF NOT EXISTS fts_property_rebuild_staging (
480                    kind TEXT NOT NULL,
481                    node_logical_id TEXT NOT NULL,
482                    text_content TEXT NOT NULL,
483                    positions_blob BLOB,
484                    PRIMARY KEY (kind, node_logical_id)
485                );
486
487                CREATE TABLE IF NOT EXISTS fts_property_rebuild_state (
488                    kind TEXT PRIMARY KEY,
489                    schema_id INTEGER NOT NULL,
490                    state TEXT NOT NULL,
491                    rows_total INTEGER,
492                    rows_done INTEGER NOT NULL DEFAULT 0,
493                    started_at INTEGER NOT NULL,
494                    last_progress_at INTEGER,
495                    error_message TEXT,
496                    is_first_registration INTEGER NOT NULL DEFAULT 0
497                );
498                ",
499    ),
500    Migration::new(
501        SchemaVersion(20),
502        "projection_profiles table for per-kind FTS tokenizer configuration",
503        r"CREATE TABLE IF NOT EXISTS projection_profiles (
504            kind        TEXT    NOT NULL,
505            facet       TEXT    NOT NULL,
506            config_json TEXT    NOT NULL,
507            active_at   INTEGER,
508            created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
509            PRIMARY KEY (kind, facet)
510        );",
511    ),
512    Migration::new(
513        SchemaVersion(21),
514        "per-kind FTS5 tables replacing fts_node_properties",
515        "",
516    ),
517    Migration::new(
518        SchemaVersion(22),
519        "add columns_json to fts_property_rebuild_staging for multi-column rebuild support",
520        "ALTER TABLE fts_property_rebuild_staging ADD COLUMN columns_json TEXT;",
521    ),
522    Migration::new(
523        SchemaVersion(23),
524        "drop global fts_node_properties table (replaced by per-kind fts_props_<kind> tables)",
525        "DROP TABLE IF EXISTS fts_node_properties;",
526    ),
527];
528
529#[derive(Clone, Debug, PartialEq, Eq)]
530pub struct BootstrapReport {
531    pub sqlite_version: String,
532    pub applied_versions: Vec<SchemaVersion>,
533    pub vector_profile_enabled: bool,
534}
535
/// Stateless manager that applies the migration list to a SQLite connection
/// via [`SchemaManager::bootstrap`].
#[derive(Clone, Debug, Default)]
pub struct SchemaManager;
538
impl SchemaManager {
    /// Create a new (stateless) schema manager.
    #[must_use]
    pub fn new() -> Self {
        Self
    }
544
545    /// Bootstrap the database schema, applying any pending migrations.
546    ///
547    /// # Errors
548    ///
549    /// Returns [`SchemaError`] if any migration or metadata-table SQL fails,
550    /// or [`SchemaError::VersionMismatch`] if the database has been migrated
551    /// to a version newer than this engine supports.
552    pub fn bootstrap(&self, conn: &Connection) -> Result<BootstrapReport, SchemaError> {
553        self.initialize_connection(conn)?;
554        Self::ensure_metadata_tables(conn)?;
555
556        // Downgrade protection
557        let max_applied: u32 = conn.query_row(
558            "SELECT COALESCE(MAX(version), 0) FROM fathom_schema_migrations",
559            [],
560            |row| row.get(0),
561        )?;
562        let engine_version = self.current_version().0;
563        trace_info!(
564            current_version = max_applied,
565            engine_version,
566            "schema bootstrap: version check"
567        );
568        if max_applied > engine_version {
569            trace_error!(
570                database_version = max_applied,
571                engine_version,
572                "schema version mismatch: database is newer than engine"
573            );
574            return Err(SchemaError::VersionMismatch {
575                database_version: max_applied,
576                engine_version,
577            });
578        }
579
580        let mut applied_versions = Vec::new();
581        for migration in self.migrations() {
582            let already_applied = conn
583                .query_row(
584                    "SELECT 1 FROM fathom_schema_migrations WHERE version = ?1",
585                    [i64::from(migration.version.0)],
586                    |row| row.get::<_, i64>(0),
587                )
588                .optional()?
589                .is_some();
590
591            if already_applied {
592                continue;
593            }
594
595            let tx = conn.unchecked_transaction()?;
596            match migration.version {
597                SchemaVersion(4) => Self::ensure_vector_regeneration_apply_metadata(&tx)?,
598                SchemaVersion(5) => Self::ensure_vector_contract_format_version(&tx)?,
599                SchemaVersion(6) => Self::ensure_provenance_metadata(&tx)?,
600                SchemaVersion(8) => Self::ensure_operational_mutation_order(&tx)?,
601                SchemaVersion(9) => Self::ensure_node_access_metadata(&tx)?,
602                SchemaVersion(10) => Self::ensure_operational_filter_contract(&tx)?,
603                SchemaVersion(11) => Self::ensure_operational_validation_contract(&tx)?,
604                SchemaVersion(12) => Self::ensure_operational_secondary_indexes(&tx)?,
605                SchemaVersion(13) => Self::ensure_operational_retention_runs(&tx)?,
606                SchemaVersion(14) => Self::ensure_external_content_columns(&tx)?,
607                SchemaVersion(15) => Self::ensure_fts_property_schemas(&tx)?,
608                SchemaVersion(16) => Self::ensure_unicode_porter_fts_tokenizers(&tx)?,
609                SchemaVersion(21) => Self::ensure_per_kind_fts_tables(&tx)?,
610                SchemaVersion(22) => Self::ensure_staging_columns_json(&tx)?,
611                _ => tx.execute_batch(migration.sql)?,
612            }
613            tx.execute(
614                "INSERT INTO fathom_schema_migrations (version, description) VALUES (?1, ?2)",
615                (i64::from(migration.version.0), migration.description),
616            )?;
617            tx.commit()?;
618            trace_info!(
619                version = migration.version.0,
620                description = migration.description,
621                "schema migration applied"
622            );
623            applied_versions.push(migration.version);
624        }
625
626        let sqlite_version = conn.query_row("SELECT sqlite_version()", [], |row| row.get(0))?;
627        let vector_profile_count: i64 = conn.query_row(
628            "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
629            [],
630            |row| row.get(0),
631        )?;
632        Ok(BootstrapReport {
633            sqlite_version,
634            applied_versions,
635            vector_profile_enabled: vector_profile_count > 0,
636        })
637    }
638
639    fn ensure_vector_regeneration_apply_metadata(conn: &Connection) -> Result<(), SchemaError> {
640        let mut stmt = conn.prepare("PRAGMA table_info(vector_embedding_contracts)")?;
641        let columns = stmt
642            .query_map([], |row| row.get::<_, String>(1))?
643            .collect::<Result<Vec<_>, _>>()?;
644        let has_applied_at = columns.iter().any(|column| column == "applied_at");
645        let has_snapshot_hash = columns.iter().any(|column| column == "snapshot_hash");
646
647        if !has_applied_at {
648            conn.execute(
649                "ALTER TABLE vector_embedding_contracts ADD COLUMN applied_at INTEGER NOT NULL DEFAULT 0",
650                [],
651            )?;
652        }
653        if !has_snapshot_hash {
654            conn.execute(
655                "ALTER TABLE vector_embedding_contracts ADD COLUMN snapshot_hash TEXT NOT NULL DEFAULT ''",
656                [],
657            )?;
658        }
659        conn.execute(
660            r"
661            UPDATE vector_embedding_contracts
662            SET
663                applied_at = CASE
664                    WHEN applied_at = 0 THEN updated_at
665                    ELSE applied_at
666                END,
667                snapshot_hash = CASE
668                    WHEN snapshot_hash = '' THEN 'legacy'
669                    ELSE snapshot_hash
670                END
671            ",
672            [],
673        )?;
674        Ok(())
675    }
676
677    fn ensure_vector_contract_format_version(conn: &Connection) -> Result<(), SchemaError> {
678        let mut stmt = conn.prepare("PRAGMA table_info(vector_embedding_contracts)")?;
679        let columns = stmt
680            .query_map([], |row| row.get::<_, String>(1))?
681            .collect::<Result<Vec<_>, _>>()?;
682        let has_contract_format_version = columns
683            .iter()
684            .any(|column| column == "contract_format_version");
685
686        if !has_contract_format_version {
687            conn.execute(
688                "ALTER TABLE vector_embedding_contracts ADD COLUMN contract_format_version INTEGER NOT NULL DEFAULT 1",
689                [],
690            )?;
691        }
692        conn.execute(
693            r"
694            UPDATE vector_embedding_contracts
695            SET contract_format_version = 1
696            WHERE contract_format_version = 0
697            ",
698            [],
699        )?;
700        Ok(())
701    }
702
703    fn ensure_provenance_metadata(conn: &Connection) -> Result<(), SchemaError> {
704        let mut stmt = conn.prepare("PRAGMA table_info(provenance_events)")?;
705        let columns = stmt
706            .query_map([], |row| row.get::<_, String>(1))?
707            .collect::<Result<Vec<_>, _>>()?;
708        let has_metadata_json = columns.iter().any(|column| column == "metadata_json");
709
710        if !has_metadata_json {
711            conn.execute(
712                "ALTER TABLE provenance_events ADD COLUMN metadata_json TEXT NOT NULL DEFAULT ''",
713                [],
714            )?;
715        }
716        Ok(())
717    }
718
719    fn ensure_operational_mutation_order(conn: &Connection) -> Result<(), SchemaError> {
720        let mut stmt = conn.prepare("PRAGMA table_info(operational_mutations)")?;
721        let columns = stmt
722            .query_map([], |row| row.get::<_, String>(1))?
723            .collect::<Result<Vec<_>, _>>()?;
724        let has_mutation_order = columns.iter().any(|column| column == "mutation_order");
725
726        if !has_mutation_order {
727            conn.execute(
728                "ALTER TABLE operational_mutations ADD COLUMN mutation_order INTEGER NOT NULL DEFAULT 0",
729                [],
730            )?;
731        }
732        conn.execute(
733            r"
734            UPDATE operational_mutations
735            SET mutation_order = rowid
736            WHERE mutation_order = 0
737            ",
738            [],
739        )?;
740        conn.execute(
741            r"
742            CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_order
743                ON operational_mutations(collection_name, record_key, mutation_order DESC)
744            ",
745            [],
746        )?;
747        Ok(())
748    }
749
750    fn ensure_node_access_metadata(conn: &Connection) -> Result<(), SchemaError> {
751        conn.execute_batch(
752            r"
753            CREATE TABLE IF NOT EXISTS node_access_metadata (
754                logical_id TEXT PRIMARY KEY,
755                last_accessed_at INTEGER NOT NULL,
756                updated_at INTEGER NOT NULL
757            );
758
759            CREATE INDEX IF NOT EXISTS idx_node_access_metadata_last_accessed
760                ON node_access_metadata(last_accessed_at DESC);
761            ",
762        )?;
763        Ok(())
764    }
765
766    fn ensure_operational_filter_contract(conn: &Connection) -> Result<(), SchemaError> {
767        let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
768        let columns = stmt
769            .query_map([], |row| row.get::<_, String>(1))?
770            .collect::<Result<Vec<_>, _>>()?;
771        let has_filter_fields_json = columns.iter().any(|column| column == "filter_fields_json");
772
773        if !has_filter_fields_json {
774            conn.execute(
775                "ALTER TABLE operational_collections ADD COLUMN filter_fields_json TEXT NOT NULL DEFAULT '[]'",
776                [],
777            )?;
778        }
779
780        conn.execute_batch(
781            r"
782            CREATE TABLE IF NOT EXISTS operational_filter_values (
783                mutation_id TEXT NOT NULL,
784                collection_name TEXT NOT NULL,
785                field_name TEXT NOT NULL,
786                string_value TEXT,
787                integer_value INTEGER,
788                PRIMARY KEY(mutation_id, field_name),
789                FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE,
790                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
791            );
792
793            CREATE INDEX IF NOT EXISTS idx_operational_filter_values_text
794                ON operational_filter_values(collection_name, field_name, string_value, mutation_id);
795            CREATE INDEX IF NOT EXISTS idx_operational_filter_values_integer
796                ON operational_filter_values(collection_name, field_name, integer_value, mutation_id);
797            ",
798        )?;
799        Ok(())
800    }
801
802    fn ensure_operational_validation_contract(conn: &Connection) -> Result<(), SchemaError> {
803        let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
804        let columns = stmt
805            .query_map([], |row| row.get::<_, String>(1))?
806            .collect::<Result<Vec<_>, _>>()?;
807        let has_validation_json = columns.iter().any(|column| column == "validation_json");
808
809        if !has_validation_json {
810            conn.execute(
811                "ALTER TABLE operational_collections ADD COLUMN validation_json TEXT NOT NULL DEFAULT ''",
812                [],
813            )?;
814        }
815
816        Ok(())
817    }
818
819    fn ensure_operational_secondary_indexes(conn: &Connection) -> Result<(), SchemaError> {
820        let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
821        let columns = stmt
822            .query_map([], |row| row.get::<_, String>(1))?
823            .collect::<Result<Vec<_>, _>>()?;
824        let has_secondary_indexes_json = columns
825            .iter()
826            .any(|column| column == "secondary_indexes_json");
827
828        if !has_secondary_indexes_json {
829            conn.execute(
830                "ALTER TABLE operational_collections ADD COLUMN secondary_indexes_json TEXT NOT NULL DEFAULT '[]'",
831                [],
832            )?;
833        }
834
835        conn.execute_batch(
836            r"
837            CREATE TABLE IF NOT EXISTS operational_secondary_index_entries (
838                collection_name TEXT NOT NULL,
839                index_name TEXT NOT NULL,
840                subject_kind TEXT NOT NULL,
841                mutation_id TEXT NOT NULL DEFAULT '',
842                record_key TEXT NOT NULL DEFAULT '',
843                sort_timestamp INTEGER,
844                slot1_text TEXT,
845                slot1_integer INTEGER,
846                slot2_text TEXT,
847                slot2_integer INTEGER,
848                slot3_text TEXT,
849                slot3_integer INTEGER,
850                PRIMARY KEY(collection_name, index_name, subject_kind, mutation_id, record_key),
851                FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
852                FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE
853            );
854
855            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_text
856                ON operational_secondary_index_entries(
857                    collection_name, index_name, subject_kind, slot1_text, sort_timestamp DESC, mutation_id, record_key
858                );
859            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_integer
860                ON operational_secondary_index_entries(
861                    collection_name, index_name, subject_kind, slot1_integer, sort_timestamp DESC, mutation_id, record_key
862                );
863            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_text
864                ON operational_secondary_index_entries(
865                    collection_name, index_name, subject_kind, slot1_text, slot2_text, slot3_text, sort_timestamp DESC, record_key
866                );
867            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_integer
868                ON operational_secondary_index_entries(
869                    collection_name, index_name, subject_kind, slot1_integer, slot2_integer, slot3_integer, sort_timestamp DESC, record_key
870                );
871            ",
872        )?;
873
874        Ok(())
875    }
876
877    fn ensure_operational_retention_runs(conn: &Connection) -> Result<(), SchemaError> {
878        conn.execute_batch(
879            r"
880            CREATE TABLE IF NOT EXISTS operational_retention_runs (
881                id TEXT PRIMARY KEY,
882                collection_name TEXT NOT NULL,
883                executed_at INTEGER NOT NULL,
884                action_kind TEXT NOT NULL,
885                dry_run INTEGER NOT NULL DEFAULT 0,
886                deleted_mutations INTEGER NOT NULL,
887                rows_remaining INTEGER NOT NULL,
888                metadata_json TEXT NOT NULL DEFAULT '',
889                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
890            );
891
892            CREATE INDEX IF NOT EXISTS idx_operational_retention_runs_collection_time
893                ON operational_retention_runs(collection_name, executed_at DESC);
894            ",
895        )?;
896        Ok(())
897    }
898
899    fn ensure_external_content_columns(conn: &Connection) -> Result<(), SchemaError> {
900        let node_columns = Self::column_names(conn, "nodes")?;
901        if !node_columns.iter().any(|c| c == "content_ref") {
902            conn.execute("ALTER TABLE nodes ADD COLUMN content_ref TEXT", [])?;
903        }
904        conn.execute_batch(
905            r"
906            CREATE INDEX IF NOT EXISTS idx_nodes_content_ref
907                ON nodes(content_ref)
908                WHERE content_ref IS NOT NULL AND superseded_at IS NULL;
909            ",
910        )?;
911
912        let chunk_columns = Self::column_names(conn, "chunks")?;
913        if !chunk_columns.iter().any(|c| c == "content_hash") {
914            conn.execute("ALTER TABLE chunks ADD COLUMN content_hash TEXT", [])?;
915        }
916        Ok(())
917    }
918
919    /// Migration 16: migrate both `fts_nodes` and `fts_node_properties` from
920    /// the default FTS5 simple tokenizer to `unicode61 remove_diacritics 2
921    /// porter` so diacritic-insensitive and stem-aware matches work (e.g.
922    /// `cafe` matching `café`, `shipping` matching `ship`).
923    ///
924    /// FTS5 does not support re-tokenizing an existing index in place, so
925    /// both virtual tables are dropped and recreated with the new
926    /// `tokenize=...` clause. `fts_nodes` is rebuilt inline from the
927    /// canonical `chunks + nodes` join. `fts_node_properties` is left empty
928    /// here and repopulated from canonical state by the engine runtime after
929    /// bootstrap (the property FTS rebuild requires the per-kind
930    /// `fts_property_schemas` projection that lives in the engine crate).
931    ///
932    /// A malformed row encountered during the inline `INSERT ... SELECT`
933    /// causes the migration to abort: the rusqlite error propagates up
934    /// through `execute_batch` and rolls back the outer migration
935    /// transaction so the schema version is not advanced.
936    fn ensure_unicode_porter_fts_tokenizers(conn: &Connection) -> Result<(), SchemaError> {
937        conn.execute_batch(
938            r"
939            DROP TABLE IF EXISTS fts_nodes;
940            CREATE VIRTUAL TABLE fts_nodes USING fts5(
941                chunk_id UNINDEXED,
942                node_logical_id UNINDEXED,
943                kind UNINDEXED,
944                text_content,
945                tokenize = 'porter unicode61 remove_diacritics 2'
946            );
947
948            DROP TABLE IF EXISTS fts_node_properties;
949            CREATE VIRTUAL TABLE fts_node_properties USING fts5(
950                node_logical_id UNINDEXED,
951                kind UNINDEXED,
952                text_content,
953                tokenize = 'porter unicode61 remove_diacritics 2'
954            );
955
956            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
957            SELECT c.id, n.logical_id, n.kind, c.text_content
958            FROM chunks c
959            JOIN nodes n
960              ON n.logical_id = c.node_logical_id
961             AND n.superseded_at IS NULL;
962            ",
963        )?;
964        Ok(())
965    }
966
967    fn ensure_fts_property_schemas(conn: &Connection) -> Result<(), SchemaError> {
968        conn.execute_batch(
969            r"
970            CREATE TABLE IF NOT EXISTS fts_property_schemas (
971                kind TEXT PRIMARY KEY,
972                property_paths_json TEXT NOT NULL,
973                separator TEXT NOT NULL DEFAULT ' ',
974                format_version INTEGER NOT NULL DEFAULT 1,
975                created_at INTEGER NOT NULL DEFAULT (unixepoch())
976            );
977
978            CREATE VIRTUAL TABLE IF NOT EXISTS fts_node_properties USING fts5(
979                node_logical_id UNINDEXED,
980                kind UNINDEXED,
981                text_content
982            );
983            ",
984        )?;
985        Ok(())
986    }
987
988    fn ensure_per_kind_fts_tables(conn: &Connection) -> Result<(), SchemaError> {
989        // Collect all registered kinds
990        let kinds: Vec<String> = {
991            let mut stmt = conn.prepare("SELECT kind FROM fts_property_schemas")?;
992            stmt.query_map([], |r| r.get::<_, String>(0))?
993                .collect::<Result<Vec<_>, _>>()?
994        };
995
996        for kind in &kinds {
997            let table_name = fts_kind_table_name(kind);
998            let ddl = format!(
999                "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING fts5(\
1000                    node_logical_id UNINDEXED, \
1001                    text_content, \
1002                    tokenize = '{DEFAULT_FTS_TOKENIZER}'\
1003                )"
1004            );
1005            conn.execute_batch(&ddl)?;
1006
1007            // Enqueue a PENDING rebuild for this kind (idempotent)
1008            conn.execute(
1009                "INSERT OR IGNORE INTO fts_property_rebuild_state \
1010                 (kind, schema_id, state, rows_done, started_at, is_first_registration) \
1011                 SELECT ?1, rowid, 'PENDING', 0, unixepoch('now') * 1000, 0 \
1012                 FROM fts_property_schemas WHERE kind = ?1",
1013                rusqlite::params![kind],
1014            )?;
1015        }
1016
1017        // Drop the old global table (deferred: write sites must be updated in A-6 first)
1018        // Note: actual DROP happens in migration 23, after A-6 redirects write sites
1019        Ok(())
1020    }
1021
1022    fn ensure_staging_columns_json(conn: &Connection) -> Result<(), SchemaError> {
1023        // Idempotent: check if the column already exists before altering
1024        let column_exists: bool = {
1025            let mut stmt = conn.prepare("PRAGMA table_info(fts_property_rebuild_staging)")?;
1026            let names: Vec<String> = stmt
1027                .query_map([], |r| r.get::<_, String>(1))?
1028                .collect::<Result<Vec<_>, _>>()?;
1029            names.iter().any(|n| n == "columns_json")
1030        };
1031
1032        if !column_exists {
1033            conn.execute_batch(
1034                "ALTER TABLE fts_property_rebuild_staging ADD COLUMN columns_json TEXT;",
1035            )?;
1036        }
1037
1038        Ok(())
1039    }
1040
1041    fn column_names(conn: &Connection, table: &str) -> Result<Vec<String>, SchemaError> {
1042        let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
1043        let names = stmt
1044            .query_map([], |row| row.get::<_, String>(1))?
1045            .collect::<Result<Vec<_>, _>>()?;
1046        Ok(names)
1047    }
1048
1049    #[must_use]
1050    pub fn current_version(&self) -> SchemaVersion {
1051        self.migrations()
1052            .last()
1053            .map_or(SchemaVersion(0), |migration| migration.version)
1054    }
1055
    /// The built-in migration list, in declaration order.
    #[must_use]
    pub fn migrations(&self) -> &'static [Migration] {
        MIGRATIONS
    }
1060
1061    /// Set the recommended `SQLite` connection pragmas for fathomdb.
1062    ///
1063    /// # Errors
1064    ///
1065    /// Returns [`SchemaError`] if any PRAGMA fails to execute.
1066    pub fn initialize_connection(&self, conn: &Connection) -> Result<(), SchemaError> {
1067        conn.execute_batch(
1068            r"
1069            PRAGMA foreign_keys = ON;
1070            PRAGMA journal_mode = WAL;
1071            PRAGMA synchronous = NORMAL;
1072            PRAGMA busy_timeout = 5000;
1073            PRAGMA temp_store = MEMORY;
1074            PRAGMA mmap_size = 3000000000;
1075            PRAGMA journal_size_limit = 536870912;
1076            ",
1077        )?;
1078        Ok(())
1079    }
1080
1081    /// Initialize a **read-only** connection with PRAGMAs that are safe for
1082    /// readers.
1083    ///
1084    /// Skips `journal_mode` (requires write; the writer already set WAL),
1085    /// `synchronous` (irrelevant for readers), and `journal_size_limit`
1086    /// (requires write).
1087    ///
1088    /// # Errors
1089    ///
1090    /// Returns [`SchemaError`] if any PRAGMA fails to execute.
1091    pub fn initialize_reader_connection(&self, conn: &Connection) -> Result<(), SchemaError> {
1092        conn.execute_batch(
1093            r"
1094            PRAGMA foreign_keys = ON;
1095            PRAGMA busy_timeout = 5000;
1096            PRAGMA temp_store = MEMORY;
1097            PRAGMA mmap_size = 3000000000;
1098            ",
1099        )?;
1100        Ok(())
1101    }
1102
1103    /// Ensure the sqlite-vec vector extension profile is registered and the
1104    /// virtual vec table exists.
1105    ///
1106    /// When the `sqlite-vec` feature is enabled this creates the virtual table
1107    /// and records the profile in `vector_profiles` (with `enabled = 1`).
1108    /// When the feature is absent the call always returns
1109    /// [`SchemaError::MissingCapability`].
1110    ///
1111    /// # Errors
1112    ///
1113    /// Returns [`SchemaError`] if the DDL fails or the feature is absent.
1114    #[cfg(feature = "sqlite-vec")]
1115    pub fn ensure_vector_profile(
1116        &self,
1117        conn: &Connection,
1118        profile: &str,
1119        table_name: &str,
1120        dimension: usize,
1121    ) -> Result<(), SchemaError> {
1122        conn.execute_batch(&format!(
1123            "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING vec0(\
1124                chunk_id TEXT PRIMARY KEY,\
1125                embedding float[{dimension}]\
1126            )"
1127        ))?;
1128        // Vector dimensions are small positive integers (typically <= a few
1129        // thousand); convert explicitly so clippy's cast_possible_wrap is happy.
1130        let dimension_i64 = i64::try_from(dimension).map_err(|_| {
1131            SchemaError::Sqlite(rusqlite::Error::ToSqlConversionFailure(
1132                format!("vector dimension {dimension} does not fit in i64").into(),
1133            ))
1134        })?;
1135        conn.execute(
1136            "INSERT OR REPLACE INTO vector_profiles \
1137             (profile, table_name, dimension, enabled) VALUES (?1, ?2, ?3, 1)",
1138            rusqlite::params![profile, table_name, dimension_i64],
1139        )?;
1140        Ok(())
1141    }
1142
    /// # Errors
    ///
    /// Always returns [`SchemaError::MissingCapability`] when the `sqlite-vec`
    /// feature is not compiled in.
    #[cfg(not(feature = "sqlite-vec"))]
    pub fn ensure_vector_profile(
        &self,
        _conn: &Connection,
        _profile: &str,
        _table_name: &str,
        _dimension: usize,
    ) -> Result<(), SchemaError> {
        // Feature-gated stub: keeps the API surface identical whether or not
        // the sqlite-vec extension is compiled in.
        Err(SchemaError::MissingCapability("sqlite-vec"))
    }
1157
1158    /// Ensure a per-kind sqlite-vec virtual table exists and the
1159    /// `projection_profiles` row is recorded under `(kind, 'vec')`.
1160    ///
1161    /// The virtual table is named `vec_<sanitized_kind>` (via
1162    /// [`vec_kind_table_name`]).  A row is also written to the legacy
1163    /// `vector_profiles` table so that [`BootstrapReport::vector_profile_enabled`]
1164    /// continues to work.
1165    ///
1166    /// # Errors
1167    ///
1168    /// Returns [`SchemaError`] if the DDL fails or the feature is absent.
1169    #[cfg(feature = "sqlite-vec")]
1170    pub fn ensure_vec_kind_profile(
1171        &self,
1172        conn: &Connection,
1173        kind: &str,
1174        dimension: usize,
1175    ) -> Result<(), SchemaError> {
1176        let table_name = vec_kind_table_name(kind);
1177        conn.execute_batch(&format!(
1178            "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING vec0(\
1179                chunk_id TEXT PRIMARY KEY,\
1180                embedding float[{dimension}]\
1181            )"
1182        ))?;
1183        let dimension_i64 = i64::try_from(dimension).map_err(|_| {
1184            SchemaError::Sqlite(rusqlite::Error::ToSqlConversionFailure(
1185                format!("vector dimension {dimension} does not fit in i64").into(),
1186            ))
1187        })?;
1188        // Record in the legacy vector_profiles table so vector_profile_enabled works.
1189        conn.execute(
1190            "INSERT OR REPLACE INTO vector_profiles \
1191             (profile, table_name, dimension, enabled) VALUES (?1, ?2, ?3, 1)",
1192            rusqlite::params![kind, table_name, dimension_i64],
1193        )?;
1194        // Record in projection_profiles under (kind, 'vec').
1195        // Use "dimensions" (plural) to match what get_vec_profile extracts via json_extract.
1196        let config_json =
1197            format!(r#"{{"table_name":"{table_name}","dimensions":{dimension_i64}}}"#);
1198        conn.execute(
1199            "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at) \
1200             VALUES (?1, 'vec', ?2, unixepoch(), unixepoch()) \
1201             ON CONFLICT(kind, facet) DO UPDATE SET \
1202                 config_json = ?2, \
1203                 active_at   = unixepoch()",
1204            rusqlite::params![kind, config_json],
1205        )?;
1206        Ok(())
1207    }
1208
    /// # Errors
    ///
    /// Always returns [`SchemaError::MissingCapability`] when the `sqlite-vec`
    /// feature is not compiled in.
    #[cfg(not(feature = "sqlite-vec"))]
    pub fn ensure_vec_kind_profile(
        &self,
        _conn: &Connection,
        _kind: &str,
        _dimension: usize,
    ) -> Result<(), SchemaError> {
        // Feature-gated stub mirroring the sqlite-vec implementation above.
        Err(SchemaError::MissingCapability("sqlite-vec"))
    }
1222
1223    /// Create the internal migration-tracking table if it does not exist.
1224    ///
1225    /// # Errors
1226    ///
1227    /// Returns [`SchemaError`] if the DDL fails to execute.
1228    fn ensure_metadata_tables(conn: &Connection) -> Result<(), SchemaError> {
1229        conn.execute_batch(
1230            r"
1231            CREATE TABLE IF NOT EXISTS fathom_schema_migrations (
1232                version INTEGER PRIMARY KEY,
1233                description TEXT NOT NULL,
1234                applied_at INTEGER NOT NULL DEFAULT (unixepoch())
1235            );
1236            ",
1237        )?;
1238        Ok(())
1239    }
1240}
1241
/// Default FTS5 tokenizer used when no per-kind profile is configured:
/// porter stemming over unicode61 with `remove_diacritics 2`.
pub const DEFAULT_FTS_TOKENIZER: &str = "porter unicode61 remove_diacritics 2";
1244
1245/// Derive the canonical FTS5 virtual-table name for a given node `kind`.
1246///
1247/// Rules:
1248/// 1. Lowercase the kind string.
1249/// 2. Replace every character that is NOT `[a-z0-9]` with `_`.
1250/// 3. Collapse consecutive underscores to a single `_`.
1251/// 4. Prefix with `fts_props_`.
1252/// 5. If the result exceeds 63 characters: truncate the slug to 55 characters
1253///    and append `_` + the first 7 hex chars of the SHA-256 of the original kind.
1254#[must_use]
1255pub fn fts_kind_table_name(kind: &str) -> String {
1256    // Step 1-3: normalise the slug
1257    let lowered = kind.to_lowercase();
1258    let mut slug = String::with_capacity(lowered.len());
1259    let mut prev_was_underscore = false;
1260    for ch in lowered.chars() {
1261        if ch.is_ascii_alphanumeric() {
1262            slug.push(ch);
1263            prev_was_underscore = false;
1264        } else {
1265            if !prev_was_underscore {
1266                slug.push('_');
1267            }
1268            prev_was_underscore = true;
1269        }
1270    }
1271
1272    // Step 4: prefix
1273    let prefixed = format!("fts_props_{slug}");
1274
1275    // Step 5: truncate if needed
1276    if prefixed.len() <= 63 {
1277        prefixed
1278    } else {
1279        // Hash the original kind string
1280        let hash = Sha256::digest(kind.as_bytes());
1281        let mut hex = String::with_capacity(hash.len() * 2);
1282        for b in &hash {
1283            use std::fmt::Write as _;
1284            let _ = write!(hex, "{b:02x}");
1285        }
1286        let hex_suffix = &hex[..7];
1287        // Slug must be 55 chars so that "fts_props_" (10) + slug (55) + "_" (1) + hex7 (7) = 73 — too long.
1288        // Wait: total = 10 + slug_len + 1 + 7 = 63  => slug_len = 45
1289        let slug_truncated = if slug.len() > 45 { &slug[..45] } else { &slug };
1290        format!("fts_props_{slug_truncated}_{hex_suffix}")
1291    }
1292}
1293
/// Derive the canonical sqlite-vec virtual-table name for a given node `kind`.
///
/// Rules:
/// 1. Lowercase the kind string.
/// 2. Replace every character that is NOT `[a-z0-9]` with `_`.
/// 3. Collapse consecutive underscores to a single `_`.
/// 4. Prefix with `vec_`.
#[must_use]
pub fn vec_kind_table_name(kind: &str) -> String {
    let lowered = kind.to_lowercase();
    let mut slug = String::with_capacity(lowered.len());
    for ch in lowered.chars() {
        if ch.is_ascii_alphanumeric() {
            slug.push(ch);
        } else if !slug.ends_with('_') {
            // Any non-alphanumeric run (including at the start) contributes
            // exactly one underscore.
            slug.push('_');
        }
    }
    format!("vec_{slug}")
}
1319
/// Derive the canonical FTS5 column name for a JSON path.
///
/// Rules:
/// 1. Strip a leading `$.` or `$` prefix.
/// 2. Replace every character that is NOT `[a-z0-9_]` (after lowercasing) with `_`.
/// 3. Collapse consecutive underscores.
/// 4. Strip trailing underscores.
/// 5. If `is_recursive` is `true`, append `_all`.
#[must_use]
pub fn fts_column_name(path: &str, is_recursive: bool) -> String {
    // Step 1: strip the JSON-path prefix.
    let stripped = path
        .strip_prefix("$.")
        .or_else(|| path.strip_prefix('$'))
        .unwrap_or(path);

    // Steps 2-3: lowercase and normalise. Literal underscores are collapsed
    // together with replacement underscores, so e.g. `a__b`, `a._b`, and
    // `a!!b` all map to `a_b`. (Previously literal `_` was pushed
    // unconditionally, so `a__b` kept its double underscore while `a!!b`
    // collapsed — violating rule 3.)
    let lowered = stripped.to_lowercase();
    let mut col = String::with_capacity(lowered.len());
    let mut prev_was_underscore = false;
    for ch in lowered.chars() {
        if ch.is_ascii_alphanumeric() {
            col.push(ch);
            prev_was_underscore = false;
        } else if !prev_was_underscore {
            // Covers literal `_` and every other non-alphanumeric character.
            col.push('_');
            prev_was_underscore = true;
        }
    }

    // Step 4: strip trailing underscores.
    let col = col.trim_end_matches('_').to_owned();

    // Step 5: recursive suffix.
    if is_recursive {
        format!("{col}_all")
    } else {
        col
    }
}
1364
1365/// Look up the FTS tokenizer configured for a given `kind` in `projection_profiles`.
1366///
1367/// Returns [`DEFAULT_FTS_TOKENIZER`] when:
1368/// - the `projection_profiles` table does not exist yet, or
1369/// - no row exists for `(kind, 'fts')`, or
1370/// - the `tokenizer` field in `config_json` is absent or empty.
1371///
1372/// # Errors
1373///
1374/// Returns [`SchemaError::Sqlite`] on any `SQLite` error other than a missing table.
1375pub fn resolve_fts_tokenizer(conn: &Connection, kind: &str) -> Result<String, SchemaError> {
1376    let result = conn
1377        .query_row(
1378            "SELECT json_extract(config_json, '$.tokenizer') FROM projection_profiles WHERE kind = ?1 AND facet = 'fts'",
1379            [kind],
1380            |row| row.get::<_, Option<String>>(0),
1381        )
1382        .optional();
1383
1384    match result {
1385        Ok(Some(Some(tok))) if !tok.is_empty() => Ok(tok),
1386        Ok(_) => Ok(DEFAULT_FTS_TOKENIZER.to_owned()),
1387        Err(rusqlite::Error::SqliteFailure(_, _)) => {
1388            // Table doesn't exist or other sqlite-level error — return default
1389            Ok(DEFAULT_FTS_TOKENIZER.to_owned())
1390        }
1391        Err(e) => Err(SchemaError::Sqlite(e)),
1392    }
1393}
1394
1395#[cfg(test)]
1396#[allow(clippy::expect_used)]
1397mod tests {
1398    use rusqlite::Connection;
1399
1400    use super::SchemaManager;
1401
1402    #[test]
1403    fn bootstrap_applies_initial_schema() {
1404        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1405        let manager = SchemaManager::new();
1406
1407        let report = manager.bootstrap(&conn).expect("bootstrap report");
1408
1409        assert_eq!(
1410            report.applied_versions.len(),
1411            manager.current_version().0 as usize
1412        );
1413        assert!(report.sqlite_version.starts_with('3'));
1414        let table_count: i64 = conn
1415            .query_row(
1416                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'nodes'",
1417                [],
1418                |row| row.get(0),
1419            )
1420            .expect("nodes table exists");
1421        assert_eq!(table_count, 1);
1422    }
1423
1424    // --- Item 2: vector profile tests ---
1425
1426    #[test]
1427    fn vector_profile_not_enabled_without_feature() {
1428        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1429        let manager = SchemaManager::new();
1430        let report = manager.bootstrap(&conn).expect("bootstrap");
1431        assert!(
1432            !report.vector_profile_enabled,
1433            "vector_profile_enabled must be false on a fresh bootstrap"
1434        );
1435    }
1436
1437    #[test]
1438    fn vector_profile_skipped_when_dimension_absent() {
1439        // ensure_vector_profile is never called → enabled stays false
1440        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1441        let manager = SchemaManager::new();
1442        manager.bootstrap(&conn).expect("bootstrap");
1443
1444        let count: i64 = conn
1445            .query_row(
1446                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
1447                [],
1448                |row| row.get(0),
1449            )
1450            .expect("count");
1451        assert_eq!(
1452            count, 0,
1453            "no enabled profile without calling ensure_vector_profile"
1454        );
1455    }
1456
1457    #[test]
1458    fn bootstrap_report_reflects_actual_vector_state() {
1459        // After a fresh bootstrap with no vector profile, the report reflects reality.
1460        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1461        let manager = SchemaManager::new();
1462        let report = manager.bootstrap(&conn).expect("bootstrap");
1463
1464        let db_count: i64 = conn
1465            .query_row(
1466                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
1467                [],
1468                |row| row.get(0),
1469            )
1470            .expect("count");
1471        assert_eq!(
1472            report.vector_profile_enabled,
1473            db_count > 0,
1474            "BootstrapReport.vector_profile_enabled must match actual DB state"
1475        );
1476    }
1477
1478    #[test]
1479    fn bootstrap_backfills_vector_contract_format_and_provenance_metadata_columns() {
1480        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1481        conn.execute_batch(
1482            r#"
1483            CREATE TABLE provenance_events (
1484                id         TEXT PRIMARY KEY,
1485                event_type TEXT NOT NULL,
1486                subject    TEXT NOT NULL,
1487                source_ref TEXT,
1488                created_at INTEGER NOT NULL DEFAULT (unixepoch())
1489            );
1490            CREATE TABLE vector_embedding_contracts (
1491                profile TEXT PRIMARY KEY,
1492                table_name TEXT NOT NULL,
1493                model_identity TEXT NOT NULL,
1494                model_version TEXT NOT NULL,
1495                dimension INTEGER NOT NULL,
1496                normalization_policy TEXT NOT NULL,
1497                chunking_policy TEXT NOT NULL,
1498                preprocessing_policy TEXT NOT NULL,
1499                generator_command_json TEXT NOT NULL,
1500                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
1501                applied_at INTEGER NOT NULL DEFAULT 0,
1502                snapshot_hash TEXT NOT NULL DEFAULT ''
1503            );
1504            INSERT INTO vector_embedding_contracts (
1505                profile,
1506                table_name,
1507                model_identity,
1508                model_version,
1509                dimension,
1510                normalization_policy,
1511                chunking_policy,
1512                preprocessing_policy,
1513                generator_command_json,
1514                updated_at,
1515                applied_at,
1516                snapshot_hash
1517            ) VALUES (
1518                'default',
1519                'vec_nodes_active',
1520                'legacy-model',
1521                '0.9.0',
1522                4,
1523                'l2',
1524                'per_chunk',
1525                'trim',
1526                '["/bin/echo"]',
1527                100,
1528                100,
1529                'legacy'
1530            );
1531            "#,
1532        )
1533        .expect("seed legacy schema");
1534        let manager = SchemaManager::new();
1535
1536        let report = manager.bootstrap(&conn).expect("bootstrap");
1537
1538        assert!(
1539            report.applied_versions.iter().any(|version| version.0 >= 5),
1540            "bootstrap should apply hardening migrations"
1541        );
1542        let format_version: i64 = conn
1543            .query_row(
1544                "SELECT contract_format_version FROM vector_embedding_contracts WHERE profile = 'default'",
1545                [],
1546                |row| row.get(0),
1547            )
1548            .expect("contract_format_version");
1549        assert_eq!(format_version, 1);
1550        let metadata_column_count: i64 = conn
1551            .query_row(
1552                "SELECT count(*) FROM pragma_table_info('provenance_events') WHERE name = 'metadata_json'",
1553                [],
1554                |row| row.get(0),
1555            )
1556            .expect("metadata_json column count");
1557        assert_eq!(metadata_column_count, 1);
1558    }
1559
1560    #[test]
1561    fn bootstrap_creates_operational_store_tables() {
1562        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1563        let manager = SchemaManager::new();
1564
1565        manager.bootstrap(&conn).expect("bootstrap");
1566
1567        for table in [
1568            "operational_collections",
1569            "operational_mutations",
1570            "operational_current",
1571        ] {
1572            let count: i64 = conn
1573                .query_row(
1574                    "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = ?1",
1575                    [table],
1576                    |row| row.get(0),
1577                )
1578                .expect("table existence");
1579            assert_eq!(count, 1, "{table} should exist after bootstrap");
1580        }
1581        let mutation_order_columns: i64 = conn
1582            .query_row(
1583                "SELECT count(*) FROM pragma_table_info('operational_mutations') WHERE name = 'mutation_order'",
1584                [],
1585                |row| row.get(0),
1586            )
1587            .expect("mutation_order column exists");
1588        assert_eq!(mutation_order_columns, 1);
1589    }
1590
1591    #[test]
1592    fn bootstrap_is_idempotent_with_operational_store_tables() {
1593        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1594        let manager = SchemaManager::new();
1595
1596        manager.bootstrap(&conn).expect("first bootstrap");
1597        let report = manager.bootstrap(&conn).expect("second bootstrap");
1598
1599        assert!(
1600            report.applied_versions.is_empty(),
1601            "second bootstrap should apply no new migrations"
1602        );
1603        let count: i64 = conn
1604            .query_row(
1605                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'operational_collections'",
1606                [],
1607                |row| row.get(0),
1608        )
1609        .expect("operational_collections table exists");
1610        assert_eq!(count, 1);
1611    }
1612
1613    #[test]
1614    fn bootstrap_is_idempotent_for_recovered_operational_tables_without_migration_history() {
1615        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1616        conn.execute_batch(
1617            r#"
1618            CREATE TABLE operational_collections (
1619                name TEXT PRIMARY KEY,
1620                kind TEXT NOT NULL,
1621                schema_json TEXT NOT NULL,
1622                retention_json TEXT NOT NULL,
1623                format_version INTEGER NOT NULL DEFAULT 1,
1624                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
1625                disabled_at INTEGER
1626            );
1627
1628            CREATE TABLE operational_mutations (
1629                id TEXT PRIMARY KEY,
1630                collection_name TEXT NOT NULL,
1631                record_key TEXT NOT NULL,
1632                op_kind TEXT NOT NULL,
1633                payload_json TEXT NOT NULL,
1634                source_ref TEXT,
1635                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
1636                mutation_order INTEGER NOT NULL DEFAULT 0,
1637                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
1638            );
1639
1640            CREATE TABLE operational_current (
1641                collection_name TEXT NOT NULL,
1642                record_key TEXT NOT NULL,
1643                payload_json TEXT NOT NULL,
1644                updated_at INTEGER NOT NULL,
1645                last_mutation_id TEXT NOT NULL,
1646                PRIMARY KEY(collection_name, record_key),
1647                FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
1648                FOREIGN KEY(last_mutation_id) REFERENCES operational_mutations(id)
1649            );
1650
1651            INSERT INTO operational_collections (name, kind, schema_json, retention_json)
1652            VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}');
1653            INSERT INTO operational_mutations (
1654                id, collection_name, record_key, op_kind, payload_json, created_at, mutation_order
1655            ) VALUES (
1656                'mut-1', 'audit_log', 'entry-1', 'put', '{"ok":true}', 10, 0
1657            );
1658            "#,
1659        )
1660        .expect("seed recovered operational tables");
1661
1662        let manager = SchemaManager::new();
1663        let report = manager
1664            .bootstrap(&conn)
1665            .expect("bootstrap recovered schema");
1666
1667        assert!(
1668            report.applied_versions.iter().any(|version| version.0 == 8),
1669            "bootstrap should record operational mutation ordering hardening"
1670        );
1671        let mutation_order: i64 = conn
1672            .query_row(
1673                "SELECT mutation_order FROM operational_mutations WHERE id = 'mut-1'",
1674                [],
1675                |row| row.get(0),
1676            )
1677            .expect("mutation_order");
1678        assert_ne!(
1679            mutation_order, 0,
1680            "bootstrap should backfill recovered operational rows"
1681        );
1682        let count: i64 = conn
1683            .query_row(
1684                "SELECT count(*) FROM sqlite_master WHERE type = 'index' AND name = 'idx_operational_mutations_collection_key_order'",
1685                [],
1686                |row| row.get(0),
1687            )
1688            .expect("ordering index exists");
1689        assert_eq!(count, 1);
1690    }
1691
1692    #[test]
1693    fn bootstrap_adds_operational_filter_contract_and_index_table() {
1694        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1695        conn.execute_batch(
1696            r#"
1697            CREATE TABLE operational_collections (
1698                name TEXT PRIMARY KEY,
1699                kind TEXT NOT NULL,
1700                schema_json TEXT NOT NULL,
1701                retention_json TEXT NOT NULL,
1702                format_version INTEGER NOT NULL DEFAULT 1,
1703                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
1704                disabled_at INTEGER
1705            );
1706
1707            CREATE TABLE operational_mutations (
1708                id TEXT PRIMARY KEY,
1709                collection_name TEXT NOT NULL,
1710                record_key TEXT NOT NULL,
1711                op_kind TEXT NOT NULL,
1712                payload_json TEXT NOT NULL,
1713                source_ref TEXT,
1714                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
1715                mutation_order INTEGER NOT NULL DEFAULT 1,
1716                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
1717            );
1718
1719            INSERT INTO operational_collections (name, kind, schema_json, retention_json)
1720            VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}');
1721            "#,
1722        )
1723        .expect("seed recovered operational schema");
1724
1725        let manager = SchemaManager::new();
1726        let report = manager
1727            .bootstrap(&conn)
1728            .expect("bootstrap recovered schema");
1729
1730        assert!(
1731            report
1732                .applied_versions
1733                .iter()
1734                .any(|version| version.0 == 10),
1735            "bootstrap should record operational filtered read migration"
1736        );
1737        assert!(
1738            report
1739                .applied_versions
1740                .iter()
1741                .any(|version| version.0 == 11),
1742            "bootstrap should record operational validation migration"
1743        );
1744        let filter_fields_json: String = conn
1745            .query_row(
1746                "SELECT filter_fields_json FROM operational_collections WHERE name = 'audit_log'",
1747                [],
1748                |row| row.get(0),
1749            )
1750            .expect("filter_fields_json added");
1751        assert_eq!(filter_fields_json, "[]");
1752        let validation_json: String = conn
1753            .query_row(
1754                "SELECT validation_json FROM operational_collections WHERE name = 'audit_log'",
1755                [],
1756                |row| row.get(0),
1757            )
1758            .expect("validation_json added");
1759        assert_eq!(validation_json, "");
1760        let table_count: i64 = conn
1761            .query_row(
1762                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'operational_filter_values'",
1763                [],
1764                |row| row.get(0),
1765        )
1766        .expect("filter table exists");
1767        assert_eq!(table_count, 1);
1768    }
1769
1770    #[test]
1771    fn bootstrap_reapplies_migration_history_without_readding_filter_contract_columns() {
1772        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1773        let manager = SchemaManager::new();
1774        manager.bootstrap(&conn).expect("initial bootstrap");
1775
1776        conn.execute("DROP TABLE fathom_schema_migrations", [])
1777            .expect("drop migration history");
1778        SchemaManager::ensure_metadata_tables(&conn).expect("recreate migration metadata");
1779
1780        let report = manager
1781            .bootstrap(&conn)
1782            .expect("rebootstrap existing schema");
1783
1784        assert!(
1785            report
1786                .applied_versions
1787                .iter()
1788                .any(|version| version.0 == 10),
1789            "rebootstrap should re-record migration 10"
1790        );
1791        assert!(
1792            report
1793                .applied_versions
1794                .iter()
1795                .any(|version| version.0 == 11),
1796            "rebootstrap should re-record migration 11"
1797        );
1798        let filter_fields_json: String = conn
1799            .query_row(
1800                "SELECT filter_fields_json FROM operational_collections LIMIT 1",
1801                [],
1802                |row| row.get(0),
1803            )
1804            .unwrap_or_else(|_| "[]".to_string());
1805        assert_eq!(filter_fields_json, "[]");
1806        let validation_json: String = conn
1807            .query_row(
1808                "SELECT validation_json FROM operational_collections LIMIT 1",
1809                [],
1810                |row| row.get(0),
1811            )
1812            .unwrap_or_default();
1813        assert_eq!(validation_json, "");
1814    }
1815
1816    #[test]
1817    fn downgrade_detected_returns_version_mismatch() {
1818        use crate::SchemaError;
1819
1820        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1821        let manager = SchemaManager::new();
1822        manager.bootstrap(&conn).expect("initial bootstrap");
1823
1824        conn.execute(
1825            "INSERT INTO fathom_schema_migrations (version, description) VALUES (?1, ?2)",
1826            (999_i64, "future migration"),
1827        )
1828        .expect("insert future version");
1829
1830        let err = manager
1831            .bootstrap(&conn)
1832            .expect_err("should fail on downgrade");
1833        assert!(
1834            matches!(
1835                err,
1836                SchemaError::VersionMismatch {
1837                    database_version: 999,
1838                    ..
1839                }
1840            ),
1841            "expected VersionMismatch with database_version 999, got: {err}"
1842        );
1843    }
1844
1845    #[test]
1846    fn journal_size_limit_is_set() {
1847        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1848        let manager = SchemaManager::new();
1849        manager
1850            .initialize_connection(&conn)
1851            .expect("initialize_connection");
1852
1853        let limit: i64 = conn
1854            .query_row("PRAGMA journal_size_limit", [], |row| row.get(0))
1855            .expect("journal_size_limit pragma");
1856        assert_eq!(limit, 536_870_912);
1857    }
1858
1859    #[cfg(feature = "sqlite-vec")]
1860    #[test]
1861    fn vector_profile_created_when_feature_enabled() {
1862        // Register the sqlite-vec extension globally so the in-memory
1863        // connection can use the vec0 module.
1864        unsafe {
1865            rusqlite::ffi::sqlite3_auto_extension(Some(std::mem::transmute(
1866                sqlite_vec::sqlite3_vec_init as *const (),
1867            )));
1868        }
1869        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1870        let manager = SchemaManager::new();
1871        manager.bootstrap(&conn).expect("bootstrap");
1872
1873        manager
1874            .ensure_vector_profile(&conn, "default", "vec_nodes_active", 128)
1875            .expect("ensure_vector_profile");
1876
1877        let count: i64 = conn
1878            .query_row(
1879                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
1880                [],
1881                |row| row.get(0),
1882            )
1883            .expect("count");
1884        assert_eq!(
1885            count, 1,
1886            "vector profile must be enabled after ensure_vector_profile"
1887        );
1888
1889        // Verify the virtual table exists by querying it
1890        let _: i64 = conn
1891            .query_row("SELECT count(*) FROM vec_nodes_active", [], |row| {
1892                row.get(0)
1893            })
1894            .expect("vec_nodes_active table must exist after ensure_vector_profile");
1895    }
1896
1897    // --- A-1: fts_kind_table_name tests ---
1898
1899    #[test]
1900    fn fts_kind_table_name_simple_kind() {
1901        assert_eq!(
1902            super::fts_kind_table_name("WMKnowledgeObject"),
1903            "fts_props_wmknowledgeobject"
1904        );
1905    }
1906
1907    #[test]
1908    fn fts_kind_table_name_another_simple_kind() {
1909        assert_eq!(
1910            super::fts_kind_table_name("WMExecutionRecord"),
1911            "fts_props_wmexecutionrecord"
1912        );
1913    }
1914
1915    #[test]
1916    fn fts_kind_table_name_with_separator_chars() {
1917        assert_eq!(
1918            super::fts_kind_table_name("MyKind-With.Dots"),
1919            "fts_props_mykind_with_dots"
1920        );
1921    }
1922
1923    #[test]
1924    fn fts_kind_table_name_collapses_consecutive_underscores() {
1925        assert_eq!(
1926            super::fts_kind_table_name("Kind__Double__Underscores"),
1927            "fts_props_kind_double_underscores"
1928        );
1929    }
1930
1931    #[test]
1932    fn fts_kind_table_name_long_kind_truncates_with_hash() {
1933        // 61 A's — slug after prefix would be 61 chars, exceeding 63-char limit
1934        let long_kind = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA";
1935        let result = super::fts_kind_table_name(long_kind);
1936        assert_eq!(result.len(), 63, "result must be exactly 63 chars");
1937        assert!(
1938            result.starts_with("fts_props_"),
1939            "result must start with fts_props_"
1940        );
1941        // Must contain underscore before 7-char hex suffix
1942        let last_underscore = result.rfind('_').expect("must contain underscore");
1943        let hex_suffix = &result[last_underscore + 1..];
1944        assert_eq!(hex_suffix.len(), 7, "hex suffix must be 7 chars");
1945        assert!(
1946            hex_suffix.chars().all(|c| c.is_ascii_hexdigit()),
1947            "hex suffix must be hex digits"
1948        );
1949    }
1950
1951    #[test]
1952    fn fts_kind_table_name_testkind() {
1953        assert_eq!(super::fts_kind_table_name("TestKind"), "fts_props_testkind");
1954    }
1955
1956    // --- A-1: fts_column_name tests ---
1957
1958    #[test]
1959    fn fts_column_name_simple_field() {
1960        assert_eq!(super::fts_column_name("$.title", false), "title");
1961    }
1962
1963    #[test]
1964    fn fts_column_name_nested_path() {
1965        assert_eq!(
1966            super::fts_column_name("$.payload.content", false),
1967            "payload_content"
1968        );
1969    }
1970
1971    #[test]
1972    fn fts_column_name_recursive() {
1973        assert_eq!(super::fts_column_name("$.payload", true), "payload_all");
1974    }
1975
1976    #[test]
1977    fn fts_column_name_special_chars() {
1978        assert_eq!(
1979            super::fts_column_name("$.some-field[0]", false),
1980            "some_field_0"
1981        );
1982    }
1983
1984    // --- A-1: resolve_fts_tokenizer tests ---
1985
1986    #[test]
1987    fn resolve_fts_tokenizer_returns_default_when_no_table() {
1988        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1989        // No projection_profiles table — should return default
1990        let result = super::resolve_fts_tokenizer(&conn, "MyKind").expect("should not error");
1991        assert_eq!(result, super::DEFAULT_FTS_TOKENIZER);
1992    }
1993
1994    #[test]
1995    fn resolve_fts_tokenizer_returns_configured_value() {
1996        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1997        conn.execute_batch(
1998            "CREATE TABLE projection_profiles (
1999                kind TEXT NOT NULL,
2000                facet TEXT NOT NULL,
2001                config_json TEXT NOT NULL,
2002                active_at INTEGER,
2003                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
2004                PRIMARY KEY (kind, facet)
2005            );
2006            INSERT INTO projection_profiles (kind, facet, config_json)
2007            VALUES ('MyKind', 'fts', '{\"tokenizer\":\"trigram\"}');",
2008        )
2009        .expect("setup table");
2010
2011        let result = super::resolve_fts_tokenizer(&conn, "MyKind").expect("should not error");
2012        assert_eq!(result, "trigram");
2013
2014        let default_result =
2015            super::resolve_fts_tokenizer(&conn, "OtherKind").expect("should not error");
2016        assert_eq!(default_result, super::DEFAULT_FTS_TOKENIZER);
2017    }
2018
2019    // --- A-2: migration 20 tests ---
2020
2021    #[test]
2022    fn migration_20_creates_projection_profiles_table() {
2023        let conn = Connection::open_in_memory().expect("in-memory sqlite");
2024        let manager = SchemaManager::new();
2025        manager.bootstrap(&conn).expect("bootstrap");
2026
2027        let table_exists: i64 = conn
2028            .query_row(
2029                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='projection_profiles'",
2030                [],
2031                |row| row.get(0),
2032            )
2033            .expect("query sqlite_master");
2034        assert_eq!(table_exists, 1, "projection_profiles table must exist");
2035
2036        // Check columns exist by querying them
2037        conn.execute_batch(
2038            "INSERT INTO projection_profiles (kind, facet, config_json)
2039             VALUES ('TestKind', 'fts', '{}')",
2040        )
2041        .expect("insert row to verify columns");
2042        let (kind, facet, config_json): (String, String, String) = conn
2043            .query_row(
2044                "SELECT kind, facet, config_json FROM projection_profiles WHERE kind='TestKind'",
2045                [],
2046                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2047            )
2048            .expect("select columns");
2049        assert_eq!(kind, "TestKind");
2050        assert_eq!(facet, "fts");
2051        assert_eq!(config_json, "{}");
2052    }
2053
2054    #[test]
2055    fn migration_20_primary_key_is_kind_facet() {
2056        let conn = Connection::open_in_memory().expect("in-memory sqlite");
2057        let manager = SchemaManager::new();
2058        manager.bootstrap(&conn).expect("bootstrap");
2059
2060        conn.execute_batch(
2061            "INSERT INTO projection_profiles (kind, facet, config_json)
2062             VALUES ('MyKind', 'fts', '{\"tokenizer\":\"porter\"}');",
2063        )
2064        .expect("first insert");
2065
2066        // Second insert with same (kind, facet) must fail
2067        let result = conn.execute_batch(
2068            "INSERT INTO projection_profiles (kind, facet, config_json)
2069             VALUES ('MyKind', 'fts', '{\"tokenizer\":\"trigram\"}');",
2070        );
2071        assert!(
2072            result.is_err(),
2073            "duplicate (kind, facet) must violate PRIMARY KEY"
2074        );
2075    }
2076
2077    #[test]
2078    fn migration_20_resolve_fts_tokenizer_end_to_end() {
2079        let conn = Connection::open_in_memory().expect("in-memory sqlite");
2080        let manager = SchemaManager::new();
2081        manager.bootstrap(&conn).expect("bootstrap");
2082
2083        conn.execute_batch(
2084            "INSERT INTO projection_profiles (kind, facet, config_json)
2085             VALUES ('MyKind', 'fts', '{\"tokenizer\":\"trigram\"}');",
2086        )
2087        .expect("insert profile");
2088
2089        let result = super::resolve_fts_tokenizer(&conn, "MyKind").expect("should not error");
2090        assert_eq!(result, "trigram");
2091
2092        let default_result =
2093            super::resolve_fts_tokenizer(&conn, "UnknownKind").expect("should not error");
2094        assert_eq!(default_result, super::DEFAULT_FTS_TOKENIZER);
2095    }
2096
2097    #[test]
2098    fn migration_21_creates_per_kind_fts_table_and_pending_row() {
2099        let conn = Connection::open_in_memory().expect("in-memory sqlite");
2100        let manager = SchemaManager::new();
2101
2102        // Bootstrap up through migration 20 by using a fresh DB, then manually insert
2103        // a kind into fts_property_schemas so migration 21 picks it up.
2104        // We do a full bootstrap (which applies all migrations including 21).
2105        // Insert the kind before bootstrapping so it is present when migration 21 runs.
2106        // Since bootstrap applies migrations in order and migration 15 creates
2107        // fts_property_schemas, we must insert after that table exists.
2108        // Strategy: bootstrap first, insert kind, then run bootstrap again (idempotent).
2109        manager.bootstrap(&conn).expect("first bootstrap");
2110
2111        conn.execute_batch(
2112            "INSERT INTO fts_property_schemas (kind, property_paths_json, separator, format_version) \
2113             VALUES ('TestKind', '[]', ',', 1)",
2114        )
2115        .expect("insert kind");
2116
2117        // Now run ensure_per_kind_fts_tables directly by calling bootstrap again — migration 21
2118        // is already applied, so we test the function directly.
2119        SchemaManager::ensure_per_kind_fts_tables(&conn).expect("ensure_per_kind_fts_tables");
2120
2121        // fts_props_testkind should exist
2122        let count: i64 = conn
2123            .query_row(
2124                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='fts_props_testkind'",
2125                [],
2126                |r| r.get(0),
2127            )
2128            .expect("count fts table");
2129        assert_eq!(
2130            count, 1,
2131            "fts_props_testkind virtual table should be created"
2132        );
2133
2134        // PENDING row should exist
2135        let state: String = conn
2136            .query_row(
2137                "SELECT state FROM fts_property_rebuild_state WHERE kind='TestKind'",
2138                [],
2139                |r| r.get(0),
2140            )
2141            .expect("rebuild state row");
2142        assert_eq!(state, "PENDING");
2143    }
2144
2145    #[test]
2146    fn migration_22_adds_columns_json_to_staging_table() {
2147        let conn = Connection::open_in_memory().expect("in-memory sqlite");
2148        let manager = SchemaManager::new();
2149        manager.bootstrap(&conn).expect("bootstrap");
2150
2151        let col_count: i64 = conn
2152            .query_row(
2153                "SELECT count(*) FROM pragma_table_info('fts_property_rebuild_staging') WHERE name='columns_json'",
2154                [],
2155                |r| r.get(0),
2156            )
2157            .expect("pragma_table_info");
2158        assert_eq!(
2159            col_count, 1,
2160            "columns_json column must exist after migration 22"
2161        );
2162    }
2163
2164    // --- 0.5.0 item 6: vec_kind_table_name tests ---
2165
2166    #[test]
2167    fn vec_kind_table_name_simple_kind() {
2168        assert_eq!(
2169            super::vec_kind_table_name("WMKnowledgeObject"),
2170            "vec_wmknowledgeobject"
2171        );
2172    }
2173
2174    #[test]
2175    fn vec_kind_table_name_another_kind() {
2176        assert_eq!(super::vec_kind_table_name("MyKind"), "vec_mykind");
2177    }
2178
2179    #[test]
2180    fn vec_kind_table_name_with_separator_chars() {
2181        assert_eq!(
2182            super::vec_kind_table_name("MyKind-With.Dots"),
2183            "vec_mykind_with_dots"
2184        );
2185    }
2186
2187    #[test]
2188    fn vec_kind_table_name_collapses_consecutive_underscores() {
2189        assert_eq!(
2190            super::vec_kind_table_name("Kind__Double__Underscores"),
2191            "vec_kind_double_underscores"
2192        );
2193    }
2194
2195    // --- 0.5.0 item 6: per-kind vec tables ---
2196
2197    #[cfg(feature = "sqlite-vec")]
2198    #[test]
2199    fn per_kind_vec_table_created_when_vec_profile_registered() {
2200        // Register the sqlite-vec extension globally so the in-memory
2201        // connection can use the vec0 module.
2202        unsafe {
2203            rusqlite::ffi::sqlite3_auto_extension(Some(std::mem::transmute(
2204                sqlite_vec::sqlite3_vec_init as *const (),
2205            )));
2206        }
2207        let conn = Connection::open_in_memory().expect("in-memory sqlite");
2208        let manager = SchemaManager::new();
2209        manager.bootstrap(&conn).expect("bootstrap");
2210
2211        // Register a vec profile for MyKind — should create vec_mykind, NOT vec_nodes_active
2212        manager
2213            .ensure_vec_kind_profile(&conn, "MyKind", 128)
2214            .expect("ensure_vec_kind_profile");
2215
2216        // vec_mykind virtual table must exist
2217        let count: i64 = conn
2218            .query_row(
2219                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='vec_mykind'",
2220                [],
2221                |r| r.get(0),
2222            )
2223            .expect("query sqlite_master");
2224        assert_eq!(count, 1, "vec_mykind virtual table must be created");
2225
2226        // projection_profiles row must exist with (kind='MyKind', facet='vec')
2227        let pp_count: i64 = conn
2228            .query_row(
2229                "SELECT count(*) FROM projection_profiles WHERE kind='MyKind' AND facet='vec'",
2230                [],
2231                |r| r.get(0),
2232            )
2233            .expect("query projection_profiles");
2234        assert_eq!(
2235            pp_count, 1,
2236            "projection_profiles row must exist for (MyKind, vec)"
2237        );
2238
2239        // The old global vec_nodes_active must NOT have been created
2240        let old_count: i64 = conn
2241            .query_row(
2242                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='vec_nodes_active'",
2243                [],
2244                |r| r.get(0),
2245            )
2246            .expect("query sqlite_master");
2247        assert_eq!(
2248            old_count, 0,
2249            "vec_nodes_active must NOT be created for per-kind registration"
2250        );
2251    }
2252
2253    // --- A-6: migration 23 drops fts_node_properties ---
2254    #[test]
2255    fn migration_23_drops_global_fts_node_properties_table() {
2256        let conn = Connection::open_in_memory().expect("in-memory sqlite");
2257        let manager = SchemaManager::new();
2258        manager.bootstrap(&conn).expect("bootstrap");
2259
2260        // After migration 23, fts_node_properties must NOT exist in sqlite_master.
2261        let count: i64 = conn
2262            .query_row(
2263                "SELECT count(*) FROM sqlite_master \
2264                 WHERE type='table' AND name='fts_node_properties'",
2265                [],
2266                |r| r.get(0),
2267            )
2268            .expect("check sqlite_master");
2269        assert_eq!(
2270            count, 0,
2271            "fts_node_properties must be dropped by migration 23"
2272        );
2273    }
2274}