// fathomdb_schema/bootstrap.rs — schema migration definitions and bootstrap.
use rusqlite::{Connection, OptionalExtension};
use sha2::{Digest, Sha256};

use crate::{Migration, SchemaError, SchemaVersion};
5
/// Append-only, ordered list of every schema migration this engine knows.
///
/// `SchemaManager::bootstrap` applies each entry whose version is not yet
/// recorded in `fathom_schema_migrations`, in array order, one transaction
/// per entry.
///
/// NOTE(review): versions 4–6, 8–16, 21 and 22 are applied through dedicated
/// `ensure_*` hooks in `bootstrap` instead of executing the SQL below, so for
/// those entries the SQL string is not the code path that runs — keep the two
/// in sync when editing either.
static MIGRATIONS: &[Migration] = &[
    // v1: canonical graph tables (nodes/edges with supersession), chunk
    // storage, content FTS, vector profile registry, and run/step/action
    // runtime tables.
    Migration::new(
        SchemaVersion(1),
        "initial canonical schema and runtime tables",
        r"
                CREATE TABLE IF NOT EXISTS nodes (
                    row_id TEXT PRIMARY KEY,
                    logical_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    properties BLOB NOT NULL,
                    created_at INTEGER NOT NULL,
                    superseded_at INTEGER,
                    source_ref TEXT,
                    confidence REAL
                );

                CREATE UNIQUE INDEX IF NOT EXISTS idx_nodes_active_logical_id
                    ON nodes(logical_id)
                    WHERE superseded_at IS NULL;
                CREATE INDEX IF NOT EXISTS idx_nodes_kind_active
                    ON nodes(kind, superseded_at);
                CREATE INDEX IF NOT EXISTS idx_nodes_source_ref
                    ON nodes(source_ref);

                CREATE TABLE IF NOT EXISTS edges (
                    row_id TEXT PRIMARY KEY,
                    logical_id TEXT NOT NULL,
                    source_logical_id TEXT NOT NULL,
                    target_logical_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    properties BLOB NOT NULL,
                    created_at INTEGER NOT NULL,
                    superseded_at INTEGER,
                    source_ref TEXT,
                    confidence REAL
                );

                CREATE UNIQUE INDEX IF NOT EXISTS idx_edges_active_logical_id
                    ON edges(logical_id)
                    WHERE superseded_at IS NULL;
                CREATE INDEX IF NOT EXISTS idx_edges_source_active
                    ON edges(source_logical_id, kind, superseded_at);
                CREATE INDEX IF NOT EXISTS idx_edges_target_active
                    ON edges(target_logical_id, kind, superseded_at);
                CREATE INDEX IF NOT EXISTS idx_edges_source_ref
                    ON edges(source_ref);

                CREATE TABLE IF NOT EXISTS chunks (
                    id TEXT PRIMARY KEY,
                    node_logical_id TEXT NOT NULL,
                    text_content TEXT NOT NULL,
                    byte_start INTEGER,
                    byte_end INTEGER,
                    created_at INTEGER NOT NULL
                );

                CREATE INDEX IF NOT EXISTS idx_chunks_node_logical_id
                    ON chunks(node_logical_id);

                CREATE VIRTUAL TABLE IF NOT EXISTS fts_nodes USING fts5(
                    chunk_id UNINDEXED,
                    node_logical_id UNINDEXED,
                    kind UNINDEXED,
                    text_content
                );

                CREATE TABLE IF NOT EXISTS vector_profiles (
                    profile TEXT PRIMARY KEY,
                    table_name TEXT NOT NULL,
                    dimension INTEGER NOT NULL,
                    enabled INTEGER NOT NULL DEFAULT 0
                );

                CREATE TABLE IF NOT EXISTS runs (
                    id TEXT PRIMARY KEY,
                    kind TEXT NOT NULL,
                    status TEXT NOT NULL,
                    properties BLOB NOT NULL,
                    created_at INTEGER NOT NULL,
                    completed_at INTEGER,
                    superseded_at INTEGER,
                    source_ref TEXT
                );

                CREATE TABLE IF NOT EXISTS steps (
                    id TEXT PRIMARY KEY,
                    run_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    status TEXT NOT NULL,
                    properties BLOB NOT NULL,
                    created_at INTEGER NOT NULL,
                    completed_at INTEGER,
                    superseded_at INTEGER,
                    source_ref TEXT,
                    FOREIGN KEY(run_id) REFERENCES runs(id)
                );

                CREATE TABLE IF NOT EXISTS actions (
                    id TEXT PRIMARY KEY,
                    step_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    status TEXT NOT NULL,
                    properties BLOB NOT NULL,
                    created_at INTEGER NOT NULL,
                    completed_at INTEGER,
                    superseded_at INTEGER,
                    source_ref TEXT,
                    FOREIGN KEY(step_id) REFERENCES steps(id)
                );

                CREATE INDEX IF NOT EXISTS idx_runs_source_ref
                    ON runs(source_ref);
                CREATE INDEX IF NOT EXISTS idx_steps_source_ref
                    ON steps(source_ref);
                CREATE INDEX IF NOT EXISTS idx_actions_source_ref
                    ON actions(source_ref);
                ",
    ),
    // v2: audit-trail events keyed by (subject, event_type).
    Migration::new(
        SchemaVersion(2),
        "durable audit trail: provenance_events table",
        r"
                CREATE TABLE IF NOT EXISTS provenance_events (
                    id         TEXT PRIMARY KEY,
                    event_type TEXT NOT NULL,
                    subject    TEXT NOT NULL,
                    source_ref TEXT,
                    created_at INTEGER NOT NULL DEFAULT (unixepoch())
                );
                CREATE INDEX IF NOT EXISTS idx_provenance_events_subject
                    ON provenance_events (subject, event_type);
                ",
    ),
    // v3: one contract row per vector profile describing how embeddings
    // are (re)generated.
    Migration::new(
        SchemaVersion(3),
        "vector regeneration contracts",
        r"
                CREATE TABLE IF NOT EXISTS vector_embedding_contracts (
                    profile TEXT PRIMARY KEY,
                    table_name TEXT NOT NULL,
                    model_identity TEXT NOT NULL,
                    model_version TEXT NOT NULL,
                    dimension INTEGER NOT NULL,
                    normalization_policy TEXT NOT NULL,
                    chunking_policy TEXT NOT NULL,
                    preprocessing_policy TEXT NOT NULL,
                    generator_command_json TEXT NOT NULL,
                    updated_at INTEGER NOT NULL DEFAULT (unixepoch())
                );
                ",
    ),
    // v4: applied via `ensure_vector_regeneration_apply_metadata` hook.
    Migration::new(
        SchemaVersion(4),
        "vector regeneration apply metadata",
        r"
                ALTER TABLE vector_embedding_contracts
                    ADD COLUMN applied_at INTEGER NOT NULL DEFAULT 0;
                ALTER TABLE vector_embedding_contracts
                    ADD COLUMN snapshot_hash TEXT NOT NULL DEFAULT '';
                UPDATE vector_embedding_contracts
                SET
                    applied_at = CASE
                        WHEN applied_at = 0 THEN updated_at
                        ELSE applied_at
                    END,
                    snapshot_hash = CASE
                        WHEN snapshot_hash = '' THEN 'legacy'
                        ELSE snapshot_hash
                    END;
                ",
    ),
    // v5: applied via `ensure_vector_contract_format_version` hook.
    Migration::new(
        SchemaVersion(5),
        "vector regeneration contract format version",
        r"
                ALTER TABLE vector_embedding_contracts
                    ADD COLUMN contract_format_version INTEGER NOT NULL DEFAULT 1;
                UPDATE vector_embedding_contracts
                SET contract_format_version = 1
                WHERE contract_format_version = 0;
                ",
    ),
    // v6: applied via `ensure_provenance_metadata` hook.
    Migration::new(
        SchemaVersion(6),
        "provenance metadata payloads",
        r"
                ALTER TABLE provenance_events
                    ADD COLUMN metadata_json TEXT NOT NULL DEFAULT '';
                ",
    ),
    // v7: operational store — collection registry, append-only mutation log,
    // and the derived current-state projection.
    Migration::new(
        SchemaVersion(7),
        "operational store canonical and derived tables",
        r"
                CREATE TABLE IF NOT EXISTS operational_collections (
                    name TEXT PRIMARY KEY,
                    kind TEXT NOT NULL,
                    schema_json TEXT NOT NULL,
                    retention_json TEXT NOT NULL,
                    format_version INTEGER NOT NULL DEFAULT 1,
                    created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                    disabled_at INTEGER
                );

                CREATE INDEX IF NOT EXISTS idx_operational_collections_kind
                    ON operational_collections(kind, disabled_at);

                CREATE TABLE IF NOT EXISTS operational_mutations (
                    id TEXT PRIMARY KEY,
                    collection_name TEXT NOT NULL,
                    record_key TEXT NOT NULL,
                    op_kind TEXT NOT NULL,
                    payload_json TEXT NOT NULL,
                    source_ref TEXT,
                    created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
                );

                CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_created
                    ON operational_mutations(collection_name, record_key, created_at DESC, id DESC);
                CREATE INDEX IF NOT EXISTS idx_operational_mutations_source_ref
                    ON operational_mutations(source_ref);

                CREATE TABLE IF NOT EXISTS operational_current (
                    collection_name TEXT NOT NULL,
                    record_key TEXT NOT NULL,
                    payload_json TEXT NOT NULL,
                    updated_at INTEGER NOT NULL,
                    last_mutation_id TEXT NOT NULL,
                    PRIMARY KEY(collection_name, record_key),
                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
                    FOREIGN KEY(last_mutation_id) REFERENCES operational_mutations(id)
                );

                CREATE INDEX IF NOT EXISTS idx_operational_current_collection_updated
                    ON operational_current(collection_name, updated_at DESC);
                ",
    ),
    // v8: applied via `ensure_operational_mutation_order` hook.
    Migration::new(
        SchemaVersion(8),
        "operational mutation ordering hardening",
        r"
                ALTER TABLE operational_mutations
                    ADD COLUMN mutation_order INTEGER NOT NULL DEFAULT 0;
                UPDATE operational_mutations
                SET mutation_order = rowid
                WHERE mutation_order = 0;
                CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_order
                    ON operational_mutations(collection_name, record_key, mutation_order DESC);
                ",
    ),
    // v9: applied via `ensure_node_access_metadata` hook.
    Migration::new(
        SchemaVersion(9),
        "node last_accessed metadata",
        r"
                CREATE TABLE IF NOT EXISTS node_access_metadata (
                    logical_id TEXT PRIMARY KEY,
                    last_accessed_at INTEGER NOT NULL,
                    updated_at INTEGER NOT NULL
                );

                CREATE INDEX IF NOT EXISTS idx_node_access_metadata_last_accessed
                    ON node_access_metadata(last_accessed_at DESC);
                ",
    ),
    // v10: applied via `ensure_operational_filter_contract` hook. Typed
    // (string/integer) filter values extracted per mutation.
    Migration::new(
        SchemaVersion(10),
        "operational filtered read contracts and extracted values",
        r"
                ALTER TABLE operational_collections
                    ADD COLUMN filter_fields_json TEXT NOT NULL DEFAULT '[]';

                CREATE TABLE IF NOT EXISTS operational_filter_values (
                    mutation_id TEXT NOT NULL,
                    collection_name TEXT NOT NULL,
                    field_name TEXT NOT NULL,
                    string_value TEXT,
                    integer_value INTEGER,
                    PRIMARY KEY(mutation_id, field_name),
                    FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE,
                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
                );

                CREATE INDEX IF NOT EXISTS idx_operational_filter_values_text
                    ON operational_filter_values(collection_name, field_name, string_value, mutation_id);
                CREATE INDEX IF NOT EXISTS idx_operational_filter_values_integer
                    ON operational_filter_values(collection_name, field_name, integer_value, mutation_id);
                ",
    ),
    // v11: applied via `ensure_operational_validation_contract` hook.
    Migration::new(
        SchemaVersion(11),
        "operational payload validation contracts",
        r"
                ALTER TABLE operational_collections
                    ADD COLUMN validation_json TEXT NOT NULL DEFAULT '';
                ",
    ),
    // v12: applied via `ensure_operational_secondary_indexes` hook. Slot-based
    // secondary index entries (three text/integer slot pairs).
    Migration::new(
        SchemaVersion(12),
        "operational secondary indexes",
        r"
                ALTER TABLE operational_collections
                    ADD COLUMN secondary_indexes_json TEXT NOT NULL DEFAULT '[]';

                CREATE TABLE IF NOT EXISTS operational_secondary_index_entries (
                    collection_name TEXT NOT NULL,
                    index_name TEXT NOT NULL,
                    subject_kind TEXT NOT NULL,
                    mutation_id TEXT NOT NULL DEFAULT '',
                    record_key TEXT NOT NULL DEFAULT '',
                    sort_timestamp INTEGER,
                    slot1_text TEXT,
                    slot1_integer INTEGER,
                    slot2_text TEXT,
                    slot2_integer INTEGER,
                    slot3_text TEXT,
                    slot3_integer INTEGER,
                    PRIMARY KEY(collection_name, index_name, subject_kind, mutation_id, record_key),
                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
                    FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE
                );

                CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_text
                    ON operational_secondary_index_entries(
                        collection_name, index_name, subject_kind, slot1_text, sort_timestamp DESC, mutation_id, record_key
                    );
                CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_integer
                    ON operational_secondary_index_entries(
                        collection_name, index_name, subject_kind, slot1_integer, sort_timestamp DESC, mutation_id, record_key
                    );
                CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_text
                    ON operational_secondary_index_entries(
                        collection_name, index_name, subject_kind, slot1_text, slot2_text, slot3_text, sort_timestamp DESC, record_key
                    );
                CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_integer
                    ON operational_secondary_index_entries(
                        collection_name, index_name, subject_kind, slot1_integer, slot2_integer, slot3_integer, sort_timestamp DESC, record_key
                );
                ",
    ),
    // v13: applied via `ensure_operational_retention_runs` hook.
    Migration::new(
        SchemaVersion(13),
        "operational retention run metadata",
        r"
                CREATE TABLE IF NOT EXISTS operational_retention_runs (
                    id TEXT PRIMARY KEY,
                    collection_name TEXT NOT NULL,
                    executed_at INTEGER NOT NULL,
                    action_kind TEXT NOT NULL,
                    dry_run INTEGER NOT NULL DEFAULT 0,
                    deleted_mutations INTEGER NOT NULL,
                    rows_remaining INTEGER NOT NULL,
                    metadata_json TEXT NOT NULL DEFAULT '',
                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
                );

                CREATE INDEX IF NOT EXISTS idx_operational_retention_runs_collection_time
                    ON operational_retention_runs(collection_name, executed_at DESC);
                ",
    ),
    // v14: applied via `ensure_external_content_columns` hook.
    Migration::new(
        SchemaVersion(14),
        "external content object columns",
        r"
                ALTER TABLE nodes ADD COLUMN content_ref TEXT;

                CREATE INDEX IF NOT EXISTS idx_nodes_content_ref
                    ON nodes(content_ref)
                    WHERE content_ref IS NOT NULL AND superseded_at IS NULL;

                ALTER TABLE chunks ADD COLUMN content_hash TEXT;
                ",
    ),
    // v15: applied via `ensure_fts_property_schemas` hook. The
    // `fts_node_properties` virtual table created here is dropped again by v23.
    Migration::new(
        SchemaVersion(15),
        "FTS property projection schemas",
        r"
                CREATE TABLE IF NOT EXISTS fts_property_schemas (
                    kind TEXT PRIMARY KEY,
                    property_paths_json TEXT NOT NULL,
                    separator TEXT NOT NULL DEFAULT ' ',
                    format_version INTEGER NOT NULL DEFAULT 1,
                    created_at INTEGER NOT NULL DEFAULT (unixepoch())
                );

                CREATE VIRTUAL TABLE IF NOT EXISTS fts_node_properties USING fts5(
                    node_logical_id UNINDEXED,
                    kind UNINDEXED,
                    text_content
                );
                ",
    ),
    Migration::new(
        SchemaVersion(16),
        "rebuild fts_nodes and fts_node_properties on porter+unicode61 tokenizer",
        // DDL applied by `ensure_unicode_porter_fts_tokenizers`; the hook
        // drops, recreates, and rebuilds both tables from canonical state.
        // The FTS5 chained tokenizer syntax requires the wrapper (porter)
        // before the base tokenizer, so the applied `tokenize=` clause is
        // `'porter unicode61 remove_diacritics 2'`.
        "",
    ),
    Migration::new(
        SchemaVersion(17),
        "fts property position-map sidecar for recursive extraction",
        // Sidecar position map for property FTS. The recursive extraction
        // walk in the engine emits one row per scalar leaf contributing to a
        // given node's property FTS blob, carrying the half-open byte range
        // `[start_offset, end_offset)` within the blob and the JSON-path of
        // the originating leaf. Phase 5 uses this to attribute tokens back
        // to their source leaves.
        //
        // The existing `fts_property_schemas.property_paths_json` column is
        // reused unchanged at the DDL level; the engine-side JSON decoder
        // tolerates both the legacy shape (bare strings = scalar) and the
        // new shape (objects with `mode` = `scalar`|`recursive`, optional
        // `exclude_paths`). Backwards compatibility is guaranteed because a
        // bare JSON array of strings still deserialises into scalar-mode
        // entries.
        r"
                CREATE TABLE IF NOT EXISTS fts_node_property_positions (
                    node_logical_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    start_offset INTEGER NOT NULL,
                    end_offset INTEGER NOT NULL,
                    leaf_path TEXT NOT NULL
                );

                CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_node
                    ON fts_node_property_positions(node_logical_id, kind);

                CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_kind
                    ON fts_node_property_positions(kind);
                ",
    ),
    Migration::new(
        SchemaVersion(18),
        "add UNIQUE constraint on fts_node_property_positions (node_logical_id, kind, start_offset)",
        // P4-P2-4: the v17 sidecar DDL did not enforce uniqueness of the
        // `(node_logical_id, kind, start_offset)` tuple, so a buggy rebuild
        // path could silently double-insert a leaf and break attribution
        // lookups. Drop and recreate the table with the UNIQUE constraint,
        // preserving the existing indexes. The DDL leaves the table empty,
        // which the open-time rebuild guard in `ExecutionCoordinator::open`
        // detects (empty positions + recursive schemas present) and
        // repopulates from canonical state on the next open. The rebuild
        // path is idempotent and safe to run unconditionally.
        //
        // `fts_node_property_positions` is a regular SQLite table, not an
        // FTS5 virtual table, so UNIQUE constraints are supported.
        r"
                DROP TABLE IF EXISTS fts_node_property_positions;

                CREATE TABLE fts_node_property_positions (
                    node_logical_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    start_offset INTEGER NOT NULL,
                    end_offset INTEGER NOT NULL,
                    leaf_path TEXT NOT NULL,
                    UNIQUE(node_logical_id, kind, start_offset)
                );

                CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_node
                    ON fts_node_property_positions(node_logical_id, kind);

                CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_kind
                    ON fts_node_property_positions(kind);
                ",
    ),
    // v19: staging + per-kind state machine rows for asynchronous property
    // FTS rebuilds.
    Migration::new(
        SchemaVersion(19),
        "async property-FTS rebuild staging and state tables",
        r"
                CREATE TABLE IF NOT EXISTS fts_property_rebuild_staging (
                    kind TEXT NOT NULL,
                    node_logical_id TEXT NOT NULL,
                    text_content TEXT NOT NULL,
                    positions_blob BLOB,
                    PRIMARY KEY (kind, node_logical_id)
                );

                CREATE TABLE IF NOT EXISTS fts_property_rebuild_state (
                    kind TEXT PRIMARY KEY,
                    schema_id INTEGER NOT NULL,
                    state TEXT NOT NULL,
                    rows_total INTEGER,
                    rows_done INTEGER NOT NULL DEFAULT 0,
                    started_at INTEGER NOT NULL,
                    last_progress_at INTEGER,
                    error_message TEXT,
                    is_first_registration INTEGER NOT NULL DEFAULT 0
                );
                ",
    ),
    // v20: per-(kind, facet) projection configuration.
    Migration::new(
        SchemaVersion(20),
        "projection_profiles table for per-kind FTS tokenizer configuration",
        r"CREATE TABLE IF NOT EXISTS projection_profiles (
            kind        TEXT    NOT NULL,
            facet       TEXT    NOT NULL,
            config_json TEXT    NOT NULL,
            active_at   INTEGER,
            created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
            PRIMARY KEY (kind, facet)
        );",
    ),
    // v21: no DDL here; applied via `ensure_per_kind_fts_tables` hook.
    Migration::new(
        SchemaVersion(21),
        "per-kind FTS5 tables replacing fts_node_properties",
        "",
    ),
    // v22: applied via `ensure_staging_columns_json` hook.
    Migration::new(
        SchemaVersion(22),
        "add columns_json to fts_property_rebuild_staging for multi-column rebuild support",
        "ALTER TABLE fts_property_rebuild_staging ADD COLUMN columns_json TEXT;",
    ),
    // v23: runs its SQL batch directly (no hook).
    Migration::new(
        SchemaVersion(23),
        "drop global fts_node_properties table (replaced by per-kind fts_props_<kind> tables)",
        "DROP TABLE IF EXISTS fts_node_properties;",
    ),
];
528
/// Summary returned by [`SchemaManager::bootstrap`] describing what the call
/// observed and did.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BootstrapReport {
    /// Runtime SQLite library version, from `SELECT sqlite_version()`.
    pub sqlite_version: String,
    /// Migrations applied during this call, in application order; empty when
    /// the database was already up to date.
    pub applied_versions: Vec<SchemaVersion>,
    /// True when at least one `vector_profiles` row has `enabled = 1`.
    pub vector_profile_enabled: bool,
}
535
/// Stateless entry point for bootstrapping the database schema and applying
/// pending migrations (see [`SchemaManager::bootstrap`]).
#[derive(Clone, Debug, Default)]
pub struct SchemaManager;
538
539impl SchemaManager {
540    #[must_use]
541    pub fn new() -> Self {
542        Self
543    }
544
    /// Bootstrap the database schema, applying any pending migrations.
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError`] if any migration or metadata-table SQL fails,
    /// or [`SchemaError::VersionMismatch`] if the database has been migrated
    /// to a version newer than this engine supports.
    pub fn bootstrap(&self, conn: &Connection) -> Result<BootstrapReport, SchemaError> {
        self.initialize_connection(conn)?;
        Self::ensure_metadata_tables(conn)?;

        // Downgrade protection: refuse to run when the database records a
        // migration version newer than this engine's current version.
        let max_applied: u32 = conn.query_row(
            "SELECT COALESCE(MAX(version), 0) FROM fathom_schema_migrations",
            [],
            |row| row.get(0),
        )?;
        let engine_version = self.current_version().0;
        trace_info!(
            current_version = max_applied,
            engine_version,
            "schema bootstrap: version check"
        );
        if max_applied > engine_version {
            trace_error!(
                database_version = max_applied,
                engine_version,
                "schema version mismatch: database is newer than engine"
            );
            return Err(SchemaError::VersionMismatch {
                database_version: max_applied,
                engine_version,
            });
        }

        let mut applied_versions = Vec::new();
        for migration in self.migrations() {
            // Idempotency: a version already recorded in
            // `fathom_schema_migrations` is skipped entirely.
            let already_applied = conn
                .query_row(
                    "SELECT 1 FROM fathom_schema_migrations WHERE version = ?1",
                    [i64::from(migration.version.0)],
                    |row| row.get::<_, i64>(0),
                )
                .optional()?
                .is_some();

            if already_applied {
                continue;
            }

            // Each migration plus its bookkeeping row commits atomically.
            let tx = conn.unchecked_transaction()?;
            // Versions with a dedicated `ensure_*` hook are applied in code
            // (column probes, rebuilds); all others run their SQL batch.
            match migration.version {
                SchemaVersion(4) => Self::ensure_vector_regeneration_apply_metadata(&tx)?,
                SchemaVersion(5) => Self::ensure_vector_contract_format_version(&tx)?,
                SchemaVersion(6) => Self::ensure_provenance_metadata(&tx)?,
                SchemaVersion(8) => Self::ensure_operational_mutation_order(&tx)?,
                SchemaVersion(9) => Self::ensure_node_access_metadata(&tx)?,
                SchemaVersion(10) => Self::ensure_operational_filter_contract(&tx)?,
                SchemaVersion(11) => Self::ensure_operational_validation_contract(&tx)?,
                SchemaVersion(12) => Self::ensure_operational_secondary_indexes(&tx)?,
                SchemaVersion(13) => Self::ensure_operational_retention_runs(&tx)?,
                SchemaVersion(14) => Self::ensure_external_content_columns(&tx)?,
                SchemaVersion(15) => Self::ensure_fts_property_schemas(&tx)?,
                SchemaVersion(16) => Self::ensure_unicode_porter_fts_tokenizers(&tx)?,
                SchemaVersion(21) => Self::ensure_per_kind_fts_tables(&tx)?,
                SchemaVersion(22) => Self::ensure_staging_columns_json(&tx)?,
                _ => tx.execute_batch(migration.sql)?,
            }
            tx.execute(
                "INSERT INTO fathom_schema_migrations (version, description) VALUES (?1, ?2)",
                (i64::from(migration.version.0), migration.description),
            )?;
            tx.commit()?;
            trace_info!(
                version = migration.version.0,
                description = migration.description,
                "schema migration applied"
            );
            applied_versions.push(migration.version);
        }

        // Assemble the report from post-migration state.
        let sqlite_version = conn.query_row("SELECT sqlite_version()", [], |row| row.get(0))?;
        let vector_profile_count: i64 = conn.query_row(
            "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
            [],
            |row| row.get(0),
        )?;
        Ok(BootstrapReport {
            sqlite_version,
            applied_versions,
            vector_profile_enabled: vector_profile_count > 0,
        })
    }
638
639    fn ensure_vector_regeneration_apply_metadata(conn: &Connection) -> Result<(), SchemaError> {
640        let mut stmt = conn.prepare("PRAGMA table_info(vector_embedding_contracts)")?;
641        let columns = stmt
642            .query_map([], |row| row.get::<_, String>(1))?
643            .collect::<Result<Vec<_>, _>>()?;
644        let has_applied_at = columns.iter().any(|column| column == "applied_at");
645        let has_snapshot_hash = columns.iter().any(|column| column == "snapshot_hash");
646
647        if !has_applied_at {
648            conn.execute(
649                "ALTER TABLE vector_embedding_contracts ADD COLUMN applied_at INTEGER NOT NULL DEFAULT 0",
650                [],
651            )?;
652        }
653        if !has_snapshot_hash {
654            conn.execute(
655                "ALTER TABLE vector_embedding_contracts ADD COLUMN snapshot_hash TEXT NOT NULL DEFAULT ''",
656                [],
657            )?;
658        }
659        conn.execute(
660            r"
661            UPDATE vector_embedding_contracts
662            SET
663                applied_at = CASE
664                    WHEN applied_at = 0 THEN updated_at
665                    ELSE applied_at
666                END,
667                snapshot_hash = CASE
668                    WHEN snapshot_hash = '' THEN 'legacy'
669                    ELSE snapshot_hash
670                END
671            ",
672            [],
673        )?;
674        Ok(())
675    }
676
677    fn ensure_vector_contract_format_version(conn: &Connection) -> Result<(), SchemaError> {
678        let mut stmt = conn.prepare("PRAGMA table_info(vector_embedding_contracts)")?;
679        let columns = stmt
680            .query_map([], |row| row.get::<_, String>(1))?
681            .collect::<Result<Vec<_>, _>>()?;
682        let has_contract_format_version = columns
683            .iter()
684            .any(|column| column == "contract_format_version");
685
686        if !has_contract_format_version {
687            conn.execute(
688                "ALTER TABLE vector_embedding_contracts ADD COLUMN contract_format_version INTEGER NOT NULL DEFAULT 1",
689                [],
690            )?;
691        }
692        conn.execute(
693            r"
694            UPDATE vector_embedding_contracts
695            SET contract_format_version = 1
696            WHERE contract_format_version = 0
697            ",
698            [],
699        )?;
700        Ok(())
701    }
702
703    fn ensure_provenance_metadata(conn: &Connection) -> Result<(), SchemaError> {
704        let mut stmt = conn.prepare("PRAGMA table_info(provenance_events)")?;
705        let columns = stmt
706            .query_map([], |row| row.get::<_, String>(1))?
707            .collect::<Result<Vec<_>, _>>()?;
708        let has_metadata_json = columns.iter().any(|column| column == "metadata_json");
709
710        if !has_metadata_json {
711            conn.execute(
712                "ALTER TABLE provenance_events ADD COLUMN metadata_json TEXT NOT NULL DEFAULT ''",
713                [],
714            )?;
715        }
716        Ok(())
717    }
718
719    fn ensure_operational_mutation_order(conn: &Connection) -> Result<(), SchemaError> {
720        let mut stmt = conn.prepare("PRAGMA table_info(operational_mutations)")?;
721        let columns = stmt
722            .query_map([], |row| row.get::<_, String>(1))?
723            .collect::<Result<Vec<_>, _>>()?;
724        let has_mutation_order = columns.iter().any(|column| column == "mutation_order");
725
726        if !has_mutation_order {
727            conn.execute(
728                "ALTER TABLE operational_mutations ADD COLUMN mutation_order INTEGER NOT NULL DEFAULT 0",
729                [],
730            )?;
731        }
732        conn.execute(
733            r"
734            UPDATE operational_mutations
735            SET mutation_order = rowid
736            WHERE mutation_order = 0
737            ",
738            [],
739        )?;
740        conn.execute(
741            r"
742            CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_order
743                ON operational_mutations(collection_name, record_key, mutation_order DESC)
744            ",
745            [],
746        )?;
747        Ok(())
748    }
749
    /// Create the `node_access_metadata` table and its recency index.
    ///
    /// Tracks per-node access timestamps keyed by `logical_id`; the
    /// descending index on `last_accessed_at` supports "most recently
    /// accessed" scans. Idempotent via `IF NOT EXISTS`.
    fn ensure_node_access_metadata(conn: &Connection) -> Result<(), SchemaError> {
        conn.execute_batch(
            r"
            CREATE TABLE IF NOT EXISTS node_access_metadata (
                logical_id TEXT PRIMARY KEY,
                last_accessed_at INTEGER NOT NULL,
                updated_at INTEGER NOT NULL
            );

            CREATE INDEX IF NOT EXISTS idx_node_access_metadata_last_accessed
                ON node_access_metadata(last_accessed_at DESC);
            ",
        )?;
        Ok(())
    }
765
766    fn ensure_operational_filter_contract(conn: &Connection) -> Result<(), SchemaError> {
767        let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
768        let columns = stmt
769            .query_map([], |row| row.get::<_, String>(1))?
770            .collect::<Result<Vec<_>, _>>()?;
771        let has_filter_fields_json = columns.iter().any(|column| column == "filter_fields_json");
772
773        if !has_filter_fields_json {
774            conn.execute(
775                "ALTER TABLE operational_collections ADD COLUMN filter_fields_json TEXT NOT NULL DEFAULT '[]'",
776                [],
777            )?;
778        }
779
780        conn.execute_batch(
781            r"
782            CREATE TABLE IF NOT EXISTS operational_filter_values (
783                mutation_id TEXT NOT NULL,
784                collection_name TEXT NOT NULL,
785                field_name TEXT NOT NULL,
786                string_value TEXT,
787                integer_value INTEGER,
788                PRIMARY KEY(mutation_id, field_name),
789                FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE,
790                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
791            );
792
793            CREATE INDEX IF NOT EXISTS idx_operational_filter_values_text
794                ON operational_filter_values(collection_name, field_name, string_value, mutation_id);
795            CREATE INDEX IF NOT EXISTS idx_operational_filter_values_integer
796                ON operational_filter_values(collection_name, field_name, integer_value, mutation_id);
797            ",
798        )?;
799        Ok(())
800    }
801
802    fn ensure_operational_validation_contract(conn: &Connection) -> Result<(), SchemaError> {
803        let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
804        let columns = stmt
805            .query_map([], |row| row.get::<_, String>(1))?
806            .collect::<Result<Vec<_>, _>>()?;
807        let has_validation_json = columns.iter().any(|column| column == "validation_json");
808
809        if !has_validation_json {
810            conn.execute(
811                "ALTER TABLE operational_collections ADD COLUMN validation_json TEXT NOT NULL DEFAULT ''",
812                [],
813            )?;
814        }
815
816        Ok(())
817    }
818
819    fn ensure_operational_secondary_indexes(conn: &Connection) -> Result<(), SchemaError> {
820        let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
821        let columns = stmt
822            .query_map([], |row| row.get::<_, String>(1))?
823            .collect::<Result<Vec<_>, _>>()?;
824        let has_secondary_indexes_json = columns
825            .iter()
826            .any(|column| column == "secondary_indexes_json");
827
828        if !has_secondary_indexes_json {
829            conn.execute(
830                "ALTER TABLE operational_collections ADD COLUMN secondary_indexes_json TEXT NOT NULL DEFAULT '[]'",
831                [],
832            )?;
833        }
834
835        conn.execute_batch(
836            r"
837            CREATE TABLE IF NOT EXISTS operational_secondary_index_entries (
838                collection_name TEXT NOT NULL,
839                index_name TEXT NOT NULL,
840                subject_kind TEXT NOT NULL,
841                mutation_id TEXT NOT NULL DEFAULT '',
842                record_key TEXT NOT NULL DEFAULT '',
843                sort_timestamp INTEGER,
844                slot1_text TEXT,
845                slot1_integer INTEGER,
846                slot2_text TEXT,
847                slot2_integer INTEGER,
848                slot3_text TEXT,
849                slot3_integer INTEGER,
850                PRIMARY KEY(collection_name, index_name, subject_kind, mutation_id, record_key),
851                FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
852                FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE
853            );
854
855            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_text
856                ON operational_secondary_index_entries(
857                    collection_name, index_name, subject_kind, slot1_text, sort_timestamp DESC, mutation_id, record_key
858                );
859            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_integer
860                ON operational_secondary_index_entries(
861                    collection_name, index_name, subject_kind, slot1_integer, sort_timestamp DESC, mutation_id, record_key
862                );
863            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_text
864                ON operational_secondary_index_entries(
865                    collection_name, index_name, subject_kind, slot1_text, slot2_text, slot3_text, sort_timestamp DESC, record_key
866                );
867            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_integer
868                ON operational_secondary_index_entries(
869                    collection_name, index_name, subject_kind, slot1_integer, slot2_integer, slot3_integer, sort_timestamp DESC, record_key
870                );
871            ",
872        )?;
873
874        Ok(())
875    }
876
    /// Create the `operational_retention_runs` audit table and its
    /// per-collection time index.
    ///
    /// Each row records one retention execution (including dry runs) with
    /// the number of mutations deleted and rows remaining. Idempotent via
    /// `IF NOT EXISTS`.
    fn ensure_operational_retention_runs(conn: &Connection) -> Result<(), SchemaError> {
        conn.execute_batch(
            r"
            CREATE TABLE IF NOT EXISTS operational_retention_runs (
                id TEXT PRIMARY KEY,
                collection_name TEXT NOT NULL,
                executed_at INTEGER NOT NULL,
                action_kind TEXT NOT NULL,
                dry_run INTEGER NOT NULL DEFAULT 0,
                deleted_mutations INTEGER NOT NULL,
                rows_remaining INTEGER NOT NULL,
                metadata_json TEXT NOT NULL DEFAULT '',
                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
            );

            CREATE INDEX IF NOT EXISTS idx_operational_retention_runs_collection_time
                ON operational_retention_runs(collection_name, executed_at DESC);
            ",
        )?;
        Ok(())
    }
898
    /// Add external-content columns: `nodes.content_ref` (with a partial
    /// index over active rows that have one) and `chunks.content_hash`.
    ///
    /// NOTE(review): assumes the `nodes` and `chunks` tables already exist
    /// when this runs — `column_names` returns an empty list for a missing
    /// table, which would make the ALTER fail; confirm migration ordering
    /// guarantees both tables are created earlier.
    fn ensure_external_content_columns(conn: &Connection) -> Result<(), SchemaError> {
        let node_columns = Self::column_names(conn, "nodes")?;
        if !node_columns.iter().any(|c| c == "content_ref") {
            conn.execute("ALTER TABLE nodes ADD COLUMN content_ref TEXT", [])?;
        }
        // Partial index keeps the index small: only active rows that
        // actually carry a content_ref are indexed.
        conn.execute_batch(
            r"
            CREATE INDEX IF NOT EXISTS idx_nodes_content_ref
                ON nodes(content_ref)
                WHERE content_ref IS NOT NULL AND superseded_at IS NULL;
            ",
        )?;

        let chunk_columns = Self::column_names(conn, "chunks")?;
        if !chunk_columns.iter().any(|c| c == "content_hash") {
            conn.execute("ALTER TABLE chunks ADD COLUMN content_hash TEXT", [])?;
        }
        Ok(())
    }
918
    /// Migration 16: migrate both `fts_nodes` and `fts_node_properties` from
    /// the default FTS5 simple tokenizer to `unicode61 remove_diacritics 2
    /// porter` so diacritic-insensitive and stem-aware matches work (e.g.
    /// `cafe` matching `café`, `shipping` matching `ship`).
    ///
    /// FTS5 does not support re-tokenizing an existing index in place, so
    /// both virtual tables are dropped and recreated with the new
    /// `tokenize=...` clause. `fts_nodes` is rebuilt inline from the
    /// canonical `chunks + nodes` join. `fts_node_properties` is left empty
    /// here and repopulated from canonical state by the engine runtime after
    /// bootstrap (the property FTS rebuild requires the per-kind
    /// `fts_property_schemas` projection that lives in the engine crate).
    ///
    /// A malformed row encountered during the inline `INSERT ... SELECT`
    /// causes the migration to abort: the rusqlite error propagates up
    /// through `execute_batch` and rolls back the outer migration
    /// transaction so the schema version is not advanced.
    fn ensure_unicode_porter_fts_tokenizers(conn: &Connection) -> Result<(), SchemaError> {
        // Single batch: DROP + CREATE + repopulate; a failure anywhere
        // propagates and (per the doc comment above) rolls back the outer
        // migration transaction.
        conn.execute_batch(
            r"
            DROP TABLE IF EXISTS fts_nodes;
            CREATE VIRTUAL TABLE fts_nodes USING fts5(
                chunk_id UNINDEXED,
                node_logical_id UNINDEXED,
                kind UNINDEXED,
                text_content,
                tokenize = 'porter unicode61 remove_diacritics 2'
            );

            DROP TABLE IF EXISTS fts_node_properties;
            CREATE VIRTUAL TABLE fts_node_properties USING fts5(
                node_logical_id UNINDEXED,
                kind UNINDEXED,
                text_content,
                tokenize = 'porter unicode61 remove_diacritics 2'
            );

            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            SELECT c.id, n.logical_id, n.kind, c.text_content
            FROM chunks c
            JOIN nodes n
              ON n.logical_id = c.node_logical_id
             AND n.superseded_at IS NULL;
            ",
        )?;
        Ok(())
    }
966
    /// Create the `fts_property_schemas` registry table and a bare
    /// `fts_node_properties` FTS5 table if absent.
    ///
    /// NOTE(review): this CREATE uses FTS5's default tokenizer, while
    /// migration 16 (`ensure_unicode_porter_fts_tokenizers`) drops and
    /// recreates `fts_node_properties` with the unicode61+porter tokenizer —
    /// ordering between the two migrations matters; confirm this one runs
    /// first on fresh databases.
    fn ensure_fts_property_schemas(conn: &Connection) -> Result<(), SchemaError> {
        conn.execute_batch(
            r"
            CREATE TABLE IF NOT EXISTS fts_property_schemas (
                kind TEXT PRIMARY KEY,
                property_paths_json TEXT NOT NULL,
                separator TEXT NOT NULL DEFAULT ' ',
                format_version INTEGER NOT NULL DEFAULT 1,
                created_at INTEGER NOT NULL DEFAULT (unixepoch())
            );

            CREATE VIRTUAL TABLE IF NOT EXISTS fts_node_properties USING fts5(
                node_logical_id UNINDEXED,
                kind UNINDEXED,
                text_content
            );
            ",
        )?;
        Ok(())
    }
987
    /// Create one per-kind FTS5 table for every kind registered in
    /// `fts_property_schemas`, and enqueue a PENDING rebuild row for each.
    ///
    /// Idempotent: table creation uses `IF NOT EXISTS` and the rebuild
    /// enqueue uses `INSERT OR IGNORE` keyed on `kind`.
    fn ensure_per_kind_fts_tables(conn: &Connection) -> Result<(), SchemaError> {
        // Collect all registered kinds
        let kinds: Vec<String> = {
            let mut stmt = conn.prepare("SELECT kind FROM fts_property_schemas")?;
            stmt.query_map([], |r| r.get::<_, String>(0))?
                .collect::<Result<Vec<_>, _>>()?
        };

        for kind in &kinds {
            // Table name is derived deterministically from the kind (see
            // fts_kind_table_name), so re-running yields the same DDL.
            let table_name = fts_kind_table_name(kind);
            let ddl = format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING fts5(\
                    node_logical_id UNINDEXED, \
                    text_content, \
                    tokenize = '{DEFAULT_FTS_TOKENIZER}'\
                )"
            );
            conn.execute_batch(&ddl)?;

            // Enqueue a PENDING rebuild for this kind (idempotent)
            conn.execute(
                "INSERT OR IGNORE INTO fts_property_rebuild_state \
                 (kind, schema_id, state, rows_done, started_at, is_first_registration) \
                 SELECT ?1, rowid, 'PENDING', 0, unixepoch('now') * 1000, 0 \
                 FROM fts_property_schemas WHERE kind = ?1",
                rusqlite::params![kind],
            )?;
        }

        // Drop the old global table (deferred: write sites must be updated in A-6 first)
        // Note: actual DROP happens in migration 23, after A-6 redirects write sites
        Ok(())
    }
