//! fathomdb_schema/bootstrap.rs — SQLite schema migrations and bootstrap logic.

1use rusqlite::{Connection, OptionalExtension};
2use sha2::{Digest, Sha256};
3
4use crate::{Migration, SchemaError, SchemaVersion};
5
/// Ordered, append-only list of schema migrations for the fathomdb database.
///
/// Migrations are applied in array order by `SchemaManager::bootstrap`, which
/// skips versions already recorded in `fathom_schema_migrations`. Entries with
/// an empty SQL string (v16, v21, v24) are applied entirely by Rust hooks in
/// the bootstrap `match`; several others (v4–v15, v22) also route through
/// idempotent column-probing hooks rather than `execute_batch` — see
/// `SchemaManager::bootstrap`.
static MIGRATIONS: &[Migration] = &[
    // v1: canonical graph (nodes/edges), chunking, FTS, vector-profile stub,
    // and run/step/action runtime tables.
    Migration::new(
        SchemaVersion(1),
        "initial canonical schema and runtime tables",
        r"
                CREATE TABLE IF NOT EXISTS nodes (
                    row_id TEXT PRIMARY KEY,
                    logical_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    properties BLOB NOT NULL,
                    created_at INTEGER NOT NULL,
                    superseded_at INTEGER,
                    source_ref TEXT,
                    confidence REAL
                );

                CREATE UNIQUE INDEX IF NOT EXISTS idx_nodes_active_logical_id
                    ON nodes(logical_id)
                    WHERE superseded_at IS NULL;
                CREATE INDEX IF NOT EXISTS idx_nodes_kind_active
                    ON nodes(kind, superseded_at);
                CREATE INDEX IF NOT EXISTS idx_nodes_source_ref
                    ON nodes(source_ref);

                CREATE TABLE IF NOT EXISTS edges (
                    row_id TEXT PRIMARY KEY,
                    logical_id TEXT NOT NULL,
                    source_logical_id TEXT NOT NULL,
                    target_logical_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    properties BLOB NOT NULL,
                    created_at INTEGER NOT NULL,
                    superseded_at INTEGER,
                    source_ref TEXT,
                    confidence REAL
                );

                CREATE UNIQUE INDEX IF NOT EXISTS idx_edges_active_logical_id
                    ON edges(logical_id)
                    WHERE superseded_at IS NULL;
                CREATE INDEX IF NOT EXISTS idx_edges_source_active
                    ON edges(source_logical_id, kind, superseded_at);
                CREATE INDEX IF NOT EXISTS idx_edges_target_active
                    ON edges(target_logical_id, kind, superseded_at);
                CREATE INDEX IF NOT EXISTS idx_edges_source_ref
                    ON edges(source_ref);

                CREATE TABLE IF NOT EXISTS chunks (
                    id TEXT PRIMARY KEY,
                    node_logical_id TEXT NOT NULL,
                    text_content TEXT NOT NULL,
                    byte_start INTEGER,
                    byte_end INTEGER,
                    created_at INTEGER NOT NULL
                );

                CREATE INDEX IF NOT EXISTS idx_chunks_node_logical_id
                    ON chunks(node_logical_id);

                CREATE VIRTUAL TABLE IF NOT EXISTS fts_nodes USING fts5(
                    chunk_id UNINDEXED,
                    node_logical_id UNINDEXED,
                    kind UNINDEXED,
                    text_content
                );

                CREATE TABLE IF NOT EXISTS vector_profiles (
                    profile TEXT PRIMARY KEY,
                    table_name TEXT NOT NULL,
                    dimension INTEGER NOT NULL,
                    enabled INTEGER NOT NULL DEFAULT 0
                );

                CREATE TABLE IF NOT EXISTS runs (
                    id TEXT PRIMARY KEY,
                    kind TEXT NOT NULL,
                    status TEXT NOT NULL,
                    properties BLOB NOT NULL,
                    created_at INTEGER NOT NULL,
                    completed_at INTEGER,
                    superseded_at INTEGER,
                    source_ref TEXT
                );

                CREATE TABLE IF NOT EXISTS steps (
                    id TEXT PRIMARY KEY,
                    run_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    status TEXT NOT NULL,
                    properties BLOB NOT NULL,
                    created_at INTEGER NOT NULL,
                    completed_at INTEGER,
                    superseded_at INTEGER,
                    source_ref TEXT,
                    FOREIGN KEY(run_id) REFERENCES runs(id)
                );

                CREATE TABLE IF NOT EXISTS actions (
                    id TEXT PRIMARY KEY,
                    step_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    status TEXT NOT NULL,
                    properties BLOB NOT NULL,
                    created_at INTEGER NOT NULL,
                    completed_at INTEGER,
                    superseded_at INTEGER,
                    source_ref TEXT,
                    FOREIGN KEY(step_id) REFERENCES steps(id)
                );

                CREATE INDEX IF NOT EXISTS idx_runs_source_ref
                    ON runs(source_ref);
                CREATE INDEX IF NOT EXISTS idx_steps_source_ref
                    ON steps(source_ref);
                CREATE INDEX IF NOT EXISTS idx_actions_source_ref
                    ON actions(source_ref);
                ",
    ),
    Migration::new(
        SchemaVersion(2),
        "durable audit trail: provenance_events table",
        r"
                CREATE TABLE IF NOT EXISTS provenance_events (
                    id         TEXT PRIMARY KEY,
                    event_type TEXT NOT NULL,
                    subject    TEXT NOT NULL,
                    source_ref TEXT,
                    created_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER))
                );
                CREATE INDEX IF NOT EXISTS idx_provenance_events_subject
                    ON provenance_events (subject, event_type);
                ",
    ),
    Migration::new(
        SchemaVersion(3),
        "vector regeneration contracts",
        r"
                CREATE TABLE IF NOT EXISTS vector_embedding_contracts (
                    profile TEXT PRIMARY KEY,
                    table_name TEXT NOT NULL,
                    model_identity TEXT NOT NULL,
                    model_version TEXT NOT NULL,
                    dimension INTEGER NOT NULL,
                    normalization_policy TEXT NOT NULL,
                    chunking_policy TEXT NOT NULL,
                    preprocessing_policy TEXT NOT NULL,
                    generator_command_json TEXT NOT NULL,
                    updated_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER))
                );
                ",
    ),
    // v4: the SQL below is the reference DDL; bootstrap actually applies this
    // version via `ensure_vector_regeneration_apply_metadata`, which probes
    // PRAGMA table_info first so re-runs against partially-migrated databases
    // stay idempotent.
    Migration::new(
        SchemaVersion(4),
        "vector regeneration apply metadata",
        r"
                ALTER TABLE vector_embedding_contracts
                    ADD COLUMN applied_at INTEGER NOT NULL DEFAULT 0;
                ALTER TABLE vector_embedding_contracts
                    ADD COLUMN snapshot_hash TEXT NOT NULL DEFAULT '';
                UPDATE vector_embedding_contracts
                SET
                    applied_at = CASE
                        WHEN applied_at = 0 THEN updated_at
                        ELSE applied_at
                    END,
                    snapshot_hash = CASE
                        WHEN snapshot_hash = '' THEN 'legacy'
                        ELSE snapshot_hash
                    END;
                ",
    ),
    Migration::new(
        SchemaVersion(5),
        "vector regeneration contract format version",
        r"
                ALTER TABLE vector_embedding_contracts
                    ADD COLUMN contract_format_version INTEGER NOT NULL DEFAULT 1;
                UPDATE vector_embedding_contracts
                SET contract_format_version = 1
                WHERE contract_format_version = 0;
                ",
    ),
    Migration::new(
        SchemaVersion(6),
        "provenance metadata payloads",
        r"
                ALTER TABLE provenance_events
                    ADD COLUMN metadata_json TEXT NOT NULL DEFAULT '';
                ",
    ),
    Migration::new(
        SchemaVersion(7),
        "operational store canonical and derived tables",
        r"
                CREATE TABLE IF NOT EXISTS operational_collections (
                    name TEXT PRIMARY KEY,
                    kind TEXT NOT NULL,
                    schema_json TEXT NOT NULL,
                    retention_json TEXT NOT NULL,
                    format_version INTEGER NOT NULL DEFAULT 1,
                    created_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER)),
                    disabled_at INTEGER
                );

                CREATE INDEX IF NOT EXISTS idx_operational_collections_kind
                    ON operational_collections(kind, disabled_at);

                CREATE TABLE IF NOT EXISTS operational_mutations (
                    id TEXT PRIMARY KEY,
                    collection_name TEXT NOT NULL,
                    record_key TEXT NOT NULL,
                    op_kind TEXT NOT NULL,
                    payload_json TEXT NOT NULL,
                    source_ref TEXT,
                    created_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER)),
                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
                );

                CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_created
                    ON operational_mutations(collection_name, record_key, created_at DESC, id DESC);
                CREATE INDEX IF NOT EXISTS idx_operational_mutations_source_ref
                    ON operational_mutations(source_ref);

                CREATE TABLE IF NOT EXISTS operational_current (
                    collection_name TEXT NOT NULL,
                    record_key TEXT NOT NULL,
                    payload_json TEXT NOT NULL,
                    updated_at INTEGER NOT NULL,
                    last_mutation_id TEXT NOT NULL,
                    PRIMARY KEY(collection_name, record_key),
                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
                    FOREIGN KEY(last_mutation_id) REFERENCES operational_mutations(id)
                );

                CREATE INDEX IF NOT EXISTS idx_operational_current_collection_updated
                    ON operational_current(collection_name, updated_at DESC);
                ",
    ),
    // v8: backfills mutation_order from rowid so pre-existing rows keep their
    // insertion order under the new explicit ordering column.
    Migration::new(
        SchemaVersion(8),
        "operational mutation ordering hardening",
        r"
                ALTER TABLE operational_mutations
                    ADD COLUMN mutation_order INTEGER NOT NULL DEFAULT 0;
                UPDATE operational_mutations
                SET mutation_order = rowid
                WHERE mutation_order = 0;
                CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_order
                    ON operational_mutations(collection_name, record_key, mutation_order DESC);
                ",
    ),
    Migration::new(
        SchemaVersion(9),
        "node last_accessed metadata",
        r"
                CREATE TABLE IF NOT EXISTS node_access_metadata (
                    logical_id TEXT PRIMARY KEY,
                    last_accessed_at INTEGER NOT NULL,
                    updated_at INTEGER NOT NULL
                );

                CREATE INDEX IF NOT EXISTS idx_node_access_metadata_last_accessed
                    ON node_access_metadata(last_accessed_at DESC);
                ",
    ),
    Migration::new(
        SchemaVersion(10),
        "operational filtered read contracts and extracted values",
        r"
                ALTER TABLE operational_collections
                    ADD COLUMN filter_fields_json TEXT NOT NULL DEFAULT '[]';

                CREATE TABLE IF NOT EXISTS operational_filter_values (
                    mutation_id TEXT NOT NULL,
                    collection_name TEXT NOT NULL,
                    field_name TEXT NOT NULL,
                    string_value TEXT,
                    integer_value INTEGER,
                    PRIMARY KEY(mutation_id, field_name),
                    FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE,
                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
                );

                CREATE INDEX IF NOT EXISTS idx_operational_filter_values_text
                    ON operational_filter_values(collection_name, field_name, string_value, mutation_id);
                CREATE INDEX IF NOT EXISTS idx_operational_filter_values_integer
                    ON operational_filter_values(collection_name, field_name, integer_value, mutation_id);
                ",
    ),
    Migration::new(
        SchemaVersion(11),
        "operational payload validation contracts",
        r"
                ALTER TABLE operational_collections
                    ADD COLUMN validation_json TEXT NOT NULL DEFAULT '';
                ",
    ),
    Migration::new(
        SchemaVersion(12),
        "operational secondary indexes",
        r"
                ALTER TABLE operational_collections
                    ADD COLUMN secondary_indexes_json TEXT NOT NULL DEFAULT '[]';

                CREATE TABLE IF NOT EXISTS operational_secondary_index_entries (
                    collection_name TEXT NOT NULL,
                    index_name TEXT NOT NULL,
                    subject_kind TEXT NOT NULL,
                    mutation_id TEXT NOT NULL DEFAULT '',
                    record_key TEXT NOT NULL DEFAULT '',
                    sort_timestamp INTEGER,
                    slot1_text TEXT,
                    slot1_integer INTEGER,
                    slot2_text TEXT,
                    slot2_integer INTEGER,
                    slot3_text TEXT,
                    slot3_integer INTEGER,
                    PRIMARY KEY(collection_name, index_name, subject_kind, mutation_id, record_key),
                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
                    FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE
                );

                CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_text
                    ON operational_secondary_index_entries(
                        collection_name, index_name, subject_kind, slot1_text, sort_timestamp DESC, mutation_id, record_key
                    );
                CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_integer
                    ON operational_secondary_index_entries(
                        collection_name, index_name, subject_kind, slot1_integer, sort_timestamp DESC, mutation_id, record_key
                    );
                CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_text
                    ON operational_secondary_index_entries(
                        collection_name, index_name, subject_kind, slot1_text, slot2_text, slot3_text, sort_timestamp DESC, record_key
                    );
                CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_integer
                    ON operational_secondary_index_entries(
                        collection_name, index_name, subject_kind, slot1_integer, slot2_integer, slot3_integer, sort_timestamp DESC, record_key
                );
                ",
    ),
    Migration::new(
        SchemaVersion(13),
        "operational retention run metadata",
        r"
                CREATE TABLE IF NOT EXISTS operational_retention_runs (
                    id TEXT PRIMARY KEY,
                    collection_name TEXT NOT NULL,
                    executed_at INTEGER NOT NULL,
                    action_kind TEXT NOT NULL,
                    dry_run INTEGER NOT NULL DEFAULT 0,
                    deleted_mutations INTEGER NOT NULL,
                    rows_remaining INTEGER NOT NULL,
                    metadata_json TEXT NOT NULL DEFAULT '',
                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
                );

                CREATE INDEX IF NOT EXISTS idx_operational_retention_runs_collection_time
                    ON operational_retention_runs(collection_name, executed_at DESC);
                ",
    ),
    Migration::new(
        SchemaVersion(14),
        "external content object columns",
        r"
                ALTER TABLE nodes ADD COLUMN content_ref TEXT;

                CREATE INDEX IF NOT EXISTS idx_nodes_content_ref
                    ON nodes(content_ref)
                    WHERE content_ref IS NOT NULL AND superseded_at IS NULL;

                ALTER TABLE chunks ADD COLUMN content_hash TEXT;
                ",
    ),
    Migration::new(
        SchemaVersion(15),
        "FTS property projection schemas",
        r"
                CREATE TABLE IF NOT EXISTS fts_property_schemas (
                    kind TEXT PRIMARY KEY,
                    property_paths_json TEXT NOT NULL,
                    separator TEXT NOT NULL DEFAULT ' ',
                    format_version INTEGER NOT NULL DEFAULT 1,
                    created_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER))
                );

                CREATE VIRTUAL TABLE IF NOT EXISTS fts_node_properties USING fts5(
                    node_logical_id UNINDEXED,
                    kind UNINDEXED,
                    text_content
                );
                ",
    ),
    Migration::new(
        SchemaVersion(16),
        "rebuild fts_nodes and fts_node_properties on porter+unicode61 tokenizer",
        // DDL applied by `ensure_unicode_porter_fts_tokenizers`; the hook
        // drops, recreates, and rebuilds both tables from canonical state.
        // The FTS5 chained tokenizer syntax requires the wrapper (porter)
        // before the base tokenizer, so the applied `tokenize=` clause is
        // `'porter unicode61 remove_diacritics 2'`.
        "",
    ),
    Migration::new(
        SchemaVersion(17),
        "fts property position-map sidecar for recursive extraction",
        // Sidecar position map for property FTS. The recursive extraction
        // walk in the engine emits one row per scalar leaf contributing to a
        // given node's property FTS blob, carrying the half-open byte range
        // `[start_offset, end_offset)` within the blob and the JSON-path of
        // the originating leaf. Phase 5 uses this to attribute tokens back
        // to their source leaves.
        //
        // The existing `fts_property_schemas.property_paths_json` column is
        // reused unchanged at the DDL level; the engine-side JSON decoder
        // tolerates both the legacy shape (bare strings = scalar) and the
        // new shape (objects with `mode` = `scalar`|`recursive`, optional
        // `exclude_paths`). Backwards compatibility is guaranteed because a
        // bare JSON array of strings still deserialises into scalar-mode
        // entries.
        r"
                CREATE TABLE IF NOT EXISTS fts_node_property_positions (
                    node_logical_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    start_offset INTEGER NOT NULL,
                    end_offset INTEGER NOT NULL,
                    leaf_path TEXT NOT NULL
                );

                CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_node
                    ON fts_node_property_positions(node_logical_id, kind);

                CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_kind
                    ON fts_node_property_positions(kind);
                ",
    ),
    Migration::new(
        SchemaVersion(18),
        "add UNIQUE constraint on fts_node_property_positions (node_logical_id, kind, start_offset)",
        // P4-P2-4: the v17 sidecar DDL did not enforce uniqueness of the
        // `(node_logical_id, kind, start_offset)` tuple, so a buggy rebuild
        // path could silently double-insert a leaf and break attribution
        // lookups. Drop and recreate the table with the UNIQUE constraint,
        // preserving the existing indexes. The DDL leaves the table empty,
        // which the open-time rebuild guard in `ExecutionCoordinator::open`
        // detects (empty positions + recursive schemas present) and
        // repopulates from canonical state on the next open. The rebuild
        // path is idempotent and safe to run unconditionally.
        //
        // `fts_node_property_positions` is a regular SQLite table, not an
        // FTS5 virtual table, so UNIQUE constraints are supported.
        r"
                DROP TABLE IF EXISTS fts_node_property_positions;

                CREATE TABLE fts_node_property_positions (
                    node_logical_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    start_offset INTEGER NOT NULL,
                    end_offset INTEGER NOT NULL,
                    leaf_path TEXT NOT NULL,
                    UNIQUE(node_logical_id, kind, start_offset)
                );

                CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_node
                    ON fts_node_property_positions(node_logical_id, kind);

                CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_kind
                    ON fts_node_property_positions(kind);
                ",
    ),
    Migration::new(
        SchemaVersion(19),
        "async property-FTS rebuild staging and state tables",
        r"
                CREATE TABLE IF NOT EXISTS fts_property_rebuild_staging (
                    kind TEXT NOT NULL,
                    node_logical_id TEXT NOT NULL,
                    text_content TEXT NOT NULL,
                    positions_blob BLOB,
                    PRIMARY KEY (kind, node_logical_id)
                );

                CREATE TABLE IF NOT EXISTS fts_property_rebuild_state (
                    kind TEXT PRIMARY KEY,
                    schema_id INTEGER NOT NULL,
                    state TEXT NOT NULL,
                    rows_total INTEGER,
                    rows_done INTEGER NOT NULL DEFAULT 0,
                    started_at INTEGER NOT NULL,
                    last_progress_at INTEGER,
                    error_message TEXT,
                    is_first_registration INTEGER NOT NULL DEFAULT 0
                );
                ",
    ),
    Migration::new(
        SchemaVersion(20),
        "projection_profiles table for per-kind FTS tokenizer configuration",
        r"CREATE TABLE IF NOT EXISTS projection_profiles (
            kind        TEXT    NOT NULL,
            facet       TEXT    NOT NULL,
            config_json TEXT    NOT NULL,
            active_at   INTEGER,
            created_at  INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER)),
            PRIMARY KEY (kind, facet)
        );",
    ),
    // v21: empty SQL — applied by `ensure_per_kind_fts_tables` in bootstrap.
    Migration::new(
        SchemaVersion(21),
        "per-kind FTS5 tables replacing fts_node_properties",
        "",
    ),
    Migration::new(
        SchemaVersion(22),
        "add columns_json to fts_property_rebuild_staging for multi-column rebuild support",
        "ALTER TABLE fts_property_rebuild_staging ADD COLUMN columns_json TEXT;",
    ),
    Migration::new(
        SchemaVersion(23),
        "drop global fts_node_properties table (replaced by per-kind fts_props_<kind> tables)",
        "DROP TABLE IF EXISTS fts_node_properties;",
    ),
    // v24: empty SQL — applied by `ensure_projection_table_registry` in bootstrap.
    Migration::new(
        SchemaVersion(24),
        "projection table registry and collision-proof per-kind table names",
        "",
    ),
    // NOTE: Vector identity belongs to the embedder.
    // `vector_embedding_profiles.model_identity` (and `model_version`) is a
    // record of what the embedder reports on activation — it is never a
    // user-authored or user-editable configuration field. The singleton
    // partial unique index below enforces at most one active profile per
    // database. See
    // dev/notes/design-db-wide-embedding-per-kind-vector-indexing-2026-04-22.md.
    Migration::new(
        SchemaVersion(25),
        "managed vector projection tables: embedding profiles, per-kind index schemas, async work queue",
        r"
                CREATE TABLE IF NOT EXISTS vector_embedding_profiles (
                    profile_id           INTEGER PRIMARY KEY AUTOINCREMENT,
                    profile_name         TEXT    NOT NULL,
                    model_identity       TEXT    NOT NULL,
                    model_version        TEXT,
                    dimensions           INTEGER NOT NULL,
                    normalization_policy TEXT,
                    max_tokens           INTEGER,
                    active               INTEGER NOT NULL DEFAULT 0,
                    activated_at         INTEGER,
                    created_at           INTEGER NOT NULL
                );

                CREATE UNIQUE INDEX IF NOT EXISTS idx_vep_singleton_active
                    ON vector_embedding_profiles(active)
                    WHERE active = 1;

                CREATE UNIQUE INDEX IF NOT EXISTS idx_vep_identity
                    ON vector_embedding_profiles(model_identity, model_version, dimensions);

                CREATE TABLE IF NOT EXISTS vector_index_schemas (
                    kind                  TEXT PRIMARY KEY,
                    enabled               INTEGER NOT NULL DEFAULT 1,
                    source_mode           TEXT    NOT NULL,
                    source_config_json    TEXT,
                    chunking_policy       TEXT,
                    preprocessing_policy  TEXT,
                    state                 TEXT    NOT NULL DEFAULT 'fresh',
                    last_error            TEXT,
                    last_completed_at     INTEGER,
                    created_at            INTEGER NOT NULL,
                    updated_at            INTEGER NOT NULL
                );

                CREATE TABLE IF NOT EXISTS vector_projection_work (
                    work_id              INTEGER PRIMARY KEY AUTOINCREMENT,
                    kind                 TEXT    NOT NULL,
                    node_logical_id      TEXT,
                    chunk_id             TEXT    NOT NULL,
                    canonical_hash       TEXT    NOT NULL,
                    priority             INTEGER NOT NULL DEFAULT 0,
                    embedding_profile_id INTEGER NOT NULL REFERENCES vector_embedding_profiles(profile_id),
                    attempt_count        INTEGER NOT NULL DEFAULT 0,
                    last_error           TEXT,
                    state                TEXT    NOT NULL DEFAULT 'pending',
                    created_at           INTEGER NOT NULL,
                    updated_at           INTEGER NOT NULL
                );

                CREATE INDEX IF NOT EXISTS idx_vpw_schedule
                    ON vector_projection_work(state, priority DESC, created_at ASC);

                CREATE INDEX IF NOT EXISTS idx_vpw_chunk
                    ON vector_projection_work(chunk_id);
                ",
    ),
];
600
/// Outcome of a successful [`SchemaManager::bootstrap`] call.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BootstrapReport {
    /// Result of `SELECT sqlite_version()` on the bootstrapped connection.
    pub sqlite_version: String,
    /// Migration versions applied during this bootstrap, in application
    /// order; empty when the database was already up to date.
    pub applied_versions: Vec<SchemaVersion>,
    /// True when at least one row in `vector_profiles` has `enabled = 1`.
    pub vector_profile_enabled: bool,
}
607
/// Stateless entry point for bootstrapping and migrating the database schema.
/// All state lives in the database itself (`fathom_schema_migrations`).
#[derive(Clone, Debug, Default)]
pub struct SchemaManager;
610
611impl SchemaManager {
612    #[must_use]
613    pub fn new() -> Self {
614        Self
615    }
616
    /// Bootstrap the database schema, applying any pending migrations.
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError`] if any migration or metadata-table SQL fails,
    /// or [`SchemaError::VersionMismatch`] if the database has been migrated
    /// to a version newer than this engine supports.
    pub fn bootstrap(&self, conn: &Connection) -> Result<BootstrapReport, SchemaError> {
        // Connection-level setup and the migration-ledger table must exist
        // before we can read applied versions below.
        self.initialize_connection(conn)?;
        Self::ensure_metadata_tables(conn)?;

        // Downgrade protection
        // COALESCE(..., 0) makes a fresh database (no rows) read as version 0.
        let max_applied: u32 = conn.query_row(
            "SELECT COALESCE(MAX(version), 0) FROM fathom_schema_migrations",
            [],
            |row| row.get(0),
        )?;
        let engine_version = self.current_version().0;
        trace_info!(
            current_version = max_applied,
            engine_version,
            "schema bootstrap: version check"
        );
        // Refuse to touch a database written by a newer engine: its schema
        // may contain structures this code does not understand.
        if max_applied > engine_version {
            trace_error!(
                database_version = max_applied,
                engine_version,
                "schema version mismatch: database is newer than engine"
            );
            return Err(SchemaError::VersionMismatch {
                database_version: max_applied,
                engine_version,
            });
        }

        let mut applied_versions = Vec::new();
        // Iterate migrations in declaration order; each pending one is
        // applied and recorded atomically in its own transaction, so a
        // failure leaves the ledger consistent with what actually ran.
        for migration in self.migrations() {
            // Per-version lookup (rather than comparing against max_applied)
            // tolerates gaps in the ledger.
            let already_applied = conn
                .query_row(
                    "SELECT 1 FROM fathom_schema_migrations WHERE version = ?1",
                    [i64::from(migration.version.0)],
                    |row| row.get::<_, i64>(0),
                )
                .optional()?
                .is_some();

            if already_applied {
                continue;
            }

            // `unchecked_transaction` starts a transaction from a shared
            // `&Connection` (no `&mut` available here); caller must ensure no
            // concurrent transaction on this connection.
            let tx = conn.unchecked_transaction()?;
            // Versions listed here are applied by Rust hooks (idempotent
            // probing, FTS rebuilds, etc.) instead of the migration's raw
            // SQL; everything else runs `migration.sql` as a batch.
            match migration.version {
                SchemaVersion(4) => Self::ensure_vector_regeneration_apply_metadata(&tx)?,
                SchemaVersion(5) => Self::ensure_vector_contract_format_version(&tx)?,
                SchemaVersion(6) => Self::ensure_provenance_metadata(&tx)?,
                SchemaVersion(8) => Self::ensure_operational_mutation_order(&tx)?,
                SchemaVersion(9) => Self::ensure_node_access_metadata(&tx)?,
                SchemaVersion(10) => Self::ensure_operational_filter_contract(&tx)?,
                SchemaVersion(11) => Self::ensure_operational_validation_contract(&tx)?,
                SchemaVersion(12) => Self::ensure_operational_secondary_indexes(&tx)?,
                SchemaVersion(13) => Self::ensure_operational_retention_runs(&tx)?,
                SchemaVersion(14) => Self::ensure_external_content_columns(&tx)?,
                SchemaVersion(15) => Self::ensure_fts_property_schemas(&tx)?,
                SchemaVersion(16) => Self::ensure_unicode_porter_fts_tokenizers(&tx)?,
                SchemaVersion(21) => Self::ensure_per_kind_fts_tables(&tx)?,
                SchemaVersion(22) => Self::ensure_staging_columns_json(&tx)?,
                SchemaVersion(24) => Self::ensure_projection_table_registry(&tx)?,
                _ => tx.execute_batch(migration.sql)?,
            }
            // Record the version inside the same transaction as its DDL so
            // "applied" and "recorded" cannot diverge.
            tx.execute(
                "INSERT INTO fathom_schema_migrations (version, description) VALUES (?1, ?2)",
                (i64::from(migration.version.0), migration.description),
            )?;
            tx.commit()?;
            trace_info!(
                version = migration.version.0,
                description = migration.description,
                "schema migration applied"
            );
            applied_versions.push(migration.version);
        }

        // Post-migration snapshot for the report.
        let sqlite_version = conn.query_row("SELECT sqlite_version()", [], |row| row.get(0))?;
        let vector_profile_count: i64 = conn.query_row(
            "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
            [],
            |row| row.get(0),
        )?;
        Ok(BootstrapReport {
            sqlite_version,
            applied_versions,
            vector_profile_enabled: vector_profile_count > 0,
        })
    }
711
712    fn ensure_vector_regeneration_apply_metadata(conn: &Connection) -> Result<(), SchemaError> {
713        let mut stmt = conn.prepare("PRAGMA table_info(vector_embedding_contracts)")?;
714        let columns = stmt
715            .query_map([], |row| row.get::<_, String>(1))?
716            .collect::<Result<Vec<_>, _>>()?;
717        let has_applied_at = columns.iter().any(|column| column == "applied_at");
718        let has_snapshot_hash = columns.iter().any(|column| column == "snapshot_hash");
719
720        if !has_applied_at {
721            conn.execute(
722                "ALTER TABLE vector_embedding_contracts ADD COLUMN applied_at INTEGER NOT NULL DEFAULT 0",
723                [],
724            )?;
725        }
726        if !has_snapshot_hash {
727            conn.execute(
728                "ALTER TABLE vector_embedding_contracts ADD COLUMN snapshot_hash TEXT NOT NULL DEFAULT ''",
729                [],
730            )?;
731        }
732        conn.execute(
733            r"
734            UPDATE vector_embedding_contracts
735            SET
736                applied_at = CASE
737                    WHEN applied_at = 0 THEN updated_at
738                    ELSE applied_at
739                END,
740                snapshot_hash = CASE
741                    WHEN snapshot_hash = '' THEN 'legacy'
742                    ELSE snapshot_hash
743                END
744            ",
745            [],
746        )?;
747        Ok(())
748    }
749
750    fn ensure_vector_contract_format_version(conn: &Connection) -> Result<(), SchemaError> {
751        let mut stmt = conn.prepare("PRAGMA table_info(vector_embedding_contracts)")?;
752        let columns = stmt
753            .query_map([], |row| row.get::<_, String>(1))?
754            .collect::<Result<Vec<_>, _>>()?;
755        let has_contract_format_version = columns
756            .iter()
757            .any(|column| column == "contract_format_version");
758
759        if !has_contract_format_version {
760            conn.execute(
761                "ALTER TABLE vector_embedding_contracts ADD COLUMN contract_format_version INTEGER NOT NULL DEFAULT 1",
762                [],
763            )?;
764        }
765        conn.execute(
766            r"
767            UPDATE vector_embedding_contracts
768            SET contract_format_version = 1
769            WHERE contract_format_version = 0
770            ",
771            [],
772        )?;
773        Ok(())
774    }
775
776    fn ensure_provenance_metadata(conn: &Connection) -> Result<(), SchemaError> {
777        let mut stmt = conn.prepare("PRAGMA table_info(provenance_events)")?;
778        let columns = stmt
779            .query_map([], |row| row.get::<_, String>(1))?
780            .collect::<Result<Vec<_>, _>>()?;
781        let has_metadata_json = columns.iter().any(|column| column == "metadata_json");
782
783        if !has_metadata_json {
784            conn.execute(
785                "ALTER TABLE provenance_events ADD COLUMN metadata_json TEXT NOT NULL DEFAULT ''",
786                [],
787            )?;
788        }
789        Ok(())
790    }
791
792    fn ensure_operational_mutation_order(conn: &Connection) -> Result<(), SchemaError> {
793        let mut stmt = conn.prepare("PRAGMA table_info(operational_mutations)")?;
794        let columns = stmt
795            .query_map([], |row| row.get::<_, String>(1))?
796            .collect::<Result<Vec<_>, _>>()?;
797        let has_mutation_order = columns.iter().any(|column| column == "mutation_order");
798
799        if !has_mutation_order {
800            conn.execute(
801                "ALTER TABLE operational_mutations ADD COLUMN mutation_order INTEGER NOT NULL DEFAULT 0",
802                [],
803            )?;
804        }
805        conn.execute(
806            r"
807            UPDATE operational_mutations
808            SET mutation_order = rowid
809            WHERE mutation_order = 0
810            ",
811            [],
812        )?;
813        conn.execute(
814            r"
815            CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_order
816                ON operational_mutations(collection_name, record_key, mutation_order DESC)
817            ",
818            [],
819        )?;
820        Ok(())
821    }
822
    /// Migration 9: create the `node_access_metadata` table (one row per
    /// logical node, tracking `last_accessed_at` / `updated_at`) plus a
    /// descending index on `last_accessed_at` for recency-ordered scans.
    ///
    /// Idempotent: both statements use `IF NOT EXISTS`.
    fn ensure_node_access_metadata(conn: &Connection) -> Result<(), SchemaError> {
        conn.execute_batch(
            r"
            CREATE TABLE IF NOT EXISTS node_access_metadata (
                logical_id TEXT PRIMARY KEY,
                last_accessed_at INTEGER NOT NULL,
                updated_at INTEGER NOT NULL
            );

            CREATE INDEX IF NOT EXISTS idx_node_access_metadata_last_accessed
                ON node_access_metadata(last_accessed_at DESC);
            ",
        )?;
        Ok(())
    }
838
839    fn ensure_operational_filter_contract(conn: &Connection) -> Result<(), SchemaError> {
840        let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
841        let columns = stmt
842            .query_map([], |row| row.get::<_, String>(1))?
843            .collect::<Result<Vec<_>, _>>()?;
844        let has_filter_fields_json = columns.iter().any(|column| column == "filter_fields_json");
845
846        if !has_filter_fields_json {
847            conn.execute(
848                "ALTER TABLE operational_collections ADD COLUMN filter_fields_json TEXT NOT NULL DEFAULT '[]'",
849                [],
850            )?;
851        }
852
853        conn.execute_batch(
854            r"
855            CREATE TABLE IF NOT EXISTS operational_filter_values (
856                mutation_id TEXT NOT NULL,
857                collection_name TEXT NOT NULL,
858                field_name TEXT NOT NULL,
859                string_value TEXT,
860                integer_value INTEGER,
861                PRIMARY KEY(mutation_id, field_name),
862                FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE,
863                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
864            );
865
866            CREATE INDEX IF NOT EXISTS idx_operational_filter_values_text
867                ON operational_filter_values(collection_name, field_name, string_value, mutation_id);
868            CREATE INDEX IF NOT EXISTS idx_operational_filter_values_integer
869                ON operational_filter_values(collection_name, field_name, integer_value, mutation_id);
870            ",
871        )?;
872        Ok(())
873    }
874
875    fn ensure_operational_validation_contract(conn: &Connection) -> Result<(), SchemaError> {
876        let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
877        let columns = stmt
878            .query_map([], |row| row.get::<_, String>(1))?
879            .collect::<Result<Vec<_>, _>>()?;
880        let has_validation_json = columns.iter().any(|column| column == "validation_json");
881
882        if !has_validation_json {
883            conn.execute(
884                "ALTER TABLE operational_collections ADD COLUMN validation_json TEXT NOT NULL DEFAULT ''",
885                [],
886            )?;
887        }
888
889        Ok(())
890    }
891
892    fn ensure_operational_secondary_indexes(conn: &Connection) -> Result<(), SchemaError> {
893        let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
894        let columns = stmt
895            .query_map([], |row| row.get::<_, String>(1))?
896            .collect::<Result<Vec<_>, _>>()?;
897        let has_secondary_indexes_json = columns
898            .iter()
899            .any(|column| column == "secondary_indexes_json");
900
901        if !has_secondary_indexes_json {
902            conn.execute(
903                "ALTER TABLE operational_collections ADD COLUMN secondary_indexes_json TEXT NOT NULL DEFAULT '[]'",
904                [],
905            )?;
906        }
907
908        conn.execute_batch(
909            r"
910            CREATE TABLE IF NOT EXISTS operational_secondary_index_entries (
911                collection_name TEXT NOT NULL,
912                index_name TEXT NOT NULL,
913                subject_kind TEXT NOT NULL,
914                mutation_id TEXT NOT NULL DEFAULT '',
915                record_key TEXT NOT NULL DEFAULT '',
916                sort_timestamp INTEGER,
917                slot1_text TEXT,
918                slot1_integer INTEGER,
919                slot2_text TEXT,
920                slot2_integer INTEGER,
921                slot3_text TEXT,
922                slot3_integer INTEGER,
923                PRIMARY KEY(collection_name, index_name, subject_kind, mutation_id, record_key),
924                FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
925                FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE
926            );
927
928            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_text
929                ON operational_secondary_index_entries(
930                    collection_name, index_name, subject_kind, slot1_text, sort_timestamp DESC, mutation_id, record_key
931                );
932            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_integer
933                ON operational_secondary_index_entries(
934                    collection_name, index_name, subject_kind, slot1_integer, sort_timestamp DESC, mutation_id, record_key
935                );
936            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_text
937                ON operational_secondary_index_entries(
938                    collection_name, index_name, subject_kind, slot1_text, slot2_text, slot3_text, sort_timestamp DESC, record_key
939                );
940            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_integer
941                ON operational_secondary_index_entries(
942                    collection_name, index_name, subject_kind, slot1_integer, slot2_integer, slot3_integer, sort_timestamp DESC, record_key
943                );
944            ",
945        )?;
946
947        Ok(())
948    }
949
    /// Migration 13: create the `operational_retention_runs` audit table —
    /// one row per retention execution (including dry runs, per the
    /// `dry_run` flag) — plus an index for per-collection, newest-first
    /// lookups.
    fn ensure_operational_retention_runs(conn: &Connection) -> Result<(), SchemaError> {
        conn.execute_batch(
            r"
            CREATE TABLE IF NOT EXISTS operational_retention_runs (
                id TEXT PRIMARY KEY,
                collection_name TEXT NOT NULL,
                executed_at INTEGER NOT NULL,
                action_kind TEXT NOT NULL,
                dry_run INTEGER NOT NULL DEFAULT 0,
                deleted_mutations INTEGER NOT NULL,
                rows_remaining INTEGER NOT NULL,
                metadata_json TEXT NOT NULL DEFAULT '',
                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
            );

            CREATE INDEX IF NOT EXISTS idx_operational_retention_runs_collection_time
                ON operational_retention_runs(collection_name, executed_at DESC);
            ",
        )?;
        Ok(())
    }
971
972    fn ensure_external_content_columns(conn: &Connection) -> Result<(), SchemaError> {
973        let node_columns = Self::column_names(conn, "nodes")?;
974        if !node_columns.iter().any(|c| c == "content_ref") {
975            conn.execute("ALTER TABLE nodes ADD COLUMN content_ref TEXT", [])?;
976        }
977        conn.execute_batch(
978            r"
979            CREATE INDEX IF NOT EXISTS idx_nodes_content_ref
980                ON nodes(content_ref)
981                WHERE content_ref IS NOT NULL AND superseded_at IS NULL;
982            ",
983        )?;
984
985        let chunk_columns = Self::column_names(conn, "chunks")?;
986        if !chunk_columns.iter().any(|c| c == "content_hash") {
987            conn.execute("ALTER TABLE chunks ADD COLUMN content_hash TEXT", [])?;
988        }
989        Ok(())
990    }
991
    /// Migration 16: migrate both `fts_nodes` and `fts_node_properties` from
    /// the default FTS5 simple tokenizer to `unicode61 remove_diacritics 2
    /// porter` so diacritic-insensitive and stem-aware matches work (e.g.
    /// `cafe` matching `café`, `shipping` matching `ship`).
    ///
    /// FTS5 does not support re-tokenizing an existing index in place, so
    /// both virtual tables are dropped and recreated with the new
    /// `tokenize=...` clause. `fts_nodes` is rebuilt inline from the
    /// canonical `chunks + nodes` join. `fts_node_properties` is left empty
    /// here and repopulated from canonical state by the engine runtime after
    /// bootstrap (the property FTS rebuild requires the per-kind
    /// `fts_property_schemas` projection that lives in the engine crate).
    ///
    /// A malformed row encountered during the inline `INSERT ... SELECT`
    /// causes the migration to abort: the rusqlite error propagates up
    /// through `execute_batch` and rolls back the outer migration
    /// transaction so the schema version is not advanced.
    fn ensure_unicode_porter_fts_tokenizers(conn: &Connection) -> Result<(), SchemaError> {
        // `conn` here is the per-migration transaction opened by the apply
        // loop, so the DROP/CREATE/INSERT batch succeeds or fails as a unit.
        conn.execute_batch(
            r"
            DROP TABLE IF EXISTS fts_nodes;
            CREATE VIRTUAL TABLE fts_nodes USING fts5(
                chunk_id UNINDEXED,
                node_logical_id UNINDEXED,
                kind UNINDEXED,
                text_content,
                tokenize = 'porter unicode61 remove_diacritics 2'
            );

            DROP TABLE IF EXISTS fts_node_properties;
            CREATE VIRTUAL TABLE fts_node_properties USING fts5(
                node_logical_id UNINDEXED,
                kind UNINDEXED,
                text_content,
                tokenize = 'porter unicode61 remove_diacritics 2'
            );

            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            SELECT c.id, n.logical_id, n.kind, c.text_content
            FROM chunks c
            JOIN nodes n
              ON n.logical_id = c.node_logical_id
             AND n.superseded_at IS NULL;
            ",
        )?;
        Ok(())
    }
1039
    /// Migration 15: create the per-kind property registry
    /// (`fts_property_schemas`, keyed by kind with `property_paths_json`)
    /// and the global `fts_node_properties` FTS5 table.
    ///
    /// The FTS table is created here with FTS5's default tokenizer;
    /// migration 16 (`ensure_unicode_porter_fts_tokenizers`) later drops and
    /// recreates it with the unicode61 + porter tokenizer.
    fn ensure_fts_property_schemas(conn: &Connection) -> Result<(), SchemaError> {
        conn.execute_batch(
            r"
            CREATE TABLE IF NOT EXISTS fts_property_schemas (
                kind TEXT PRIMARY KEY,
                property_paths_json TEXT NOT NULL,
                separator TEXT NOT NULL DEFAULT ' ',
                format_version INTEGER NOT NULL DEFAULT 1,
                created_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER))
            );

            CREATE VIRTUAL TABLE IF NOT EXISTS fts_node_properties USING fts5(
                node_logical_id UNINDEXED,
                kind UNINDEXED,
                text_content
            );
            ",
        )?;
        Ok(())
    }