1021
1022    fn ensure_staging_columns_json(conn: &Connection) -> Result<(), SchemaError> {
1023        // Idempotent: check if the column already exists before altering
1024        let column_exists: bool = {
1025            let mut stmt = conn.prepare("PRAGMA table_info(fts_property_rebuild_staging)")?;
1026            let names: Vec<String> = stmt
1027                .query_map([], |r| r.get::<_, String>(1))?
1028                .collect::<Result<Vec<_>, _>>()?;
1029            names.iter().any(|n| n == "columns_json")
1030        };
1031
1032        if !column_exists {
1033            conn.execute_batch(
1034                "ALTER TABLE fts_property_rebuild_staging ADD COLUMN columns_json TEXT;",
1035            )?;
1036        }
1037
1038        Ok(())
1039    }
1040
1041    fn column_names(conn: &Connection, table: &str) -> Result<Vec<String>, SchemaError> {
1042        let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
1043        let names = stmt
1044            .query_map([], |row| row.get::<_, String>(1))?
1045            .collect::<Result<Vec<_>, _>>()?;
1046        Ok(names)
1047    }
1048
1049    #[must_use]
1050    pub fn current_version(&self) -> SchemaVersion {
1051        self.migrations()
1052            .last()
1053            .map_or(SchemaVersion(0), |migration| migration.version)
1054    }
1055
    /// The full, ordered list of migrations this manager can apply.
    #[must_use]
    pub fn migrations(&self) -> &'static [Migration] {
        MIGRATIONS
    }