1060
1061    fn ensure_per_kind_fts_tables(conn: &Connection) -> Result<(), SchemaError> {
1062        // Collect all registered kinds
1063        let kinds: Vec<String> = {
1064            let mut stmt = conn.prepare("SELECT kind FROM fts_property_schemas")?;
1065            stmt.query_map([], |r| r.get::<_, String>(0))?
1066                .collect::<Result<Vec<_>, _>>()?
1067        };
1068
1069        for kind in &kinds {
1070            let table_name = fts_kind_table_name(kind);
1071            let ddl = format!(
1072                "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING fts5(\
1073                    node_logical_id UNINDEXED, \
1074                    text_content, \
1075                    tokenize = '{DEFAULT_FTS_TOKENIZER}'\
1076                )"
1077            );
1078            conn.execute_batch(&ddl)?;
1079
1080            // Enqueue a PENDING rebuild for this kind (idempotent)
1081            conn.execute(
1082                "INSERT OR IGNORE INTO fts_property_rebuild_state \
1083                 (kind, schema_id, state, rows_done, started_at, is_first_registration) \
1084                 SELECT ?1, rowid, 'PENDING', 0, CAST(strftime('%s','now') AS INTEGER) * 1000, 0 \
1085                 FROM fts_property_schemas WHERE kind = ?1",
1086                rusqlite::params![kind],
1087            )?;
1088        }
1089
1090        // Drop the old global table (deferred: write sites must be updated in A-6 first)
1091        // Note: actual DROP happens in migration 23, after A-6 redirects write sites
1092        Ok(())
1093    }
1094
1095    fn ensure_staging_columns_json(conn: &Connection) -> Result<(), SchemaError> {
1096        // Idempotent: check if the column already exists before altering
1097        let column_exists: bool = {
1098            let mut stmt = conn.prepare("PRAGMA table_info(fts_property_rebuild_staging)")?;
1099            let names: Vec<String> = stmt
1100                .query_map([], |r| r.get::<_, String>(1))?
1101                .collect::<Result<Vec<_>, _>>()?;
1102            names.iter().any(|n| n == "columns_json")
1103        };
1104
1105        if !column_exists {
1106            conn.execute_batch(
1107                "ALTER TABLE fts_property_rebuild_staging ADD COLUMN columns_json TEXT;",
1108            )?;
1109        }
1110
1111        Ok(())
1112    }
1113
    /// Migration 24: create `projection_table_registry` and seed it from the
    /// kinds already registered in `fts_property_schemas` (facet `'fts'`)
    /// and `projection_profiles` (facet `'vec'`), recording the legacy
    /// table name each entry migrated from. Every FTS kind also gets its
    /// `fts_property_rebuild_state` row upserted to `'PENDING'` unless a
    /// rebuild is already in flight (`'BUILDING'` / `'SWAPPING'`).
    fn ensure_projection_table_registry(conn: &Connection) -> Result<(), SchemaError> {
        conn.execute_batch(
            r"
            CREATE TABLE IF NOT EXISTS projection_table_registry (
                kind TEXT NOT NULL,
                facet TEXT NOT NULL CHECK (facet IN ('fts', 'vec')),
                table_name TEXT NOT NULL UNIQUE,
                naming_version INTEGER NOT NULL,
                created_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER)),
                migrated_from TEXT,
                PRIMARY KEY (kind, facet)
            );
            ",
        )?;

        // NOTE(review): `.optional()` only converts QueryReturnedNoRows into
        // None, and `prepare` never yields that error — so this does NOT
        // skip a missing `fts_property_schemas` table; a failed prepare
        // still propagates. Looks like the intent was "empty list when the
        // table is absent" — confirm.
        let fts_kinds: Vec<String> = conn
            .prepare("SELECT kind FROM fts_property_schemas")
            .optional()?
            .map(|mut stmt| {
                stmt.query_map([], |r| r.get::<_, String>(0))?
                    .collect::<Result<Vec<_>, _>>()
            })
            .transpose()?
            .unwrap_or_default();

        for kind in &fts_kinds {
            let table_name = fts_kind_table_name(kind);
            let legacy = legacy_fts_kind_table_name(kind);
            // Upsert the canonical (naming_version 2) FTS table name.
            conn.execute(
                "INSERT INTO projection_table_registry \
                 (kind, facet, table_name, naming_version, migrated_from) \
                 VALUES (?1, 'fts', ?2, 2, ?3) \
                 ON CONFLICT(kind, facet) DO UPDATE SET \
                     table_name = excluded.table_name, \
                     naming_version = excluded.naming_version, \
                     migrated_from = excluded.migrated_from",
                rusqlite::params![kind, table_name, legacy],
            )?;
            // Reset rebuild bookkeeping to PENDING, but never clobber an
            // in-flight BUILDING/SWAPPING rebuild.
            conn.execute(
                "INSERT INTO fts_property_rebuild_state \
                 (kind, schema_id, state, rows_done, started_at, is_first_registration) \
                 SELECT ?1, rowid, 'PENDING', 0, CAST(strftime('%s','now') AS INTEGER) * 1000, 0 \
                 FROM fts_property_schemas WHERE kind = ?1 \
                 ON CONFLICT(kind) DO UPDATE SET \
                     schema_id = excluded.schema_id, \
                     state = CASE \
                         WHEN fts_property_rebuild_state.state IN ('BUILDING', 'SWAPPING') \
                         THEN fts_property_rebuild_state.state \
                         ELSE 'PENDING' \
                     END, \
                     rows_total = NULL, \
                     rows_done = 0, \
                     started_at = excluded.started_at, \
                     last_progress_at = NULL, \
                     error_message = NULL",
                rusqlite::params![kind],
            )?;
        }

        // NOTE(review): same ineffective `.optional()`-on-`prepare` pattern
        // as the fts_kinds query above — confirm intent.
        let vec_kinds: Vec<String> = conn
            .prepare("SELECT kind FROM projection_profiles WHERE facet = 'vec'")
            .optional()?
            .map(|mut stmt| {
                stmt.query_map([], |r| r.get::<_, String>(0))?
                    .collect::<Result<Vec<_>, _>>()
            })
            .transpose()?
            .unwrap_or_default();

        for kind in &vec_kinds {
            let table_name = vec_kind_table_name(kind);
            let legacy = legacy_vec_kind_table_name(kind);
            conn.execute(
                "INSERT INTO projection_table_registry \
                 (kind, facet, table_name, naming_version, migrated_from) \
                 VALUES (?1, 'vec', ?2, 2, ?3) \
                 ON CONFLICT(kind, facet) DO UPDATE SET \
                     table_name = excluded.table_name, \
                     naming_version = excluded.naming_version, \
                     migrated_from = excluded.migrated_from",
                rusqlite::params![kind, table_name, legacy],
            )?;
        }

        Ok(())
    }
1200
1201    fn column_names(conn: &Connection, table: &str) -> Result<Vec<String>, SchemaError> {
1202        let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
1203        let names = stmt
1204            .query_map([], |row| row.get::<_, String>(1))?
1205            .collect::<Result<Vec<_>, _>>()?;
1206        Ok(names)
1207    }
1208
1209    #[must_use]
1210    pub fn current_version(&self) -> SchemaVersion {
1211        self.migrations()
1212            .last()
1213            .map_or(SchemaVersion(0), |migration| migration.version)
1214    }
1215
    /// The complete, ordered list of built-in schema migrations.
    #[must_use]
    pub fn migrations(&self) -> &'static [Migration] {
        MIGRATIONS
    }
1220
    /// Set the recommended `SQLite` connection pragmas for fathomdb.
    ///
    /// Applies WAL journaling with `synchronous = NORMAL`, a 5-second busy
    /// timeout, in-memory temp storage, a ~3 GB mmap window, a 512 MiB WAL
    /// size limit, and switches foreign-key enforcement on.
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError`] if any PRAGMA fails to execute.
    pub fn initialize_connection(&self, conn: &Connection) -> Result<(), SchemaError> {
        conn.execute_batch(
            r"
            PRAGMA foreign_keys = ON;
            PRAGMA journal_mode = WAL;
            PRAGMA synchronous = NORMAL;
            PRAGMA busy_timeout = 5000;
            PRAGMA temp_store = MEMORY;
            PRAGMA mmap_size = 3000000000;
            PRAGMA journal_size_limit = 536870912;
            ",
        )?;
        Ok(())
    }
1240
    /// Initialize a **read-only** connection with PRAGMAs that are safe for
    /// readers.
    ///
    /// Skips `journal_mode` (requires write; the writer already set WAL),
    /// `synchronous` (irrelevant for readers), and `journal_size_limit`
    /// (requires write).
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError`] if any PRAGMA fails to execute.
    pub fn initialize_reader_connection(&self, conn: &Connection) -> Result<(), SchemaError> {
        // Read-safe subset of the pragmas applied by `initialize_connection`.
        conn.execute_batch(
            r"
            PRAGMA foreign_keys = ON;
            PRAGMA busy_timeout = 5000;
            PRAGMA temp_store = MEMORY;
            PRAGMA mmap_size = 3000000000;
            ",
        )?;
        Ok(())
    }