1060
    /// Set the recommended `SQLite` connection pragmas for fathomdb.
    ///
    /// - `foreign_keys = ON`: enforce FK constraints (off by default in `SQLite`).
    /// - `journal_mode = WAL` + `synchronous = NORMAL`: concurrent readers
    ///   alongside a single writer, with the usual WAL durability trade-off.
    /// - `busy_timeout = 5000`: wait up to 5s on a locked database instead
    ///   of failing immediately.
    /// - `temp_store`, `mmap_size`, `journal_size_limit`: performance and
    ///   disk-usage tuning.
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError`] if any PRAGMA fails to execute.
    pub fn initialize_connection(&self, conn: &Connection) -> Result<(), SchemaError> {
        conn.execute_batch(
            r"
            PRAGMA foreign_keys = ON;
            PRAGMA journal_mode = WAL;
            PRAGMA synchronous = NORMAL;
            PRAGMA busy_timeout = 5000;
            PRAGMA temp_store = MEMORY;
            PRAGMA mmap_size = 3000000000;
            PRAGMA journal_size_limit = 536870912;
            ",
        )?;
        Ok(())
    }
1080
    /// Initialize a **read-only** connection with PRAGMAs that are safe for
    /// readers.
    ///
    /// Skips `journal_mode` (requires write; the writer already set WAL),
    /// `synchronous` (irrelevant for readers), and `journal_size_limit`
    /// (requires write).
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError`] if any PRAGMA fails to execute.
    pub fn initialize_reader_connection(&self, conn: &Connection) -> Result<(), SchemaError> {
        // Read-safe subset of initialize_connection's pragma list.
        conn.execute_batch(
            r"
            PRAGMA foreign_keys = ON;
            PRAGMA busy_timeout = 5000;
            PRAGMA temp_store = MEMORY;
            PRAGMA mmap_size = 3000000000;
            ",
        )?;
        Ok(())
    }