1262
1263    /// Ensure the sqlite-vec vector extension profile is registered and the
1264    /// virtual vec table exists.
1265    ///
1266    /// When the `sqlite-vec` feature is enabled this creates the virtual table
1267    /// and records the profile in `vector_profiles` (with `enabled = 1`).
1268    /// When the feature is absent the call always returns
1269    /// [`SchemaError::MissingCapability`].
1270    ///
1271    /// # Errors
1272    ///
1273    /// Returns [`SchemaError`] if the DDL fails or the feature is absent.
1274    #[cfg(feature = "sqlite-vec")]
1275    pub fn ensure_vector_profile(
1276        &self,
1277        conn: &Connection,
1278        profile: &str,
1279        table_name: &str,
1280        dimension: usize,
1281    ) -> Result<(), SchemaError> {
1282        conn.execute_batch(&format!(
1283            "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING vec0(\
1284                chunk_id TEXT PRIMARY KEY,\
1285                embedding float[{dimension}]\
1286            )"
1287        ))?;
1288        // Vector dimensions are small positive integers (typically <= a few
1289        // thousand); convert explicitly so clippy's cast_possible_wrap is happy.
1290        let dimension_i64 = i64::try_from(dimension).map_err(|_| {
1291            SchemaError::Sqlite(rusqlite::Error::ToSqlConversionFailure(
1292                format!("vector dimension {dimension} does not fit in i64").into(),
1293            ))
1294        })?;
1295        conn.execute(
1296            "INSERT OR REPLACE INTO vector_profiles \
1297             (profile, table_name, dimension, enabled) VALUES (?1, ?2, ?3, 1)",
1298            rusqlite::params![profile, table_name, dimension_i64],
1299        )?;
1300        Ok(())
1301    }
1302
    /// Feature-gated stub of `ensure_vector_profile`: keeps call sites
    /// compiling when the `sqlite-vec` feature is absent.
    ///
    /// # Errors
    ///
    /// Always returns [`SchemaError::MissingCapability`] when the `sqlite-vec`
    /// feature is not compiled in.
    #[cfg(not(feature = "sqlite-vec"))]
    pub fn ensure_vector_profile(
        &self,
        _conn: &Connection,
        _profile: &str,
        _table_name: &str,
        _dimension: usize,
    ) -> Result<(), SchemaError> {
        // Parameters are intentionally unused; the capability is missing.
        Err(SchemaError::MissingCapability("sqlite-vec"))
    }
1317
1318    /// Ensure a per-kind sqlite-vec virtual table exists and the
1319    /// `projection_profiles` row is recorded under `(kind, 'vec')`.
1320    ///
1321    /// The virtual table is named `vec_<sanitized_kind>` (via
1322    /// [`vec_kind_table_name`]).  A row is also written to the legacy
1323    /// `vector_profiles` table so that [`BootstrapReport::vector_profile_enabled`]
1324    /// continues to work.
1325    ///
1326    /// # Errors
1327    ///
1328    /// Returns [`SchemaError`] if the DDL fails or the feature is absent.
1329    #[cfg(feature = "sqlite-vec")]
1330    pub fn ensure_vec_kind_profile(
1331        &self,
1332        conn: &Connection,
1333        kind: &str,
1334        dimension: usize,
1335    ) -> Result<(), SchemaError> {
1336        let table_name = vec_kind_table_name(kind);
1337        conn.execute_batch(&format!(
1338            "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING vec0(\
1339                chunk_id TEXT PRIMARY KEY,\
1340                embedding float[{dimension}]\
1341            )"
1342        ))?;
1343        let dimension_i64 = i64::try_from(dimension).map_err(|_| {
1344            SchemaError::Sqlite(rusqlite::Error::ToSqlConversionFailure(
1345                format!("vector dimension {dimension} does not fit in i64").into(),
1346            ))
1347        })?;
1348        // Record in the legacy vector_profiles table so vector_profile_enabled works.
1349        conn.execute(
1350            "INSERT OR REPLACE INTO vector_profiles \
1351             (profile, table_name, dimension, enabled) VALUES (?1, ?2, ?3, 1)",
1352            rusqlite::params![kind, table_name, dimension_i64],
1353        )?;
1354        // Record in projection_profiles under (kind, 'vec').
1355        // Use "dimensions" (plural) to match what get_vec_profile extracts via json_extract.
1356        let config_json =
1357            format!(r#"{{"table_name":"{table_name}","dimensions":{dimension_i64}}}"#);
1358        conn.execute(
1359            "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at) \
1360             VALUES (?1, 'vec', ?2, CAST(strftime('%s','now') AS INTEGER), CAST(strftime('%s','now') AS INTEGER)) \
1361             ON CONFLICT(kind, facet) DO UPDATE SET \
1362                 config_json = ?2, \
1363                 active_at   = CAST(strftime('%s','now') AS INTEGER)",
1364            rusqlite::params![kind, config_json],
1365        )?;
1366        Ok(())
1367    }
1368
    /// Feature-gated stub of `ensure_vec_kind_profile`: keeps call sites
    /// compiling when the `sqlite-vec` feature is absent.
    ///
    /// # Errors
    ///
    /// Always returns [`SchemaError::MissingCapability`] when the `sqlite-vec`
    /// feature is not compiled in.
    #[cfg(not(feature = "sqlite-vec"))]
    pub fn ensure_vec_kind_profile(
        &self,
        _conn: &Connection,
        _kind: &str,
        _dimension: usize,
    ) -> Result<(), SchemaError> {
        // Parameters are intentionally unused; the capability is missing.
        Err(SchemaError::MissingCapability("sqlite-vec"))
    }
1382
    /// Create the internal migration-tracking table if it does not exist.
    ///
    /// One row per applied migration; `applied_at` defaults to the unix
    /// timestamp at insert time.
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError`] if the DDL fails to execute.
    fn ensure_metadata_tables(conn: &Connection) -> Result<(), SchemaError> {
        conn.execute_batch(
            r"
            CREATE TABLE IF NOT EXISTS fathom_schema_migrations (
                version INTEGER PRIMARY KEY,
                description TEXT NOT NULL,
                applied_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER))
            );
            ",
        )?;
        Ok(())
    }
1400}
1401
/// Default FTS5 tokenizer used when no per-kind profile is configured:
/// porter stemming layered over `unicode61` with full diacritic removal —
/// the same tokenizer migration 16 installs on the global FTS tables.
pub const DEFAULT_FTS_TOKENIZER: &str = "porter unicode61 remove_diacritics 2";
1404
/// Derive the canonical FTS5 virtual-table name for a given node `kind`.
///
/// The name keeps a readable sanitized slug and always appends a stable hash
/// suffix so distinct kinds never collide after sanitization; the result has
/// the shape `fts_props_<slug>_<hash>` (see [`projection_table_name`]).
#[must_use]
pub fn fts_kind_table_name(kind: &str) -> String {
    projection_table_name("fts_props", kind)
}
1413
1414/// Legacy pre-0.5.4 FTS table-name derivation, retained for migrations and diagnostics.
1415#[must_use]
1416pub fn legacy_fts_kind_table_name(kind: &str) -> String {
1417    let slug = sanitize_kind_slug(kind);
1418    let prefixed = format!("fts_props_{slug}");
1419    if prefixed.len() <= 63 {
1420        prefixed
1421    } else {
1422        let hex = sha256_hex(kind);
1423        let hex_suffix = &hex[..7];
1424        let slug_truncated = if slug.len() > 45 { &slug[..45] } else { &slug };
1425        format!("fts_props_{slug_truncated}_{hex_suffix}")
1426    }
1427}
1428
/// Derive the canonical sqlite-vec virtual-table name for a given node `kind`.
///
/// The name keeps a readable sanitized slug and always appends a stable hash
/// suffix so distinct kinds never collide after sanitization.
///
/// Shape: `vec_<slug>_<hash10>`, kept within the 63-byte identifier budget
/// enforced by `projection_table_name`.
#[must_use]
pub fn vec_kind_table_name(kind: &str) -> String {
    projection_table_name("vec", kind)
}
1437
1438/// Legacy pre-0.5.4 sqlite-vec table-name derivation, retained for migrations and diagnostics.
1439#[must_use]
1440pub fn legacy_vec_kind_table_name(kind: &str) -> String {
1441    let slug = sanitize_kind_slug(kind);
1442    format!("vec_{slug}")
1443}
1444
1445fn projection_table_name(prefix: &str, kind: &str) -> String {
1446    const MAX_IDENTIFIER_LEN: usize = 63;
1447    const HASH_LEN: usize = 10;
1448    let slug = sanitize_kind_slug(kind);
1449    let hex = sha256_hex(kind);
1450    let suffix = &hex[..HASH_LEN];
1451    let budget = MAX_IDENTIFIER_LEN
1452        .saturating_sub(prefix.len())
1453        .saturating_sub(2)
1454        .saturating_sub(HASH_LEN)
1455        .max(1);
1456    let slug = if slug.len() > budget {
1457        &slug[..budget]
1458    } else {
1459        &slug
1460    };
1461    format!("{prefix}_{slug}_{suffix}")
1462}
1463
/// Lowercase `kind` and reduce it to `[a-z0-9_]`: every run of disallowed
/// characters (including literal underscores) collapses to a single `_`,
/// leading/trailing underscores are trimmed, and an empty result falls back
/// to the literal slug `"kind"`.
fn sanitize_kind_slug(kind: &str) -> String {
    let mut out = String::with_capacity(kind.len());
    for ch in kind.to_lowercase().chars() {
        if ch.is_ascii_alphanumeric() {
            out.push(ch);
        } else if !out.ends_with('_') {
            // First char of a disallowed run becomes '_'; the rest are dropped.
            out.push('_');
        }
    }
    let trimmed = out.trim_matches('_');
    if trimmed.is_empty() {
        // Everything sanitized away — use a stable placeholder.
        "kind".to_owned()
    } else {
        trimmed.to_owned()
    }
}
1486
1487fn sha256_hex(kind: &str) -> String {
1488    let hash = Sha256::digest(kind.as_bytes());
1489    let mut hex = String::with_capacity(hash.len() * 2);
1490    for b in &hash {
1491        use std::fmt::Write as _;
1492        let _ = write!(hex, "{b:02x}");
1493    }
1494    hex
1495}
1496
/// Derive the canonical FTS5 column name for a JSON path.
///
/// Rules:
/// 1. Strip leading `$.` or `$` prefix.
/// 2. Replace every character that is NOT `[a-z0-9_]` (after lowercasing) with `_`.
/// 3. Collapse consecutive underscores (literal `_` runs in the path included).
/// 4. If `is_recursive` is `true`, append `_all`.
#[must_use]
pub fn fts_column_name(path: &str, is_recursive: bool) -> String {
    // Step 1: strip the JSON-path prefix.
    let stripped = path
        .strip_prefix("$.")
        .or_else(|| path.strip_prefix('$'))
        .unwrap_or(path);

    // Steps 2-3: lowercase, map disallowed chars to '_', collapse runs.
    // FIX: literal underscores now participate in collapsing, matching
    // documented rule 3 and the table-name sanitizer. Previously
    // `$.a__b` produced `a__b`, contradicting the stated contract.
    let lowered = stripped.to_lowercase();
    let mut col = String::with_capacity(lowered.len());
    let mut prev_was_underscore = false;
    for ch in lowered.chars() {
        if ch.is_ascii_alphanumeric() {
            col.push(ch);
            prev_was_underscore = false;
        } else if !prev_was_underscore {
            col.push('_');
            prev_was_underscore = true;
        }
    }

    // Trailing separators carry no information — strip them.
    let col = col.trim_end_matches('_').to_owned();

    // Step 4: recursive projections get a distinguishing suffix.
    if is_recursive {
        format!("{col}_all")
    } else {
        col
    }
}
1541
1542/// Look up the FTS tokenizer configured for a given `kind` in `projection_profiles`.
1543///
1544/// Returns [`DEFAULT_FTS_TOKENIZER`] when:
1545/// - the `projection_profiles` table does not exist yet, or
1546/// - no row exists for `(kind, 'fts')`, or
1547/// - the `tokenizer` field in `config_json` is absent or empty.
1548///
1549/// # Errors
1550///
1551/// Returns [`SchemaError::Sqlite`] on any `SQLite` error other than a missing table.
1552pub fn resolve_fts_tokenizer(conn: &Connection, kind: &str) -> Result<String, SchemaError> {
1553    let result = conn
1554        .query_row(
1555            "SELECT json_extract(config_json, '$.tokenizer') FROM projection_profiles WHERE kind = ?1 AND facet = 'fts'",
1556            [kind],
1557            |row| row.get::<_, Option<String>>(0),
1558        )
1559        .optional();
1560
1561    match result {
1562        Ok(Some(Some(tok))) if !tok.is_empty() => Ok(tok),
1563        Ok(_) => Ok(DEFAULT_FTS_TOKENIZER.to_owned()),
1564        Err(rusqlite::Error::SqliteFailure(_, _)) => {
1565            // Table doesn't exist or other sqlite-level error — return default
1566            Ok(DEFAULT_FTS_TOKENIZER.to_owned())
1567        }
1568        Err(e) => Err(SchemaError::Sqlite(e)),
1569    }
1570}
1571
1572#[cfg(test)]
1573#[allow(clippy::expect_used)]
1574mod tests {
1575    use rusqlite::Connection;
1576
1577    use super::SchemaManager;
1578
    #[test]
    fn bootstrap_applies_initial_schema() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();

        let report = manager.bootstrap(&conn).expect("bootstrap report");

        // A fresh database runs every migration, so the report should list
        // exactly one applied version per schema version.
        assert_eq!(
            report.applied_versions.len(),
            manager.current_version().0 as usize
        );
        assert!(report.sqlite_version.starts_with('3'));
        // The canonical `nodes` table from migration 1 must exist.
        let table_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'nodes'",
                [],
                |row| row.get(0),
            )
            .expect("nodes table exists");
        assert_eq!(table_count, 1);
    }
1600
    // --- Item 2: vector profile tests ---

    // A plain bootstrap must never report vector support as enabled.
    #[test]
    fn vector_profile_not_enabled_without_feature() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        let report = manager.bootstrap(&conn).expect("bootstrap");
        assert!(
            !report.vector_profile_enabled,
            "vector_profile_enabled must be false on a fresh bootstrap"
        );
    }

    #[test]
    fn vector_profile_skipped_when_dimension_absent() {
        // ensure_vector_profile is never called → enabled stays false
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager.bootstrap(&conn).expect("bootstrap");

        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            count, 0,
            "no enabled profile without calling ensure_vector_profile"
        );
    }

    #[test]
    fn bootstrap_report_reflects_actual_vector_state() {
        // After a fresh bootstrap with no vector profile, the report reflects reality.
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        let report = manager.bootstrap(&conn).expect("bootstrap");

        // Cross-check the report flag against the actual rows in the DB
        // rather than a hard-coded expectation.
        let db_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            report.vector_profile_enabled,
            db_count > 0,
            "BootstrapReport.vector_profile_enabled must match actual DB state"
        );
    }
1654
    // Seed a hand-built pre-migration ("legacy") schema with no migration
    // history, then verify bootstrap upgrades it in place: the newer
    // `contract_format_version` and `metadata_json` columns must be backfilled.
    #[test]
    fn bootstrap_backfills_vector_contract_format_and_provenance_metadata_columns() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        conn.execute_batch(
            r#"
            CREATE TABLE provenance_events (
                id         TEXT PRIMARY KEY,
                event_type TEXT NOT NULL,
                subject    TEXT NOT NULL,
                source_ref TEXT,
                created_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER))
            );
            CREATE TABLE vector_embedding_contracts (
                profile TEXT PRIMARY KEY,
                table_name TEXT NOT NULL,
                model_identity TEXT NOT NULL,
                model_version TEXT NOT NULL,
                dimension INTEGER NOT NULL,
                normalization_policy TEXT NOT NULL,
                chunking_policy TEXT NOT NULL,
                preprocessing_policy TEXT NOT NULL,
                generator_command_json TEXT NOT NULL,
                updated_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER)),
                applied_at INTEGER NOT NULL DEFAULT 0,
                snapshot_hash TEXT NOT NULL DEFAULT ''
            );
            INSERT INTO vector_embedding_contracts (
                profile,
                table_name,
                model_identity,
                model_version,
                dimension,
                normalization_policy,
                chunking_policy,
                preprocessing_policy,
                generator_command_json,
                updated_at,
                applied_at,
                snapshot_hash
            ) VALUES (
                'default',
                'vec_nodes_active',
                'legacy-model',
                '0.9.0',
                4,
                'l2',
                'per_chunk',
                'trim',
                '["/bin/echo"]',
                100,
                100,
                'legacy'
            );
            "#,
        )
        .expect("seed legacy schema");
        let manager = SchemaManager::new();

        let report = manager.bootstrap(&conn).expect("bootstrap");

        assert!(
            report.applied_versions.iter().any(|version| version.0 >= 5),
            "bootstrap should apply hardening migrations"
        );
        // The pre-existing contract row must receive the new column's default.
        let format_version: i64 = conn
            .query_row(
                "SELECT contract_format_version FROM vector_embedding_contracts WHERE profile = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("contract_format_version");
        assert_eq!(format_version, 1);
        let metadata_column_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM pragma_table_info('provenance_events') WHERE name = 'metadata_json'",
                [],
                |row| row.get(0),
            )
            .expect("metadata_json column count");
        assert_eq!(metadata_column_count, 1);
    }
1736
    // The operational-store trio of tables must exist after a fresh bootstrap.
    #[test]
    fn bootstrap_creates_operational_store_tables() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();

        manager.bootstrap(&conn).expect("bootstrap");

        for table in [
            "operational_collections",
            "operational_mutations",
            "operational_current",
        ] {
            let count: i64 = conn
                .query_row(
                    "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = ?1",
                    [table],
                    |row| row.get(0),
                )
                .expect("table existence");
            assert_eq!(count, 1, "{table} should exist after bootstrap");
        }
        // The ordering column added by the later hardening migration must be
        // present on a fresh database too.
        let mutation_order_columns: i64 = conn
            .query_row(
                "SELECT count(*) FROM pragma_table_info('operational_mutations') WHERE name = 'mutation_order'",
                [],
                |row| row.get(0),
            )
            .expect("mutation_order column exists");
        assert_eq!(mutation_order_columns, 1);
    }

    // Running bootstrap twice must be a no-op the second time.
    #[test]
    fn bootstrap_is_idempotent_with_operational_store_tables() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();

        manager.bootstrap(&conn).expect("first bootstrap");
        let report = manager.bootstrap(&conn).expect("second bootstrap");

        assert!(
            report.applied_versions.is_empty(),
            "second bootstrap should apply no new migrations"
        );
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'operational_collections'",
                [],
                |row| row.get(0),
        )
        .expect("operational_collections table exists");
        assert_eq!(count, 1);
    }
1789
    // Simulates a database whose operational tables were recovered (e.g. from
    // a backup) WITHOUT the fathom_schema_migrations history: bootstrap must
    // still converge — recording migration 8, backfilling mutation_order for
    // existing rows, and creating the ordering index.
    #[test]
    fn bootstrap_is_idempotent_for_recovered_operational_tables_without_migration_history() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        conn.execute_batch(
            r#"
            CREATE TABLE operational_collections (
                name TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                schema_json TEXT NOT NULL,
                retention_json TEXT NOT NULL,
                format_version INTEGER NOT NULL DEFAULT 1,
                created_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER)),
                disabled_at INTEGER
            );

            CREATE TABLE operational_mutations (
                id TEXT PRIMARY KEY,
                collection_name TEXT NOT NULL,
                record_key TEXT NOT NULL,
                op_kind TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                source_ref TEXT,
                created_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER)),
                mutation_order INTEGER NOT NULL DEFAULT 0,
                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
            );

            CREATE TABLE operational_current (
                collection_name TEXT NOT NULL,
                record_key TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                updated_at INTEGER NOT NULL,
                last_mutation_id TEXT NOT NULL,
                PRIMARY KEY(collection_name, record_key),
                FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
                FOREIGN KEY(last_mutation_id) REFERENCES operational_mutations(id)
            );

            INSERT INTO operational_collections (name, kind, schema_json, retention_json)
            VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}');
            INSERT INTO operational_mutations (
                id, collection_name, record_key, op_kind, payload_json, created_at, mutation_order
            ) VALUES (
                'mut-1', 'audit_log', 'entry-1', 'put', '{"ok":true}', 10, 0
            );
            "#,
        )
        .expect("seed recovered operational tables");

        let manager = SchemaManager::new();
        let report = manager
            .bootstrap(&conn)
            .expect("bootstrap recovered schema");

        assert!(
            report.applied_versions.iter().any(|version| version.0 == 8),
            "bootstrap should record operational mutation ordering hardening"
        );
        // The row seeded with mutation_order = 0 must have been rewritten.
        let mutation_order: i64 = conn
            .query_row(
                "SELECT mutation_order FROM operational_mutations WHERE id = 'mut-1'",
                [],
                |row| row.get(0),
            )
            .expect("mutation_order");
        assert_ne!(
            mutation_order, 0,
            "bootstrap should backfill recovered operational rows"
        );
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type = 'index' AND name = 'idx_operational_mutations_collection_key_order'",
                [],
                |row| row.get(0),
            )
            .expect("ordering index exists");
        assert_eq!(count, 1);
    }
1868
    // Recovered schema without the filter/validation columns: bootstrap must
    // record migrations 10 and 11, add both columns with their defaults, and
    // create the operational_filter_values side table.
    #[test]
    fn bootstrap_adds_operational_filter_contract_and_index_table() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        conn.execute_batch(
            r#"
            CREATE TABLE operational_collections (
                name TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                schema_json TEXT NOT NULL,
                retention_json TEXT NOT NULL,
                format_version INTEGER NOT NULL DEFAULT 1,
                created_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER)),
                disabled_at INTEGER
            );

            CREATE TABLE operational_mutations (
                id TEXT PRIMARY KEY,
                collection_name TEXT NOT NULL,
                record_key TEXT NOT NULL,
                op_kind TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                source_ref TEXT,
                created_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER)),
                mutation_order INTEGER NOT NULL DEFAULT 1,
                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
            );

            INSERT INTO operational_collections (name, kind, schema_json, retention_json)
            VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}');
            "#,
        )
        .expect("seed recovered operational schema");

        let manager = SchemaManager::new();
        let report = manager
            .bootstrap(&conn)
            .expect("bootstrap recovered schema");

        assert!(
            report
                .applied_versions
                .iter()
                .any(|version| version.0 == 10),
            "bootstrap should record operational filtered read migration"
        );
        assert!(
            report
                .applied_versions
                .iter()
                .any(|version| version.0 == 11),
            "bootstrap should record operational validation migration"
        );
        let filter_fields_json: String = conn
            .query_row(
                "SELECT filter_fields_json FROM operational_collections WHERE name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("filter_fields_json added");
        assert_eq!(filter_fields_json, "[]");
        let validation_json: String = conn
            .query_row(
                "SELECT validation_json FROM operational_collections WHERE name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("validation_json added");
        assert_eq!(validation_json, "");
        let table_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'operational_filter_values'",
                [],
                |row| row.get(0),
        )
        .expect("filter table exists");
        assert_eq!(table_count, 1);
    }
1946
    // Drop only the migration-history table and rebuild it empty: bootstrap
    // should re-record migrations 10/11 without failing on columns that the
    // schema already has (i.e. ALTER TABLE ADD COLUMN must be skipped/ignored).
    #[test]
    fn bootstrap_reapplies_migration_history_without_readding_filter_contract_columns() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager.bootstrap(&conn).expect("initial bootstrap");

        conn.execute("DROP TABLE fathom_schema_migrations", [])
            .expect("drop migration history");
        SchemaManager::ensure_metadata_tables(&conn).expect("recreate migration metadata");

        let report = manager
            .bootstrap(&conn)
            .expect("rebootstrap existing schema");

        assert!(
            report
                .applied_versions
                .iter()
                .any(|version| version.0 == 10),
            "rebootstrap should re-record migration 10"
        );
        assert!(
            report
                .applied_versions
                .iter()
                .any(|version| version.0 == 11),
            "rebootstrap should re-record migration 11"
        );
        // NOTE(review): the fallbacks below (`unwrap_or_else` / `unwrap_or_default`)
        // happen to equal the expected values, so these two asserts also pass when
        // the query itself errors (e.g. empty table or missing column). Consider
        // asserting on the Result instead to keep the checks meaningful.
        let filter_fields_json: String = conn
            .query_row(
                "SELECT filter_fields_json FROM operational_collections LIMIT 1",
                [],
                |row| row.get(0),
            )
            .unwrap_or_else(|_| "[]".to_string());
        assert_eq!(filter_fields_json, "[]");
        let validation_json: String = conn
            .query_row(
                "SELECT validation_json FROM operational_collections LIMIT 1",
                [],
                |row| row.get(0),
            )
            .unwrap_or_default();
        assert_eq!(validation_json, "");
    }
1992
    // A database stamped with a future migration version (newer binary wrote
    // it) must be rejected rather than partially migrated.
    #[test]
    fn downgrade_detected_returns_version_mismatch() {
        use crate::SchemaError;

        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager.bootstrap(&conn).expect("initial bootstrap");

        conn.execute(
            "INSERT INTO fathom_schema_migrations (version, description) VALUES (?1, ?2)",
            (999_i64, "future migration"),
        )
        .expect("insert future version");

        let err = manager
            .bootstrap(&conn)
            .expect_err("should fail on downgrade");
        assert!(
            matches!(
                err,
                SchemaError::VersionMismatch {
                    database_version: 999,
                    ..
                }
            ),
            "expected VersionMismatch with database_version 999, got: {err}"
        );
    }

    #[test]
    fn journal_size_limit_is_set() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager
            .initialize_connection(&conn)
            .expect("initialize_connection");

        let limit: i64 = conn
            .query_row("PRAGMA journal_size_limit", [], |row| row.get(0))
            .expect("journal_size_limit pragma");
        // 536_870_912 bytes = 512 MiB.
        assert_eq!(limit, 536_870_912);
    }
2035
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn vector_profile_created_when_feature_enabled() {
        // Register the sqlite-vec extension globally so the in-memory
        // connection can use the vec0 module.
        // SAFETY: the transmute reinterprets `sqlite3_vec_init` as the
        // entry-point type `sqlite3_auto_extension` expects; this is sound
        // only if the sqlite-vec init signature matches SQLite's
        // auto-extension ABI — presumed correct per the sqlite-vec crate,
        // but worth confirming on crate upgrades.
        unsafe {
            #[allow(clippy::missing_transmute_annotations)]
            rusqlite::ffi::sqlite3_auto_extension(Some(std::mem::transmute(
                sqlite_vec::sqlite3_vec_init as *const (),
            )));
        }
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager.bootstrap(&conn).expect("bootstrap");

        manager
            .ensure_vector_profile(&conn, "default", "vec_nodes_active", 128)
            .expect("ensure_vector_profile");

        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            count, 1,
            "vector profile must be enabled after ensure_vector_profile"
        );

        // Verify the virtual table exists by querying it
        let _: i64 = conn
            .query_row("SELECT count(*) FROM vec_nodes_active", [], |row| {
                row.get(0)
            })
            .expect("vec_nodes_active table must exist after ensure_vector_profile");
    }
2074
    // --- A-1: fts_kind_table_name tests ---

    #[test]
    fn fts_kind_table_name_simple_kind() {
        let result = super::fts_kind_table_name("WMKnowledgeObject");
        assert!(result.starts_with("fts_props_wmknowledgeobject_"));
        assert!(result.len() <= 63);
    }

    #[test]
    fn fts_kind_table_name_another_simple_kind() {
        let result = super::fts_kind_table_name("WMExecutionRecord");
        assert!(result.starts_with("fts_props_wmexecutionrecord_"));
        assert!(result.len() <= 63);
    }

    #[test]
    fn fts_kind_table_name_with_separator_chars() {
        let result = super::fts_kind_table_name("MyKind-With.Dots");
        assert!(result.starts_with("fts_props_mykind_with_dots_"));
        assert!(result.len() <= 63);
    }

    #[test]
    fn fts_kind_table_name_collapses_consecutive_underscores() {
        let result = super::fts_kind_table_name("Kind__Double__Underscores");
        assert!(result.starts_with("fts_props_kind_double_underscores_"));
        assert!(result.len() <= 63);
    }

    #[test]
    fn fts_kind_table_name_long_kind_truncates_with_hash() {
        // 61 A's — slug after prefix would be 61 chars, exceeding 63-char limit
        let long_kind = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA";
        let result = super::fts_kind_table_name(long_kind);
        assert!(
            result.len() <= 63,
            "result must fit sqlite identifier budget"
        );
        assert!(
            result.starts_with("fts_props_"),
            "result must start with fts_props_"
        );
        // Must contain underscore before 10-char hex suffix
        let last_underscore = result.rfind('_').expect("must contain underscore");
        let hex_suffix = &result[last_underscore + 1..];
        assert_eq!(hex_suffix.len(), 10, "hex suffix must be 10 chars");
        assert!(
            hex_suffix.chars().all(|c| c.is_ascii_hexdigit()),
            "hex suffix must be hex digits"
        );
    }

    #[test]
    fn fts_kind_table_name_testkind() {
        let result = super::fts_kind_table_name("TestKind");
        assert!(result.starts_with("fts_props_testkind_"));
    }

    // The hash suffix is computed from the RAW kind, so kinds that sanitize
    // to the same slug still get distinct canonical names; the legacy scheme
    // (no hash below the length limit) collides by design.
    #[test]
    fn projection_table_names_do_not_collide_after_sanitization() {
        assert_ne!(
            super::fts_kind_table_name("Foo-Bar"),
            super::fts_kind_table_name("Foo_Bar")
        );
        assert_ne!(
            super::vec_kind_table_name("Foo-Bar"),
            super::vec_kind_table_name("Foo_Bar")
        );
        assert_eq!(
            super::legacy_fts_kind_table_name("Foo-Bar"),
            super::legacy_fts_kind_table_name("Foo_Bar")
        );
    }
2149
    // --- A-1: fts_column_name tests ---

    #[test]
    fn fts_column_name_simple_field() {
        assert_eq!(super::fts_column_name("$.title", false), "title");
    }

    #[test]
    fn fts_column_name_nested_path() {
        assert_eq!(
            super::fts_column_name("$.payload.content", false),
            "payload_content"
        );
    }

    #[test]
    fn fts_column_name_recursive() {
        assert_eq!(super::fts_column_name("$.payload", true), "payload_all");
    }

    // Non-alphanumeric runs collapse to one '_' and trailing '_' is stripped.
    #[test]
    fn fts_column_name_special_chars() {
        assert_eq!(
            super::fts_column_name("$.some-field[0]", false),
            "some_field_0"
        );
    }
2177
    // --- A-1: resolve_fts_tokenizer tests ---

    #[test]
    fn resolve_fts_tokenizer_returns_default_when_no_table() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        // No projection_profiles table — should return default
        let result = super::resolve_fts_tokenizer(&conn, "MyKind").expect("should not error");
        assert_eq!(result, super::DEFAULT_FTS_TOKENIZER);
    }

    // A configured (kind, 'fts') row wins; any other kind falls back to the
    // default tokenizer.
    #[test]
    fn resolve_fts_tokenizer_returns_configured_value() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        conn.execute_batch(
            "CREATE TABLE projection_profiles (
                kind TEXT NOT NULL,
                facet TEXT NOT NULL,
                config_json TEXT NOT NULL,
                active_at INTEGER,
                created_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER)),
                PRIMARY KEY (kind, facet)
            );
            INSERT INTO projection_profiles (kind, facet, config_json)
            VALUES ('MyKind', 'fts', '{\"tokenizer\":\"trigram\"}');",
        )
        .expect("setup table");

        let result = super::resolve_fts_tokenizer(&conn, "MyKind").expect("should not error");
        assert_eq!(result, "trigram");

        let default_result =
            super::resolve_fts_tokenizer(&conn, "OtherKind").expect("should not error");
        assert_eq!(default_result, super::DEFAULT_FTS_TOKENIZER);
    }
2212
    // --- A-2: migration 20 tests ---

    #[test]
    fn migration_20_creates_projection_profiles_table() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager.bootstrap(&conn).expect("bootstrap");

        let table_exists: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='projection_profiles'",
                [],
                |row| row.get(0),
            )
            .expect("query sqlite_master");
        assert_eq!(table_exists, 1, "projection_profiles table must exist");

        // Check columns exist by querying them
        conn.execute_batch(
            "INSERT INTO projection_profiles (kind, facet, config_json)
             VALUES ('TestKind', 'fts', '{}')",
        )
        .expect("insert row to verify columns");
        let (kind, facet, config_json): (String, String, String) = conn
            .query_row(
                "SELECT kind, facet, config_json FROM projection_profiles WHERE kind='TestKind'",
                [],
                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
            )
            .expect("select columns");
        assert_eq!(kind, "TestKind");
        assert_eq!(facet, "fts");
        assert_eq!(config_json, "{}");
    }

    #[test]
    fn migration_20_primary_key_is_kind_facet() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager.bootstrap(&conn).expect("bootstrap");

        conn.execute_batch(
            "INSERT INTO projection_profiles (kind, facet, config_json)
             VALUES ('MyKind', 'fts', '{\"tokenizer\":\"porter\"}');",
        )
        .expect("first insert");

        // Second insert with same (kind, facet) must fail
        let result = conn.execute_batch(
            "INSERT INTO projection_profiles (kind, facet, config_json)
             VALUES ('MyKind', 'fts', '{\"tokenizer\":\"trigram\"}');",
        );
        assert!(
            result.is_err(),
            "duplicate (kind, facet) must violate PRIMARY KEY"
        );
    }
2270
2271    #[test]
2272    fn migration_20_resolve_fts_tokenizer_end_to_end() {
2273        let conn = Connection::open_in_memory().expect("in-memory sqlite");
2274        let manager = SchemaManager::new();
2275        manager.bootstrap(&conn).expect("bootstrap");
2276
2277        conn.execute_batch(
2278            "INSERT INTO projection_profiles (kind, facet, config_json)
2279             VALUES ('MyKind', 'fts', '{\"tokenizer\":\"trigram\"}');",
2280        )
2281        .expect("insert profile");
2282
2283        let result = super::resolve_fts_tokenizer(&conn, "MyKind").expect("should not error");
2284        assert_eq!(result, "trigram");
2285
2286        let default_result =
2287            super::resolve_fts_tokenizer(&conn, "UnknownKind").expect("should not error");
2288        assert_eq!(default_result, super::DEFAULT_FTS_TOKENIZER);
2289    }
2290
2291    #[test]
2292    fn migration_21_creates_per_kind_fts_table_and_pending_row() {
2293        let conn = Connection::open_in_memory().expect("in-memory sqlite");
2294        let manager = SchemaManager::new();
2295
2296        // Bootstrap up through migration 20 by using a fresh DB, then manually insert
2297        // a kind into fts_property_schemas so migration 21 picks it up.
2298        // We do a full bootstrap (which applies all migrations including 21).
2299        // Insert the kind before bootstrapping so it is present when migration 21 runs.
2300        // Since bootstrap applies migrations in order and migration 15 creates
2301        // fts_property_schemas, we must insert after that table exists.
2302        // Strategy: bootstrap first, insert kind, then run bootstrap again (idempotent).
2303        manager.bootstrap(&conn).expect("first bootstrap");
2304
2305        conn.execute_batch(
2306            "INSERT INTO fts_property_schemas (kind, property_paths_json, separator, format_version) \
2307             VALUES ('TestKind', '[]', ',', 1)",
2308        )
2309        .expect("insert kind");
2310
2311        // Now run ensure_per_kind_fts_tables directly by calling bootstrap again — migration 21
2312        // is already applied, so we test the function directly.
2313        SchemaManager::ensure_per_kind_fts_tables(&conn).expect("ensure_per_kind_fts_tables");
2314
2315        let table = super::fts_kind_table_name("TestKind");
2316        let count: i64 = conn
2317            .query_row(
2318                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1",
2319                [table.as_str()],
2320                |r| r.get(0),
2321            )
2322            .expect("count fts table");
2323        assert_eq!(count, 1, "{table} virtual table should be created");
2324
2325        // PENDING row should exist
2326        let state: String = conn
2327            .query_row(
2328                "SELECT state FROM fts_property_rebuild_state WHERE kind='TestKind'",
2329                [],
2330                |r| r.get(0),
2331            )
2332            .expect("rebuild state row");
2333        assert_eq!(state, "PENDING");
2334    }
2335
2336    #[test]
2337    fn migration_22_adds_columns_json_to_staging_table() {
2338        let conn = Connection::open_in_memory().expect("in-memory sqlite");
2339        let manager = SchemaManager::new();
2340        manager.bootstrap(&conn).expect("bootstrap");
2341
2342        let col_count: i64 = conn
2343            .query_row(
2344                "SELECT count(*) FROM pragma_table_info('fts_property_rebuild_staging') WHERE name='columns_json'",
2345                [],
2346                |r| r.get(0),
2347            )
2348            .expect("pragma_table_info");
2349        assert_eq!(
2350            col_count, 1,
2351            "columns_json column must exist after migration 22"
2352        );
2353    }
2354
2355    // --- 0.5.0 item 6: vec_kind_table_name tests ---
2356
2357    #[test]
2358    fn vec_kind_table_name_simple_kind() {
2359        let result = super::vec_kind_table_name("WMKnowledgeObject");
2360        assert!(result.starts_with("vec_wmknowledgeobject_"));
2361        assert!(result.len() <= 63);
2362    }
2363
2364    #[test]
2365    fn vec_kind_table_name_another_kind() {
2366        let result = super::vec_kind_table_name("MyKind");
2367        assert!(result.starts_with("vec_mykind_"));
2368        assert!(result.len() <= 63);
2369    }
2370
2371    #[test]
2372    fn vec_kind_table_name_with_separator_chars() {
2373        let result = super::vec_kind_table_name("MyKind-With.Dots");
2374        assert!(result.starts_with("vec_mykind_with_dots_"));
2375        assert!(result.len() <= 63);
2376    }
2377
2378    #[test]
2379    fn vec_kind_table_name_collapses_consecutive_underscores() {
2380        let result = super::vec_kind_table_name("Kind__Double__Underscores");
2381        assert!(result.starts_with("vec_kind_double_underscores_"));
2382        assert!(result.len() <= 63);
2383    }
2384
2385    // --- 0.5.0 item 6: per-kind vec tables ---
2386
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn per_kind_vec_table_created_when_vec_profile_registered() {
        // Register the sqlite-vec extension globally so the in-memory
        // connection can use the vec0 module.
        // SAFETY: sqlite3_auto_extension mutates process-global state, so it
        // must run before the connection below is opened for vec0 to be
        // available. The transmute reinterprets the sqlite3_vec_init function
        // pointer as the extension entry-point signature sqlite expects;
        // assumes sqlite_vec::sqlite3_vec_init has that C ABI — confirm
        // against the sqlite-vec crate documentation.
        unsafe {
            #[allow(clippy::missing_transmute_annotations)]
            rusqlite::ffi::sqlite3_auto_extension(Some(std::mem::transmute(
                sqlite_vec::sqlite3_vec_init as *const (),
            )));
        }
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager.bootstrap(&conn).expect("bootstrap");

        // Register a vec profile for MyKind — should create the per-kind
        // vec table, NOT vec_nodes_active.
        manager
            .ensure_vec_kind_profile(&conn, "MyKind", 128)
            .expect("ensure_vec_kind_profile");

        // Per-kind vec virtual table must exist.
        let expected_table = super::vec_kind_table_name("MyKind");
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name=?1",
                rusqlite::params![&expected_table],
                |r| r.get(0),
            )
            .expect("query sqlite_master");
        assert_eq!(count, 1, "{expected_table} virtual table must be created");

        // projection_profiles row must exist with (kind='MyKind', facet='vec')
        let pp_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM projection_profiles WHERE kind='MyKind' AND facet='vec'",
                [],
                |r| r.get(0),
            )
            .expect("query projection_profiles");
        assert_eq!(
            pp_count, 1,
            "projection_profiles row must exist for (MyKind, vec)"
        );

        // The old global vec_nodes_active must NOT have been created
        let old_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='vec_nodes_active'",
                [],
                |r| r.get(0),
            )
            .expect("query sqlite_master");
        assert_eq!(
            old_count, 0,
            "vec_nodes_active must NOT be created for per-kind registration"
        );
    }
2445
2446    // --- A-6: migration 23 drops fts_node_properties ---
2447    #[test]
2448    fn migration_23_drops_global_fts_node_properties_table() {
2449        let conn = Connection::open_in_memory().expect("in-memory sqlite");
2450        let manager = SchemaManager::new();
2451        manager.bootstrap(&conn).expect("bootstrap");
2452
2453        // After migration 23, fts_node_properties must NOT exist in sqlite_master.
2454        let count: i64 = conn
2455            .query_row(
2456                "SELECT count(*) FROM sqlite_master \
2457                 WHERE type='table' AND name='fts_node_properties'",
2458                [],
2459                |r| r.get(0),
2460            )
2461            .expect("check sqlite_master");
2462        assert_eq!(
2463            count, 0,
2464            "fts_node_properties must be dropped by migration 23"
2465        );
2466    }
2467}