1102
1103    /// Ensure the sqlite-vec vector extension profile is registered and the
1104    /// virtual vec table exists.
1105    ///
1106    /// When the `sqlite-vec` feature is enabled this creates the virtual table
1107    /// and records the profile in `vector_profiles` (with `enabled = 1`).
1108    /// When the feature is absent the call always returns
1109    /// [`SchemaError::MissingCapability`].
1110    ///
1111    /// # Errors
1112    ///
1113    /// Returns [`SchemaError`] if the DDL fails or the feature is absent.
1114    #[cfg(feature = "sqlite-vec")]
1115    pub fn ensure_vector_profile(
1116        &self,
1117        conn: &Connection,
1118        profile: &str,
1119        table_name: &str,
1120        dimension: usize,
1121    ) -> Result<(), SchemaError> {
1122        conn.execute_batch(&format!(
1123            "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING vec0(\
1124                chunk_id TEXT PRIMARY KEY,\
1125                embedding float[{dimension}]\
1126            )"
1127        ))?;
1128        // Vector dimensions are small positive integers (typically <= a few
1129        // thousand); convert explicitly so clippy's cast_possible_wrap is happy.
1130        let dimension_i64 = i64::try_from(dimension).map_err(|_| {
1131            SchemaError::Sqlite(rusqlite::Error::ToSqlConversionFailure(
1132                format!("vector dimension {dimension} does not fit in i64").into(),
1133            ))
1134        })?;
1135        conn.execute(
1136            "INSERT OR REPLACE INTO vector_profiles \
1137             (profile, table_name, dimension, enabled) VALUES (?1, ?2, ?3, 1)",
1138            rusqlite::params![profile, table_name, dimension_i64],
1139        )?;
1140        Ok(())
1141    }
1142
    /// Stub used when the `sqlite-vec` feature is disabled; mirrors the
    /// feature-gated `ensure_vector_profile` signature so callers compile
    /// either way.
    ///
    /// # Errors
    ///
    /// Always returns [`SchemaError::MissingCapability`] when the `sqlite-vec`
    /// feature is not compiled in.
    #[cfg(not(feature = "sqlite-vec"))]
    pub fn ensure_vector_profile(
        &self,
        _conn: &Connection,
        _profile: &str,
        _table_name: &str,
        _dimension: usize,
    ) -> Result<(), SchemaError> {
        Err(SchemaError::MissingCapability("sqlite-vec"))
    }
1157
    /// Create the internal migration-tracking table if it does not exist.
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError`] if the DDL fails to execute.
    fn ensure_metadata_tables(conn: &Connection) -> Result<(), SchemaError> {
        conn.execute_batch(
            r"
            CREATE TABLE IF NOT EXISTS fathom_schema_migrations (
                version INTEGER PRIMARY KEY,
                description TEXT NOT NULL,
                applied_at INTEGER NOT NULL DEFAULT (unixepoch())
            );
            ",
            // applied_at defaults to the current unix time when a migration
            // row is inserted.
        )?;
        Ok(())
    }
1175}
1176
/// Default FTS5 tokenizer used when no per-kind profile is configured.
///
/// `porter` stemming layered over `unicode61` with `remove_diacritics 2` —
/// the same tokenizer string migration 16 installs on the global FTS tables.
pub const DEFAULT_FTS_TOKENIZER: &str = "porter unicode61 remove_diacritics 2";
1179
1180/// Derive the canonical FTS5 virtual-table name for a given node `kind`.
1181///
1182/// Rules:
1183/// 1. Lowercase the kind string.
1184/// 2. Replace every character that is NOT `[a-z0-9]` with `_`.
1185/// 3. Collapse consecutive underscores to a single `_`.
1186/// 4. Prefix with `fts_props_`.
1187/// 5. If the result exceeds 63 characters: truncate the slug to 55 characters
1188///    and append `_` + the first 7 hex chars of the SHA-256 of the original kind.
1189#[must_use]
1190pub fn fts_kind_table_name(kind: &str) -> String {
1191    // Step 1-3: normalise the slug
1192    let lowered = kind.to_lowercase();
1193    let mut slug = String::with_capacity(lowered.len());
1194    let mut prev_was_underscore = false;
1195    for ch in lowered.chars() {
1196        if ch.is_ascii_alphanumeric() {
1197            slug.push(ch);
1198            prev_was_underscore = false;
1199        } else {
1200            if !prev_was_underscore {
1201                slug.push('_');
1202            }
1203            prev_was_underscore = true;
1204        }
1205    }
1206
1207    // Step 4: prefix
1208    let prefixed = format!("fts_props_{slug}");
1209
1210    // Step 5: truncate if needed
1211    if prefixed.len() <= 63 {
1212        prefixed
1213    } else {
1214        // Hash the original kind string
1215        let hash = Sha256::digest(kind.as_bytes());
1216        let mut hex = String::with_capacity(hash.len() * 2);
1217        for b in &hash {
1218            use std::fmt::Write as _;
1219            let _ = write!(hex, "{b:02x}");
1220        }
1221        let hex_suffix = &hex[..7];
1222        // Slug must be 55 chars so that "fts_props_" (10) + slug (55) + "_" (1) + hex7 (7) = 73 — too long.
1223        // Wait: total = 10 + slug_len + 1 + 7 = 63  => slug_len = 45
1224        let slug_truncated = if slug.len() > 45 { &slug[..45] } else { &slug };
1225        format!("fts_props_{slug_truncated}_{hex_suffix}")
1226    }
1227}
1228
/// Derive the canonical FTS5 column name for a JSON path.
///
/// Rules:
/// 1. Strip a leading `$.` or `$` prefix.
/// 2. Lowercase, then replace every character that is NOT `[a-z0-9_]` with `_`.
/// 3. Collapse consecutive underscores (whether literal or produced by
///    replacement) to a single `_`, and strip trailing underscores.
/// 4. If `is_recursive` is `true`, append `_all`.
#[must_use]
pub fn fts_column_name(path: &str, is_recursive: bool) -> String {
    // Step 1: strip the JSON-path root prefix.
    let stripped = path
        .strip_prefix("$.")
        .or_else(|| path.strip_prefix('$'))
        .unwrap_or(path);

    // Steps 2-3: lowercase and normalise. A literal '_' is handled by the
    // same branch as replaced characters, so runs like `a__b` collapse to
    // `a_b` (the previous implementation only collapsed underscores that
    // were introduced by replacement, violating documented rule 3).
    let lowered = stripped.to_lowercase();
    let mut col = String::with_capacity(lowered.len());
    let mut prev_was_underscore = false;
    for ch in lowered.chars() {
        if ch.is_ascii_alphanumeric() {
            col.push(ch);
            prev_was_underscore = false;
        } else {
            if !prev_was_underscore {
                col.push('_');
            }
            prev_was_underscore = true;
        }
    }

    // Strip trailing underscores left by trailing punctuation (e.g. `tags[]`).
    let col = col.trim_end_matches('_').to_owned();

    // Step 4: recursive suffix
    if is_recursive {
        format!("{col}_all")
    } else {
        col
    }
}
1273
1274/// Look up the FTS tokenizer configured for a given `kind` in `projection_profiles`.
1275///
1276/// Returns [`DEFAULT_FTS_TOKENIZER`] when:
1277/// - the `projection_profiles` table does not exist yet, or
1278/// - no row exists for `(kind, 'fts')`, or
1279/// - the `tokenizer` field in `config_json` is absent or empty.
1280///
1281/// # Errors
1282///
1283/// Returns [`SchemaError::Sqlite`] on any `SQLite` error other than a missing table.
1284pub fn resolve_fts_tokenizer(conn: &Connection, kind: &str) -> Result<String, SchemaError> {
1285    let result = conn
1286        .query_row(
1287            "SELECT json_extract(config_json, '$.tokenizer') FROM projection_profiles WHERE kind = ?1 AND facet = 'fts'",
1288            [kind],
1289            |row| row.get::<_, Option<String>>(0),
1290        )
1291        .optional();
1292
1293    match result {
1294        Ok(Some(Some(tok))) if !tok.is_empty() => Ok(tok),
1295        Ok(_) => Ok(DEFAULT_FTS_TOKENIZER.to_owned()),
1296        Err(rusqlite::Error::SqliteFailure(_, _)) => {
1297            // Table doesn't exist or other sqlite-level error — return default
1298            Ok(DEFAULT_FTS_TOKENIZER.to_owned())
1299        }
1300        Err(e) => Err(SchemaError::Sqlite(e)),
1301    }
1302}
1303
1304#[cfg(test)]
1305#[allow(clippy::expect_used)]
1306mod tests {
1307    use rusqlite::Connection;
1308
1309    use super::SchemaManager;
1310
1311    #[test]
1312    fn bootstrap_applies_initial_schema() {
1313        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1314        let manager = SchemaManager::new();
1315
1316        let report = manager.bootstrap(&conn).expect("bootstrap report");
1317
1318        assert_eq!(
1319            report.applied_versions.len(),
1320            manager.current_version().0 as usize
1321        );
1322        assert!(report.sqlite_version.starts_with('3'));
1323        let table_count: i64 = conn
1324            .query_row(
1325                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'nodes'",
1326                [],
1327                |row| row.get(0),
1328            )
1329            .expect("nodes table exists");
1330        assert_eq!(table_count, 1);
1331    }
1332
1333    // --- Item 2: vector profile tests ---
1334
    #[test]
    fn vector_profile_not_enabled_without_feature() {
        // NOTE(review): assumes the test binary is built without the
        // `sqlite-vec` feature — confirm against the crate's CI matrix.
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        let report = manager.bootstrap(&conn).expect("bootstrap");
        assert!(
            !report.vector_profile_enabled,
            "vector_profile_enabled must be false on a fresh bootstrap"
        );
    }
1345
    #[test]
    fn vector_profile_skipped_when_dimension_absent() {
        // ensure_vector_profile is never called → enabled stays false
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager.bootstrap(&conn).expect("bootstrap");

        // Bootstrap alone must not register any enabled vector profile row.
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            count, 0,
            "no enabled profile without calling ensure_vector_profile"
        );
    }
1365
1366    #[test]
1367    fn bootstrap_report_reflects_actual_vector_state() {
1368        // After a fresh bootstrap with no vector profile, the report reflects reality.
1369        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1370        let manager = SchemaManager::new();
1371        let report = manager.bootstrap(&conn).expect("bootstrap");
1372
1373        let db_count: i64 = conn
1374            .query_row(
1375                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
1376                [],
1377                |row| row.get(0),
1378            )
1379            .expect("count");
1380        assert_eq!(
1381            report.vector_profile_enabled,
1382            db_count > 0,
1383            "BootstrapReport.vector_profile_enabled must match actual DB state"
1384        );
1385    }
1386
    #[test]
    fn bootstrap_backfills_vector_contract_format_and_provenance_metadata_columns() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        // Seed a legacy schema that predates contract_format_version and
        // provenance_events.metadata_json, then let bootstrap migrate it.
        conn.execute_batch(
            r#"
            CREATE TABLE provenance_events (
                id         TEXT PRIMARY KEY,
                event_type TEXT NOT NULL,
                subject    TEXT NOT NULL,
                source_ref TEXT,
                created_at INTEGER NOT NULL DEFAULT (unixepoch())
            );
            CREATE TABLE vector_embedding_contracts (
                profile TEXT PRIMARY KEY,
                table_name TEXT NOT NULL,
                model_identity TEXT NOT NULL,
                model_version TEXT NOT NULL,
                dimension INTEGER NOT NULL,
                normalization_policy TEXT NOT NULL,
                chunking_policy TEXT NOT NULL,
                preprocessing_policy TEXT NOT NULL,
                generator_command_json TEXT NOT NULL,
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                applied_at INTEGER NOT NULL DEFAULT 0,
                snapshot_hash TEXT NOT NULL DEFAULT ''
            );
            INSERT INTO vector_embedding_contracts (
                profile,
                table_name,
                model_identity,
                model_version,
                dimension,
                normalization_policy,
                chunking_policy,
                preprocessing_policy,
                generator_command_json,
                updated_at,
                applied_at,
                snapshot_hash
            ) VALUES (
                'default',
                'vec_nodes_active',
                'legacy-model',
                '0.9.0',
                4,
                'l2',
                'per_chunk',
                'trim',
                '["/bin/echo"]',
                100,
                100,
                'legacy'
            );
            "#,
        )
        .expect("seed legacy schema");
        let manager = SchemaManager::new();

        let report = manager.bootstrap(&conn).expect("bootstrap");

        assert!(
            report.applied_versions.iter().any(|version| version.0 >= 5),
            "bootstrap should apply hardening migrations"
        );
        // The backfilled column defaults legacy rows to format version 1.
        let format_version: i64 = conn
            .query_row(
                "SELECT contract_format_version FROM vector_embedding_contracts WHERE profile = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("contract_format_version");
        assert_eq!(format_version, 1);
        // metadata_json must have been added to the legacy provenance table.
        let metadata_column_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM pragma_table_info('provenance_events') WHERE name = 'metadata_json'",
                [],
                |row| row.get(0),
            )
            .expect("metadata_json column count");
        assert_eq!(metadata_column_count, 1);
    }
1468
1469    #[test]
1470    fn bootstrap_creates_operational_store_tables() {
1471        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1472        let manager = SchemaManager::new();
1473
1474        manager.bootstrap(&conn).expect("bootstrap");
1475
1476        for table in [
1477            "operational_collections",
1478            "operational_mutations",
1479            "operational_current",
1480        ] {
1481            let count: i64 = conn
1482                .query_row(
1483                    "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = ?1",
1484                    [table],
1485                    |row| row.get(0),
1486                )
1487                .expect("table existence");
1488            assert_eq!(count, 1, "{table} should exist after bootstrap");
1489        }
1490        let mutation_order_columns: i64 = conn
1491            .query_row(
1492                "SELECT count(*) FROM pragma_table_info('operational_mutations') WHERE name = 'mutation_order'",
1493                [],
1494                |row| row.get(0),
1495            )
1496            .expect("mutation_order column exists");
1497        assert_eq!(mutation_order_columns, 1);
1498    }
1499
1500    #[test]
1501    fn bootstrap_is_idempotent_with_operational_store_tables() {
1502        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1503        let manager = SchemaManager::new();
1504
1505        manager.bootstrap(&conn).expect("first bootstrap");
1506        let report = manager.bootstrap(&conn).expect("second bootstrap");
1507
1508        assert!(
1509            report.applied_versions.is_empty(),
1510            "second bootstrap should apply no new migrations"
1511        );
1512        let count: i64 = conn
1513            .query_row(
1514                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'operational_collections'",
1515                [],
1516                |row| row.get(0),
1517        )
1518        .expect("operational_collections table exists");
1519        assert_eq!(count, 1);
1520    }
1521
1522    #[test]
1523    fn bootstrap_is_idempotent_for_recovered_operational_tables_without_migration_history() {
1524        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1525        conn.execute_batch(
1526            r#"
1527            CREATE TABLE operational_collections (
1528                name TEXT PRIMARY KEY,
1529                kind TEXT NOT NULL,
1530                schema_json TEXT NOT NULL,
1531                retention_json TEXT NOT NULL,
1532                format_version INTEGER NOT NULL DEFAULT 1,
1533                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
1534                disabled_at INTEGER
1535            );
1536
1537            CREATE TABLE operational_mutations (
1538                id TEXT PRIMARY KEY,
1539                collection_name TEXT NOT NULL,
1540                record_key TEXT NOT NULL,
1541                op_kind TEXT NOT NULL,
1542                payload_json TEXT NOT NULL,
1543                source_ref TEXT,
1544                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
1545                mutation_order INTEGER NOT NULL DEFAULT 0,
1546                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
1547            );
1548
1549            CREATE TABLE operational_current (
1550                collection_name TEXT NOT NULL,
1551                record_key TEXT NOT NULL,
1552                payload_json TEXT NOT NULL,
1553                updated_at INTEGER NOT NULL,
1554                last_mutation_id TEXT NOT NULL,
1555                PRIMARY KEY(collection_name, record_key),
1556                FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
1557                FOREIGN KEY(last_mutation_id) REFERENCES operational_mutations(id)
1558            );
1559
1560            INSERT INTO operational_collections (name, kind, schema_json, retention_json)
1561            VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}');
1562            INSERT INTO operational_mutations (
1563                id, collection_name, record_key, op_kind, payload_json, created_at, mutation_order
1564            ) VALUES (
1565                'mut-1', 'audit_log', 'entry-1', 'put', '{"ok":true}', 10, 0
1566            );
1567            "#,
1568        )
1569        .expect("seed recovered operational tables");
1570
1571        let manager = SchemaManager::new();
1572        let report = manager
1573            .bootstrap(&conn)
1574            .expect("bootstrap recovered schema");
1575
1576        assert!(
1577            report.applied_versions.iter().any(|version| version.0 == 8),
1578            "bootstrap should record operational mutation ordering hardening"
1579        );
1580        let mutation_order: i64 = conn
1581            .query_row(
1582                "SELECT mutation_order FROM operational_mutations WHERE id = 'mut-1'",
1583                [],
1584                |row| row.get(0),
1585            )
1586            .expect("mutation_order");
1587        assert_ne!(
1588            mutation_order, 0,
1589            "bootstrap should backfill recovered operational rows"
1590        );
1591        let count: i64 = conn
1592            .query_row(
1593                "SELECT count(*) FROM sqlite_master WHERE type = 'index' AND name = 'idx_operational_mutations_collection_key_order'",
1594                [],
1595                |row| row.get(0),
1596            )
1597            .expect("ordering index exists");
1598        assert_eq!(count, 1);
1599    }
1600
1601    #[test]
1602    fn bootstrap_adds_operational_filter_contract_and_index_table() {
1603        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1604        conn.execute_batch(
1605            r#"
1606            CREATE TABLE operational_collections (
1607                name TEXT PRIMARY KEY,
1608                kind TEXT NOT NULL,
1609                schema_json TEXT NOT NULL,
1610                retention_json TEXT NOT NULL,
1611                format_version INTEGER NOT NULL DEFAULT 1,
1612                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
1613                disabled_at INTEGER
1614            );
1615
1616            CREATE TABLE operational_mutations (
1617                id TEXT PRIMARY KEY,
1618                collection_name TEXT NOT NULL,
1619                record_key TEXT NOT NULL,
1620                op_kind TEXT NOT NULL,
1621                payload_json TEXT NOT NULL,
1622                source_ref TEXT,
1623                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
1624                mutation_order INTEGER NOT NULL DEFAULT 1,
1625                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
1626            );
1627
1628            INSERT INTO operational_collections (name, kind, schema_json, retention_json)
1629            VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}');
1630            "#,
1631        )
1632        .expect("seed recovered operational schema");
1633
1634        let manager = SchemaManager::new();
1635        let report = manager
1636            .bootstrap(&conn)
1637            .expect("bootstrap recovered schema");
1638
1639        assert!(
1640            report
1641                .applied_versions
1642                .iter()
1643                .any(|version| version.0 == 10),
1644            "bootstrap should record operational filtered read migration"
1645        );
1646        assert!(
1647            report
1648                .applied_versions
1649                .iter()
1650                .any(|version| version.0 == 11),
1651            "bootstrap should record operational validation migration"
1652        );
1653        let filter_fields_json: String = conn
1654            .query_row(
1655                "SELECT filter_fields_json FROM operational_collections WHERE name = 'audit_log'",
1656                [],
1657                |row| row.get(0),
1658            )
1659            .expect("filter_fields_json added");
1660        assert_eq!(filter_fields_json, "[]");
1661        let validation_json: String = conn
1662            .query_row(
1663                "SELECT validation_json FROM operational_collections WHERE name = 'audit_log'",
1664                [],
1665                |row| row.get(0),
1666            )
1667            .expect("validation_json added");
1668        assert_eq!(validation_json, "");
1669        let table_count: i64 = conn
1670            .query_row(
1671                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'operational_filter_values'",
1672                [],
1673                |row| row.get(0),
1674        )
1675        .expect("filter table exists");
1676        assert_eq!(table_count, 1);
1677    }
1678
1679    #[test]
1680    fn bootstrap_reapplies_migration_history_without_readding_filter_contract_columns() {
1681        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1682        let manager = SchemaManager::new();
1683        manager.bootstrap(&conn).expect("initial bootstrap");
1684
1685        conn.execute("DROP TABLE fathom_schema_migrations", [])
1686            .expect("drop migration history");
1687        SchemaManager::ensure_metadata_tables(&conn).expect("recreate migration metadata");
1688
1689        let report = manager
1690            .bootstrap(&conn)
1691            .expect("rebootstrap existing schema");
1692
1693        assert!(
1694            report
1695                .applied_versions
1696                .iter()
1697                .any(|version| version.0 == 10),
1698            "rebootstrap should re-record migration 10"
1699        );
1700        assert!(
1701            report
1702                .applied_versions
1703                .iter()
1704                .any(|version| version.0 == 11),
1705            "rebootstrap should re-record migration 11"
1706        );
1707        let filter_fields_json: String = conn
1708            .query_row(
1709                "SELECT filter_fields_json FROM operational_collections LIMIT 1",
1710                [],
1711                |row| row.get(0),
1712            )
1713            .unwrap_or_else(|_| "[]".to_string());
1714        assert_eq!(filter_fields_json, "[]");
1715        let validation_json: String = conn
1716            .query_row(
1717                "SELECT validation_json FROM operational_collections LIMIT 1",
1718                [],
1719                |row| row.get(0),
1720            )
1721            .unwrap_or_default();
1722        assert_eq!(validation_json, "");
1723    }
1724
1725    #[test]
1726    fn downgrade_detected_returns_version_mismatch() {
1727        use crate::SchemaError;
1728
1729        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1730        let manager = SchemaManager::new();
1731        manager.bootstrap(&conn).expect("initial bootstrap");
1732
1733        conn.execute(
1734            "INSERT INTO fathom_schema_migrations (version, description) VALUES (?1, ?2)",
1735            (999_i64, "future migration"),
1736        )
1737        .expect("insert future version");
1738
1739        let err = manager
1740            .bootstrap(&conn)
1741            .expect_err("should fail on downgrade");
1742        assert!(
1743            matches!(
1744                err,
1745                SchemaError::VersionMismatch {
1746                    database_version: 999,
1747                    ..
1748                }
1749            ),
1750            "expected VersionMismatch with database_version 999, got: {err}"
1751        );
1752    }
1753
1754    #[test]
1755    fn journal_size_limit_is_set() {
1756        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1757        let manager = SchemaManager::new();
1758        manager
1759            .initialize_connection(&conn)
1760            .expect("initialize_connection");
1761
1762        let limit: i64 = conn
1763            .query_row("PRAGMA journal_size_limit", [], |row| row.get(0))
1764            .expect("journal_size_limit pragma");
1765        assert_eq!(limit, 536_870_912);
1766    }
1767
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn vector_profile_created_when_feature_enabled() {
        // Register the sqlite-vec extension globally so the in-memory
        // connection can use the vec0 module.
        // SAFETY: sqlite3_auto_extension takes a C entry-point callback; the
        // transmute only adapts sqlite3_vec_init's fn-pointer type to the
        // signature the FFI declaration expects — TODO confirm the signatures
        // match across sqlite-vec/rusqlite versions.
        // NOTE(review): auto-extension registration is process-global and is
        // never unregistered here; it affects every connection opened after
        // this test in the same process.
        unsafe {
            rusqlite::ffi::sqlite3_auto_extension(Some(std::mem::transmute(
                sqlite_vec::sqlite3_vec_init as *const (),
            )));
        }
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager.bootstrap(&conn).expect("bootstrap");

        manager
            .ensure_vector_profile(&conn, "default", "vec_nodes_active", 128)
            .expect("ensure_vector_profile");

        // Exactly one enabled profile should be recorded.
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            count, 1,
            "vector profile must be enabled after ensure_vector_profile"
        );

        // Verify the virtual table exists by querying it
        let _: i64 = conn
            .query_row("SELECT count(*) FROM vec_nodes_active", [], |row| {
                row.get(0)
            })
            .expect("vec_nodes_active table must exist after ensure_vector_profile");
    }
1805
1806    // --- A-1: fts_kind_table_name tests ---
1807
1808    #[test]
1809    fn fts_kind_table_name_simple_kind() {
1810        assert_eq!(
1811            super::fts_kind_table_name("WMKnowledgeObject"),
1812            "fts_props_wmknowledgeobject"
1813        );
1814    }
1815
1816    #[test]
1817    fn fts_kind_table_name_another_simple_kind() {
1818        assert_eq!(
1819            super::fts_kind_table_name("WMExecutionRecord"),
1820            "fts_props_wmexecutionrecord"
1821        );
1822    }
1823
1824    #[test]
1825    fn fts_kind_table_name_with_separator_chars() {
1826        assert_eq!(
1827            super::fts_kind_table_name("MyKind-With.Dots"),
1828            "fts_props_mykind_with_dots"
1829        );
1830    }
1831
1832    #[test]
1833    fn fts_kind_table_name_collapses_consecutive_underscores() {
1834        assert_eq!(
1835            super::fts_kind_table_name("Kind__Double__Underscores"),
1836            "fts_props_kind_double_underscores"
1837        );
1838    }
1839
1840    #[test]
1841    fn fts_kind_table_name_long_kind_truncates_with_hash() {
1842        // 61 A's — slug after prefix would be 61 chars, exceeding 63-char limit
1843        let long_kind = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA";
1844        let result = super::fts_kind_table_name(long_kind);
1845        assert_eq!(result.len(), 63, "result must be exactly 63 chars");
1846        assert!(
1847            result.starts_with("fts_props_"),
1848            "result must start with fts_props_"
1849        );
1850        // Must contain underscore before 7-char hex suffix
1851        let last_underscore = result.rfind('_').expect("must contain underscore");
1852        let hex_suffix = &result[last_underscore + 1..];
1853        assert_eq!(hex_suffix.len(), 7, "hex suffix must be 7 chars");
1854        assert!(
1855            hex_suffix.chars().all(|c| c.is_ascii_hexdigit()),
1856            "hex suffix must be hex digits"
1857        );
1858    }
1859
1860    #[test]
1861    fn fts_kind_table_name_testkind() {
1862        assert_eq!(super::fts_kind_table_name("TestKind"), "fts_props_testkind");
1863    }
1864
1865    // --- A-1: fts_column_name tests ---
1866
1867    #[test]
1868    fn fts_column_name_simple_field() {
1869        assert_eq!(super::fts_column_name("$.title", false), "title");
1870    }
1871
1872    #[test]
1873    fn fts_column_name_nested_path() {
1874        assert_eq!(
1875            super::fts_column_name("$.payload.content", false),
1876            "payload_content"
1877        );
1878    }
1879
1880    #[test]
1881    fn fts_column_name_recursive() {
1882        assert_eq!(super::fts_column_name("$.payload", true), "payload_all");
1883    }
1884
1885    #[test]
1886    fn fts_column_name_special_chars() {
1887        assert_eq!(
1888            super::fts_column_name("$.some-field[0]", false),
1889            "some_field_0"
1890        );
1891    }
1892
1893    // --- A-1: resolve_fts_tokenizer tests ---
1894
1895    #[test]
1896    fn resolve_fts_tokenizer_returns_default_when_no_table() {
1897        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1898        // No projection_profiles table — should return default
1899        let result = super::resolve_fts_tokenizer(&conn, "MyKind").expect("should not error");
1900        assert_eq!(result, super::DEFAULT_FTS_TOKENIZER);
1901    }
1902
1903    #[test]
1904    fn resolve_fts_tokenizer_returns_configured_value() {
1905        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1906        conn.execute_batch(
1907            "CREATE TABLE projection_profiles (
1908                kind TEXT NOT NULL,
1909                facet TEXT NOT NULL,
1910                config_json TEXT NOT NULL,
1911                active_at INTEGER,
1912                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
1913                PRIMARY KEY (kind, facet)
1914            );
1915            INSERT INTO projection_profiles (kind, facet, config_json)
1916            VALUES ('MyKind', 'fts', '{\"tokenizer\":\"trigram\"}');",
1917        )
1918        .expect("setup table");
1919
1920        let result = super::resolve_fts_tokenizer(&conn, "MyKind").expect("should not error");
1921        assert_eq!(result, "trigram");
1922
1923        let default_result =
1924            super::resolve_fts_tokenizer(&conn, "OtherKind").expect("should not error");
1925        assert_eq!(default_result, super::DEFAULT_FTS_TOKENIZER);
1926    }
1927
1928    // --- A-2: migration 20 tests ---
1929
1930    #[test]
1931    fn migration_20_creates_projection_profiles_table() {
1932        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1933        let manager = SchemaManager::new();
1934        manager.bootstrap(&conn).expect("bootstrap");
1935
1936        let table_exists: i64 = conn
1937            .query_row(
1938                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='projection_profiles'",
1939                [],
1940                |row| row.get(0),
1941            )
1942            .expect("query sqlite_master");
1943        assert_eq!(table_exists, 1, "projection_profiles table must exist");
1944
1945        // Check columns exist by querying them
1946        conn.execute_batch(
1947            "INSERT INTO projection_profiles (kind, facet, config_json)
1948             VALUES ('TestKind', 'fts', '{}')",
1949        )
1950        .expect("insert row to verify columns");
1951        let (kind, facet, config_json): (String, String, String) = conn
1952            .query_row(
1953                "SELECT kind, facet, config_json FROM projection_profiles WHERE kind='TestKind'",
1954                [],
1955                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1956            )
1957            .expect("select columns");
1958        assert_eq!(kind, "TestKind");
1959        assert_eq!(facet, "fts");
1960        assert_eq!(config_json, "{}");
1961    }
1962
1963    #[test]
1964    fn migration_20_primary_key_is_kind_facet() {
1965        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1966        let manager = SchemaManager::new();
1967        manager.bootstrap(&conn).expect("bootstrap");
1968
1969        conn.execute_batch(
1970            "INSERT INTO projection_profiles (kind, facet, config_json)
1971             VALUES ('MyKind', 'fts', '{\"tokenizer\":\"porter\"}');",
1972        )
1973        .expect("first insert");
1974
1975        // Second insert with same (kind, facet) must fail
1976        let result = conn.execute_batch(
1977            "INSERT INTO projection_profiles (kind, facet, config_json)
1978             VALUES ('MyKind', 'fts', '{\"tokenizer\":\"trigram\"}');",
1979        );
1980        assert!(
1981            result.is_err(),
1982            "duplicate (kind, facet) must violate PRIMARY KEY"
1983        );
1984    }
1985
1986    #[test]
1987    fn migration_20_resolve_fts_tokenizer_end_to_end() {
1988        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1989        let manager = SchemaManager::new();
1990        manager.bootstrap(&conn).expect("bootstrap");
1991
1992        conn.execute_batch(
1993            "INSERT INTO projection_profiles (kind, facet, config_json)
1994             VALUES ('MyKind', 'fts', '{\"tokenizer\":\"trigram\"}');",
1995        )
1996        .expect("insert profile");
1997
1998        let result = super::resolve_fts_tokenizer(&conn, "MyKind").expect("should not error");
1999        assert_eq!(result, "trigram");
2000
2001        let default_result =
2002            super::resolve_fts_tokenizer(&conn, "UnknownKind").expect("should not error");
2003        assert_eq!(default_result, super::DEFAULT_FTS_TOKENIZER);
2004    }
2005
2006    #[test]
2007    fn migration_21_creates_per_kind_fts_table_and_pending_row() {
2008        let conn = Connection::open_in_memory().expect("in-memory sqlite");
2009        let manager = SchemaManager::new();
2010
2011        // Bootstrap up through migration 20 by using a fresh DB, then manually insert
2012        // a kind into fts_property_schemas so migration 21 picks it up.
2013        // We do a full bootstrap (which applies all migrations including 21).
2014        // Insert the kind before bootstrapping so it is present when migration 21 runs.
2015        // Since bootstrap applies migrations in order and migration 15 creates
2016        // fts_property_schemas, we must insert after that table exists.
2017        // Strategy: bootstrap first, insert kind, then run bootstrap again (idempotent).
2018        manager.bootstrap(&conn).expect("first bootstrap");
2019
2020        conn.execute_batch(
2021            "INSERT INTO fts_property_schemas (kind, property_paths_json, separator, format_version) \
2022             VALUES ('TestKind', '[]', ',', 1)",
2023        )
2024        .expect("insert kind");
2025
2026        // Now run ensure_per_kind_fts_tables directly by calling bootstrap again — migration 21
2027        // is already applied, so we test the function directly.
2028        SchemaManager::ensure_per_kind_fts_tables(&conn).expect("ensure_per_kind_fts_tables");
2029
2030        // fts_props_testkind should exist
2031        let count: i64 = conn
2032            .query_row(
2033                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='fts_props_testkind'",
2034                [],
2035                |r| r.get(0),
2036            )
2037            .expect("count fts table");
2038        assert_eq!(
2039            count, 1,
2040            "fts_props_testkind virtual table should be created"
2041        );
2042
2043        // PENDING row should exist
2044        let state: String = conn
2045            .query_row(
2046                "SELECT state FROM fts_property_rebuild_state WHERE kind='TestKind'",
2047                [],
2048                |r| r.get(0),
2049            )
2050            .expect("rebuild state row");
2051        assert_eq!(state, "PENDING");
2052    }
2053
2054    #[test]
2055    fn migration_22_adds_columns_json_to_staging_table() {
2056        let conn = Connection::open_in_memory().expect("in-memory sqlite");
2057        let manager = SchemaManager::new();
2058        manager.bootstrap(&conn).expect("bootstrap");
2059
2060        let col_count: i64 = conn
2061            .query_row(
2062                "SELECT count(*) FROM pragma_table_info('fts_property_rebuild_staging') WHERE name='columns_json'",
2063                [],
2064                |r| r.get(0),
2065            )
2066            .expect("pragma_table_info");
2067        assert_eq!(
2068            col_count, 1,
2069            "columns_json column must exist after migration 22"
2070        );
2071    }
2072
2073    // --- A-6: migration 23 drops fts_node_properties ---
2074    #[test]
2075    fn migration_23_drops_global_fts_node_properties_table() {
2076        let conn = Connection::open_in_memory().expect("in-memory sqlite");
2077        let manager = SchemaManager::new();
2078        manager.bootstrap(&conn).expect("bootstrap");
2079
2080        // After migration 23, fts_node_properties must NOT exist in sqlite_master.
2081        let count: i64 = conn
2082            .query_row(
2083                "SELECT count(*) FROM sqlite_master \
2084                 WHERE type='table' AND name='fts_node_properties'",
2085                [],
2086                |r| r.get(0),
2087            )
2088            .expect("check sqlite_master");
2089        assert_eq!(
2090            count, 0,
2091            "fts_node_properties must be dropped by migration 23"
2092        );
2093    }
2094}