Skip to main content

fathomdb_schema/
bootstrap.rs

1use rusqlite::{Connection, OptionalExtension};
2
3use crate::{Migration, SchemaError, SchemaVersion};
4
5static MIGRATIONS: &[Migration] = &[
6    Migration::new(
7        SchemaVersion(1),
8        "initial canonical schema and runtime tables",
9        r"
10                CREATE TABLE IF NOT EXISTS nodes (
11                    row_id TEXT PRIMARY KEY,
12                    logical_id TEXT NOT NULL,
13                    kind TEXT NOT NULL,
14                    properties BLOB NOT NULL,
15                    created_at INTEGER NOT NULL,
16                    superseded_at INTEGER,
17                    source_ref TEXT,
18                    confidence REAL
19                );
20
21                CREATE UNIQUE INDEX IF NOT EXISTS idx_nodes_active_logical_id
22                    ON nodes(logical_id)
23                    WHERE superseded_at IS NULL;
24                CREATE INDEX IF NOT EXISTS idx_nodes_kind_active
25                    ON nodes(kind, superseded_at);
26                CREATE INDEX IF NOT EXISTS idx_nodes_source_ref
27                    ON nodes(source_ref);
28
29                CREATE TABLE IF NOT EXISTS edges (
30                    row_id TEXT PRIMARY KEY,
31                    logical_id TEXT NOT NULL,
32                    source_logical_id TEXT NOT NULL,
33                    target_logical_id TEXT NOT NULL,
34                    kind TEXT NOT NULL,
35                    properties BLOB NOT NULL,
36                    created_at INTEGER NOT NULL,
37                    superseded_at INTEGER,
38                    source_ref TEXT,
39                    confidence REAL
40                );
41
42                CREATE UNIQUE INDEX IF NOT EXISTS idx_edges_active_logical_id
43                    ON edges(logical_id)
44                    WHERE superseded_at IS NULL;
45                CREATE INDEX IF NOT EXISTS idx_edges_source_active
46                    ON edges(source_logical_id, kind, superseded_at);
47                CREATE INDEX IF NOT EXISTS idx_edges_target_active
48                    ON edges(target_logical_id, kind, superseded_at);
49                CREATE INDEX IF NOT EXISTS idx_edges_source_ref
50                    ON edges(source_ref);
51
52                CREATE TABLE IF NOT EXISTS chunks (
53                    id TEXT PRIMARY KEY,
54                    node_logical_id TEXT NOT NULL,
55                    text_content TEXT NOT NULL,
56                    byte_start INTEGER,
57                    byte_end INTEGER,
58                    created_at INTEGER NOT NULL
59                );
60
61                CREATE INDEX IF NOT EXISTS idx_chunks_node_logical_id
62                    ON chunks(node_logical_id);
63
64                CREATE VIRTUAL TABLE IF NOT EXISTS fts_nodes USING fts5(
65                    chunk_id UNINDEXED,
66                    node_logical_id UNINDEXED,
67                    kind UNINDEXED,
68                    text_content
69                );
70
71                CREATE TABLE IF NOT EXISTS vector_profiles (
72                    profile TEXT PRIMARY KEY,
73                    table_name TEXT NOT NULL,
74                    dimension INTEGER NOT NULL,
75                    enabled INTEGER NOT NULL DEFAULT 0
76                );
77
78                CREATE TABLE IF NOT EXISTS runs (
79                    id TEXT PRIMARY KEY,
80                    kind TEXT NOT NULL,
81                    status TEXT NOT NULL,
82                    properties BLOB NOT NULL,
83                    created_at INTEGER NOT NULL,
84                    completed_at INTEGER,
85                    superseded_at INTEGER,
86                    source_ref TEXT
87                );
88
89                CREATE TABLE IF NOT EXISTS steps (
90                    id TEXT PRIMARY KEY,
91                    run_id TEXT NOT NULL,
92                    kind TEXT NOT NULL,
93                    status TEXT NOT NULL,
94                    properties BLOB NOT NULL,
95                    created_at INTEGER NOT NULL,
96                    completed_at INTEGER,
97                    superseded_at INTEGER,
98                    source_ref TEXT,
99                    FOREIGN KEY(run_id) REFERENCES runs(id)
100                );
101
102                CREATE TABLE IF NOT EXISTS actions (
103                    id TEXT PRIMARY KEY,
104                    step_id TEXT NOT NULL,
105                    kind TEXT NOT NULL,
106                    status TEXT NOT NULL,
107                    properties BLOB NOT NULL,
108                    created_at INTEGER NOT NULL,
109                    completed_at INTEGER,
110                    superseded_at INTEGER,
111                    source_ref TEXT,
112                    FOREIGN KEY(step_id) REFERENCES steps(id)
113                );
114
115                CREATE INDEX IF NOT EXISTS idx_runs_source_ref
116                    ON runs(source_ref);
117                CREATE INDEX IF NOT EXISTS idx_steps_source_ref
118                    ON steps(source_ref);
119                CREATE INDEX IF NOT EXISTS idx_actions_source_ref
120                    ON actions(source_ref);
121                ",
122    ),
123    Migration::new(
124        SchemaVersion(2),
125        "durable audit trail: provenance_events table",
126        r"
127                CREATE TABLE IF NOT EXISTS provenance_events (
128                    id         TEXT PRIMARY KEY,
129                    event_type TEXT NOT NULL,
130                    subject    TEXT NOT NULL,
131                    source_ref TEXT,
132                    created_at INTEGER NOT NULL DEFAULT (unixepoch())
133                );
134                CREATE INDEX IF NOT EXISTS idx_provenance_events_subject
135                    ON provenance_events (subject, event_type);
136                ",
137    ),
138    Migration::new(
139        SchemaVersion(3),
140        "vector regeneration contracts",
141        r"
142                CREATE TABLE IF NOT EXISTS vector_embedding_contracts (
143                    profile TEXT PRIMARY KEY,
144                    table_name TEXT NOT NULL,
145                    model_identity TEXT NOT NULL,
146                    model_version TEXT NOT NULL,
147                    dimension INTEGER NOT NULL,
148                    normalization_policy TEXT NOT NULL,
149                    chunking_policy TEXT NOT NULL,
150                    preprocessing_policy TEXT NOT NULL,
151                    generator_command_json TEXT NOT NULL,
152                    updated_at INTEGER NOT NULL DEFAULT (unixepoch())
153                );
154                ",
155    ),
156    Migration::new(
157        SchemaVersion(4),
158        "vector regeneration apply metadata",
159        r"
160                ALTER TABLE vector_embedding_contracts
161                    ADD COLUMN applied_at INTEGER NOT NULL DEFAULT 0;
162                ALTER TABLE vector_embedding_contracts
163                    ADD COLUMN snapshot_hash TEXT NOT NULL DEFAULT '';
164                UPDATE vector_embedding_contracts
165                SET
166                    applied_at = CASE
167                        WHEN applied_at = 0 THEN updated_at
168                        ELSE applied_at
169                    END,
170                    snapshot_hash = CASE
171                        WHEN snapshot_hash = '' THEN 'legacy'
172                        ELSE snapshot_hash
173                    END;
174                ",
175    ),
176    Migration::new(
177        SchemaVersion(5),
178        "vector regeneration contract format version",
179        r"
180                ALTER TABLE vector_embedding_contracts
181                    ADD COLUMN contract_format_version INTEGER NOT NULL DEFAULT 1;
182                UPDATE vector_embedding_contracts
183                SET contract_format_version = 1
184                WHERE contract_format_version = 0;
185                ",
186    ),
187    Migration::new(
188        SchemaVersion(6),
189        "provenance metadata payloads",
190        r"
191                ALTER TABLE provenance_events
192                    ADD COLUMN metadata_json TEXT NOT NULL DEFAULT '';
193                ",
194    ),
195    Migration::new(
196        SchemaVersion(7),
197        "operational store canonical and derived tables",
198        r"
199                CREATE TABLE IF NOT EXISTS operational_collections (
200                    name TEXT PRIMARY KEY,
201                    kind TEXT NOT NULL,
202                    schema_json TEXT NOT NULL,
203                    retention_json TEXT NOT NULL,
204                    format_version INTEGER NOT NULL DEFAULT 1,
205                    created_at INTEGER NOT NULL DEFAULT (unixepoch()),
206                    disabled_at INTEGER
207                );
208
209                CREATE INDEX IF NOT EXISTS idx_operational_collections_kind
210                    ON operational_collections(kind, disabled_at);
211
212                CREATE TABLE IF NOT EXISTS operational_mutations (
213                    id TEXT PRIMARY KEY,
214                    collection_name TEXT NOT NULL,
215                    record_key TEXT NOT NULL,
216                    op_kind TEXT NOT NULL,
217                    payload_json TEXT NOT NULL,
218                    source_ref TEXT,
219                    created_at INTEGER NOT NULL DEFAULT (unixepoch()),
220                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
221                );
222
223                CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_created
224                    ON operational_mutations(collection_name, record_key, created_at DESC, id DESC);
225                CREATE INDEX IF NOT EXISTS idx_operational_mutations_source_ref
226                    ON operational_mutations(source_ref);
227
228                CREATE TABLE IF NOT EXISTS operational_current (
229                    collection_name TEXT NOT NULL,
230                    record_key TEXT NOT NULL,
231                    payload_json TEXT NOT NULL,
232                    updated_at INTEGER NOT NULL,
233                    last_mutation_id TEXT NOT NULL,
234                    PRIMARY KEY(collection_name, record_key),
235                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
236                    FOREIGN KEY(last_mutation_id) REFERENCES operational_mutations(id)
237                );
238
239                CREATE INDEX IF NOT EXISTS idx_operational_current_collection_updated
240                    ON operational_current(collection_name, updated_at DESC);
241                ",
242    ),
243    Migration::new(
244        SchemaVersion(8),
245        "operational mutation ordering hardening",
246        r"
247                ALTER TABLE operational_mutations
248                    ADD COLUMN mutation_order INTEGER NOT NULL DEFAULT 0;
249                UPDATE operational_mutations
250                SET mutation_order = rowid
251                WHERE mutation_order = 0;
252                CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_order
253                    ON operational_mutations(collection_name, record_key, mutation_order DESC);
254                ",
255    ),
256    Migration::new(
257        SchemaVersion(9),
258        "node last_accessed metadata",
259        r"
260                CREATE TABLE IF NOT EXISTS node_access_metadata (
261                    logical_id TEXT PRIMARY KEY,
262                    last_accessed_at INTEGER NOT NULL,
263                    updated_at INTEGER NOT NULL
264                );
265
266                CREATE INDEX IF NOT EXISTS idx_node_access_metadata_last_accessed
267                    ON node_access_metadata(last_accessed_at DESC);
268                ",
269    ),
270    Migration::new(
271        SchemaVersion(10),
272        "operational filtered read contracts and extracted values",
273        r"
274                ALTER TABLE operational_collections
275                    ADD COLUMN filter_fields_json TEXT NOT NULL DEFAULT '[]';
276
277                CREATE TABLE IF NOT EXISTS operational_filter_values (
278                    mutation_id TEXT NOT NULL,
279                    collection_name TEXT NOT NULL,
280                    field_name TEXT NOT NULL,
281                    string_value TEXT,
282                    integer_value INTEGER,
283                    PRIMARY KEY(mutation_id, field_name),
284                    FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE,
285                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
286                );
287
288                CREATE INDEX IF NOT EXISTS idx_operational_filter_values_text
289                    ON operational_filter_values(collection_name, field_name, string_value, mutation_id);
290                CREATE INDEX IF NOT EXISTS idx_operational_filter_values_integer
291                    ON operational_filter_values(collection_name, field_name, integer_value, mutation_id);
292                ",
293    ),
294    Migration::new(
295        SchemaVersion(11),
296        "operational payload validation contracts",
297        r"
298                ALTER TABLE operational_collections
299                    ADD COLUMN validation_json TEXT NOT NULL DEFAULT '';
300                ",
301    ),
302    Migration::new(
303        SchemaVersion(12),
304        "operational secondary indexes",
305        r"
306                ALTER TABLE operational_collections
307                    ADD COLUMN secondary_indexes_json TEXT NOT NULL DEFAULT '[]';
308
309                CREATE TABLE IF NOT EXISTS operational_secondary_index_entries (
310                    collection_name TEXT NOT NULL,
311                    index_name TEXT NOT NULL,
312                    subject_kind TEXT NOT NULL,
313                    mutation_id TEXT NOT NULL DEFAULT '',
314                    record_key TEXT NOT NULL DEFAULT '',
315                    sort_timestamp INTEGER,
316                    slot1_text TEXT,
317                    slot1_integer INTEGER,
318                    slot2_text TEXT,
319                    slot2_integer INTEGER,
320                    slot3_text TEXT,
321                    slot3_integer INTEGER,
322                    PRIMARY KEY(collection_name, index_name, subject_kind, mutation_id, record_key),
323                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
324                    FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE
325                );
326
327                CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_text
328                    ON operational_secondary_index_entries(
329                        collection_name, index_name, subject_kind, slot1_text, sort_timestamp DESC, mutation_id, record_key
330                    );
331                CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_integer
332                    ON operational_secondary_index_entries(
333                        collection_name, index_name, subject_kind, slot1_integer, sort_timestamp DESC, mutation_id, record_key
334                    );
335                CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_text
336                    ON operational_secondary_index_entries(
337                        collection_name, index_name, subject_kind, slot1_text, slot2_text, slot3_text, sort_timestamp DESC, record_key
338                    );
339                CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_integer
340                    ON operational_secondary_index_entries(
341                        collection_name, index_name, subject_kind, slot1_integer, slot2_integer, slot3_integer, sort_timestamp DESC, record_key
342                );
343                ",
344    ),
345    Migration::new(
346        SchemaVersion(13),
347        "operational retention run metadata",
348        r"
349                CREATE TABLE IF NOT EXISTS operational_retention_runs (
350                    id TEXT PRIMARY KEY,
351                    collection_name TEXT NOT NULL,
352                    executed_at INTEGER NOT NULL,
353                    action_kind TEXT NOT NULL,
354                    dry_run INTEGER NOT NULL DEFAULT 0,
355                    deleted_mutations INTEGER NOT NULL,
356                    rows_remaining INTEGER NOT NULL,
357                    metadata_json TEXT NOT NULL DEFAULT '',
358                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
359                );
360
361                CREATE INDEX IF NOT EXISTS idx_operational_retention_runs_collection_time
362                    ON operational_retention_runs(collection_name, executed_at DESC);
363                ",
364    ),
365    Migration::new(
366        SchemaVersion(14),
367        "external content object columns",
368        r"
369                ALTER TABLE nodes ADD COLUMN content_ref TEXT;
370
371                CREATE INDEX IF NOT EXISTS idx_nodes_content_ref
372                    ON nodes(content_ref)
373                    WHERE content_ref IS NOT NULL AND superseded_at IS NULL;
374
375                ALTER TABLE chunks ADD COLUMN content_hash TEXT;
376                ",
377    ),
378    Migration::new(
379        SchemaVersion(15),
380        "FTS property projection schemas",
381        r"
382                CREATE TABLE IF NOT EXISTS fts_property_schemas (
383                    kind TEXT PRIMARY KEY,
384                    property_paths_json TEXT NOT NULL,
385                    separator TEXT NOT NULL DEFAULT ' ',
386                    format_version INTEGER NOT NULL DEFAULT 1,
387                    created_at INTEGER NOT NULL DEFAULT (unixepoch())
388                );
389
390                CREATE VIRTUAL TABLE IF NOT EXISTS fts_node_properties USING fts5(
391                    node_logical_id UNINDEXED,
392                    kind UNINDEXED,
393                    text_content
394                );
395                ",
396    ),
397    Migration::new(
398        SchemaVersion(16),
399        "rebuild fts_nodes and fts_node_properties on porter+unicode61 tokenizer",
400        // DDL applied by `ensure_unicode_porter_fts_tokenizers`; the hook
401        // drops, recreates, and rebuilds both tables from canonical state.
402        // The FTS5 chained tokenizer syntax requires the wrapper (porter)
403        // before the base tokenizer, so the applied `tokenize=` clause is
404        // `'porter unicode61 remove_diacritics 2'`.
405        "",
406    ),
407    Migration::new(
408        SchemaVersion(17),
409        "fts property position-map sidecar for recursive extraction",
410        // Sidecar position map for property FTS. The recursive extraction
411        // walk in the engine emits one row per scalar leaf contributing to a
412        // given node's property FTS blob, carrying the half-open byte range
413        // `[start_offset, end_offset)` within the blob and the JSON-path of
414        // the originating leaf. Phase 5 uses this to attribute tokens back
415        // to their source leaves.
416        //
417        // The existing `fts_property_schemas.property_paths_json` column is
418        // reused unchanged at the DDL level; the engine-side JSON decoder
419        // tolerates both the legacy shape (bare strings = scalar) and the
420        // new shape (objects with `mode` = `scalar`|`recursive`, optional
421        // `exclude_paths`). Backwards compatibility is guaranteed because a
422        // bare JSON array of strings still deserialises into scalar-mode
423        // entries.
424        r"
425                CREATE TABLE IF NOT EXISTS fts_node_property_positions (
426                    node_logical_id TEXT NOT NULL,
427                    kind TEXT NOT NULL,
428                    start_offset INTEGER NOT NULL,
429                    end_offset INTEGER NOT NULL,
430                    leaf_path TEXT NOT NULL
431                );
432
433                CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_node
434                    ON fts_node_property_positions(node_logical_id, kind);
435
436                CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_kind
437                    ON fts_node_property_positions(kind);
438                ",
439    ),
440    Migration::new(
441        SchemaVersion(18),
442        "add UNIQUE constraint on fts_node_property_positions (node_logical_id, kind, start_offset)",
443        // P4-P2-4: the v17 sidecar DDL did not enforce uniqueness of the
444        // `(node_logical_id, kind, start_offset)` tuple, so a buggy rebuild
445        // path could silently double-insert a leaf and break attribution
446        // lookups. Drop and recreate the table with the UNIQUE constraint,
447        // preserving the existing indexes. The DDL leaves the table empty,
448        // which the open-time rebuild guard in `ExecutionCoordinator::open`
449        // detects (empty positions + recursive schemas present) and
450        // repopulates from canonical state on the next open. The rebuild
451        // path is idempotent and safe to run unconditionally.
452        //
453        // `fts_node_property_positions` is a regular SQLite table, not an
454        // FTS5 virtual table, so UNIQUE constraints are supported.
455        r"
456                DROP TABLE IF EXISTS fts_node_property_positions;
457
458                CREATE TABLE fts_node_property_positions (
459                    node_logical_id TEXT NOT NULL,
460                    kind TEXT NOT NULL,
461                    start_offset INTEGER NOT NULL,
462                    end_offset INTEGER NOT NULL,
463                    leaf_path TEXT NOT NULL,
464                    UNIQUE(node_logical_id, kind, start_offset)
465                );
466
467                CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_node
468                    ON fts_node_property_positions(node_logical_id, kind);
469
470                CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_kind
471                    ON fts_node_property_positions(kind);
472                ",
473    ),
474    Migration::new(
475        SchemaVersion(19),
476        "async property-FTS rebuild staging and state tables",
477        r"
478                CREATE TABLE IF NOT EXISTS fts_property_rebuild_staging (
479                    kind TEXT NOT NULL,
480                    node_logical_id TEXT NOT NULL,
481                    text_content TEXT NOT NULL,
482                    positions_blob BLOB,
483                    PRIMARY KEY (kind, node_logical_id)
484                );
485
486                CREATE TABLE IF NOT EXISTS fts_property_rebuild_state (
487                    kind TEXT PRIMARY KEY,
488                    schema_id INTEGER NOT NULL,
489                    state TEXT NOT NULL,
490                    rows_total INTEGER,
491                    rows_done INTEGER NOT NULL DEFAULT 0,
492                    started_at INTEGER NOT NULL,
493                    last_progress_at INTEGER,
494                    error_message TEXT,
495                    is_first_registration INTEGER NOT NULL DEFAULT 0
496                );
497                ",
498    ),
499];
500
501#[derive(Clone, Debug, PartialEq, Eq)]
502pub struct BootstrapReport {
503    pub sqlite_version: String,
504    pub applied_versions: Vec<SchemaVersion>,
505    pub vector_profile_enabled: bool,
506}
507
508#[derive(Clone, Debug, Default)]
509pub struct SchemaManager;
510
511impl SchemaManager {
512    #[must_use]
513    pub fn new() -> Self {
514        Self
515    }
516
517    /// Bootstrap the database schema, applying any pending migrations.
518    ///
519    /// # Errors
520    ///
521    /// Returns [`SchemaError`] if any migration or metadata-table SQL fails,
522    /// or [`SchemaError::VersionMismatch`] if the database has been migrated
523    /// to a version newer than this engine supports.
524    pub fn bootstrap(&self, conn: &Connection) -> Result<BootstrapReport, SchemaError> {
525        self.initialize_connection(conn)?;
526        Self::ensure_metadata_tables(conn)?;
527
528        // Downgrade protection
529        let max_applied: u32 = conn.query_row(
530            "SELECT COALESCE(MAX(version), 0) FROM fathom_schema_migrations",
531            [],
532            |row| row.get(0),
533        )?;
534        let engine_version = self.current_version().0;
535        trace_info!(
536            current_version = max_applied,
537            engine_version,
538            "schema bootstrap: version check"
539        );
540        if max_applied > engine_version {
541            trace_error!(
542                database_version = max_applied,
543                engine_version,
544                "schema version mismatch: database is newer than engine"
545            );
546            return Err(SchemaError::VersionMismatch {
547                database_version: max_applied,
548                engine_version,
549            });
550        }
551
552        let mut applied_versions = Vec::new();
553        for migration in self.migrations() {
554            let already_applied = conn
555                .query_row(
556                    "SELECT 1 FROM fathom_schema_migrations WHERE version = ?1",
557                    [i64::from(migration.version.0)],
558                    |row| row.get::<_, i64>(0),
559                )
560                .optional()?
561                .is_some();
562
563            if already_applied {
564                continue;
565            }
566
567            let tx = conn.unchecked_transaction()?;
568            match migration.version {
569                SchemaVersion(4) => Self::ensure_vector_regeneration_apply_metadata(&tx)?,
570                SchemaVersion(5) => Self::ensure_vector_contract_format_version(&tx)?,
571                SchemaVersion(6) => Self::ensure_provenance_metadata(&tx)?,
572                SchemaVersion(8) => Self::ensure_operational_mutation_order(&tx)?,
573                SchemaVersion(9) => Self::ensure_node_access_metadata(&tx)?,
574                SchemaVersion(10) => Self::ensure_operational_filter_contract(&tx)?,
575                SchemaVersion(11) => Self::ensure_operational_validation_contract(&tx)?,
576                SchemaVersion(12) => Self::ensure_operational_secondary_indexes(&tx)?,
577                SchemaVersion(13) => Self::ensure_operational_retention_runs(&tx)?,
578                SchemaVersion(14) => Self::ensure_external_content_columns(&tx)?,
579                SchemaVersion(15) => Self::ensure_fts_property_schemas(&tx)?,
580                SchemaVersion(16) => Self::ensure_unicode_porter_fts_tokenizers(&tx)?,
581                _ => tx.execute_batch(migration.sql)?,
582            }
583            tx.execute(
584                "INSERT INTO fathom_schema_migrations (version, description) VALUES (?1, ?2)",
585                (i64::from(migration.version.0), migration.description),
586            )?;
587            tx.commit()?;
588            trace_info!(
589                version = migration.version.0,
590                description = migration.description,
591                "schema migration applied"
592            );
593            applied_versions.push(migration.version);
594        }
595
596        let sqlite_version = conn.query_row("SELECT sqlite_version()", [], |row| row.get(0))?;
597        let vector_profile_count: i64 = conn.query_row(
598            "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
599            [],
600            |row| row.get(0),
601        )?;
602        Ok(BootstrapReport {
603            sqlite_version,
604            applied_versions,
605            vector_profile_enabled: vector_profile_count > 0,
606        })
607    }
608
609    fn ensure_vector_regeneration_apply_metadata(conn: &Connection) -> Result<(), SchemaError> {
610        let mut stmt = conn.prepare("PRAGMA table_info(vector_embedding_contracts)")?;
611        let columns = stmt
612            .query_map([], |row| row.get::<_, String>(1))?
613            .collect::<Result<Vec<_>, _>>()?;
614        let has_applied_at = columns.iter().any(|column| column == "applied_at");
615        let has_snapshot_hash = columns.iter().any(|column| column == "snapshot_hash");
616
617        if !has_applied_at {
618            conn.execute(
619                "ALTER TABLE vector_embedding_contracts ADD COLUMN applied_at INTEGER NOT NULL DEFAULT 0",
620                [],
621            )?;
622        }
623        if !has_snapshot_hash {
624            conn.execute(
625                "ALTER TABLE vector_embedding_contracts ADD COLUMN snapshot_hash TEXT NOT NULL DEFAULT ''",
626                [],
627            )?;
628        }
629        conn.execute(
630            r"
631            UPDATE vector_embedding_contracts
632            SET
633                applied_at = CASE
634                    WHEN applied_at = 0 THEN updated_at
635                    ELSE applied_at
636                END,
637                snapshot_hash = CASE
638                    WHEN snapshot_hash = '' THEN 'legacy'
639                    ELSE snapshot_hash
640                END
641            ",
642            [],
643        )?;
644        Ok(())
645    }
646
647    fn ensure_vector_contract_format_version(conn: &Connection) -> Result<(), SchemaError> {
648        let mut stmt = conn.prepare("PRAGMA table_info(vector_embedding_contracts)")?;
649        let columns = stmt
650            .query_map([], |row| row.get::<_, String>(1))?
651            .collect::<Result<Vec<_>, _>>()?;
652        let has_contract_format_version = columns
653            .iter()
654            .any(|column| column == "contract_format_version");
655
656        if !has_contract_format_version {
657            conn.execute(
658                "ALTER TABLE vector_embedding_contracts ADD COLUMN contract_format_version INTEGER NOT NULL DEFAULT 1",
659                [],
660            )?;
661        }
662        conn.execute(
663            r"
664            UPDATE vector_embedding_contracts
665            SET contract_format_version = 1
666            WHERE contract_format_version = 0
667            ",
668            [],
669        )?;
670        Ok(())
671    }
672
673    fn ensure_provenance_metadata(conn: &Connection) -> Result<(), SchemaError> {
674        let mut stmt = conn.prepare("PRAGMA table_info(provenance_events)")?;
675        let columns = stmt
676            .query_map([], |row| row.get::<_, String>(1))?
677            .collect::<Result<Vec<_>, _>>()?;
678        let has_metadata_json = columns.iter().any(|column| column == "metadata_json");
679
680        if !has_metadata_json {
681            conn.execute(
682                "ALTER TABLE provenance_events ADD COLUMN metadata_json TEXT NOT NULL DEFAULT ''",
683                [],
684            )?;
685        }
686        Ok(())
687    }
688
689    fn ensure_operational_mutation_order(conn: &Connection) -> Result<(), SchemaError> {
690        let mut stmt = conn.prepare("PRAGMA table_info(operational_mutations)")?;
691        let columns = stmt
692            .query_map([], |row| row.get::<_, String>(1))?
693            .collect::<Result<Vec<_>, _>>()?;
694        let has_mutation_order = columns.iter().any(|column| column == "mutation_order");
695
696        if !has_mutation_order {
697            conn.execute(
698                "ALTER TABLE operational_mutations ADD COLUMN mutation_order INTEGER NOT NULL DEFAULT 0",
699                [],
700            )?;
701        }
702        conn.execute(
703            r"
704            UPDATE operational_mutations
705            SET mutation_order = rowid
706            WHERE mutation_order = 0
707            ",
708            [],
709        )?;
710        conn.execute(
711            r"
712            CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_order
713                ON operational_mutations(collection_name, record_key, mutation_order DESC)
714            ",
715            [],
716        )?;
717        Ok(())
718    }
719
720    fn ensure_node_access_metadata(conn: &Connection) -> Result<(), SchemaError> {
721        conn.execute_batch(
722            r"
723            CREATE TABLE IF NOT EXISTS node_access_metadata (
724                logical_id TEXT PRIMARY KEY,
725                last_accessed_at INTEGER NOT NULL,
726                updated_at INTEGER NOT NULL
727            );
728
729            CREATE INDEX IF NOT EXISTS idx_node_access_metadata_last_accessed
730                ON node_access_metadata(last_accessed_at DESC);
731            ",
732        )?;
733        Ok(())
734    }
735
736    fn ensure_operational_filter_contract(conn: &Connection) -> Result<(), SchemaError> {
737        let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
738        let columns = stmt
739            .query_map([], |row| row.get::<_, String>(1))?
740            .collect::<Result<Vec<_>, _>>()?;
741        let has_filter_fields_json = columns.iter().any(|column| column == "filter_fields_json");
742
743        if !has_filter_fields_json {
744            conn.execute(
745                "ALTER TABLE operational_collections ADD COLUMN filter_fields_json TEXT NOT NULL DEFAULT '[]'",
746                [],
747            )?;
748        }
749
750        conn.execute_batch(
751            r"
752            CREATE TABLE IF NOT EXISTS operational_filter_values (
753                mutation_id TEXT NOT NULL,
754                collection_name TEXT NOT NULL,
755                field_name TEXT NOT NULL,
756                string_value TEXT,
757                integer_value INTEGER,
758                PRIMARY KEY(mutation_id, field_name),
759                FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE,
760                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
761            );
762
763            CREATE INDEX IF NOT EXISTS idx_operational_filter_values_text
764                ON operational_filter_values(collection_name, field_name, string_value, mutation_id);
765            CREATE INDEX IF NOT EXISTS idx_operational_filter_values_integer
766                ON operational_filter_values(collection_name, field_name, integer_value, mutation_id);
767            ",
768        )?;
769        Ok(())
770    }
771
772    fn ensure_operational_validation_contract(conn: &Connection) -> Result<(), SchemaError> {
773        let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
774        let columns = stmt
775            .query_map([], |row| row.get::<_, String>(1))?
776            .collect::<Result<Vec<_>, _>>()?;
777        let has_validation_json = columns.iter().any(|column| column == "validation_json");
778
779        if !has_validation_json {
780            conn.execute(
781                "ALTER TABLE operational_collections ADD COLUMN validation_json TEXT NOT NULL DEFAULT ''",
782                [],
783            )?;
784        }
785
786        Ok(())
787    }
788
789    fn ensure_operational_secondary_indexes(conn: &Connection) -> Result<(), SchemaError> {
790        let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
791        let columns = stmt
792            .query_map([], |row| row.get::<_, String>(1))?
793            .collect::<Result<Vec<_>, _>>()?;
794        let has_secondary_indexes_json = columns
795            .iter()
796            .any(|column| column == "secondary_indexes_json");
797
798        if !has_secondary_indexes_json {
799            conn.execute(
800                "ALTER TABLE operational_collections ADD COLUMN secondary_indexes_json TEXT NOT NULL DEFAULT '[]'",
801                [],
802            )?;
803        }
804
805        conn.execute_batch(
806            r"
807            CREATE TABLE IF NOT EXISTS operational_secondary_index_entries (
808                collection_name TEXT NOT NULL,
809                index_name TEXT NOT NULL,
810                subject_kind TEXT NOT NULL,
811                mutation_id TEXT NOT NULL DEFAULT '',
812                record_key TEXT NOT NULL DEFAULT '',
813                sort_timestamp INTEGER,
814                slot1_text TEXT,
815                slot1_integer INTEGER,
816                slot2_text TEXT,
817                slot2_integer INTEGER,
818                slot3_text TEXT,
819                slot3_integer INTEGER,
820                PRIMARY KEY(collection_name, index_name, subject_kind, mutation_id, record_key),
821                FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
822                FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE
823            );
824
825            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_text
826                ON operational_secondary_index_entries(
827                    collection_name, index_name, subject_kind, slot1_text, sort_timestamp DESC, mutation_id, record_key
828                );
829            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_integer
830                ON operational_secondary_index_entries(
831                    collection_name, index_name, subject_kind, slot1_integer, sort_timestamp DESC, mutation_id, record_key
832                );
833            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_text
834                ON operational_secondary_index_entries(
835                    collection_name, index_name, subject_kind, slot1_text, slot2_text, slot3_text, sort_timestamp DESC, record_key
836                );
837            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_integer
838                ON operational_secondary_index_entries(
839                    collection_name, index_name, subject_kind, slot1_integer, slot2_integer, slot3_integer, sort_timestamp DESC, record_key
840                );
841            ",
842        )?;
843
844        Ok(())
845    }
846
847    fn ensure_operational_retention_runs(conn: &Connection) -> Result<(), SchemaError> {
848        conn.execute_batch(
849            r"
850            CREATE TABLE IF NOT EXISTS operational_retention_runs (
851                id TEXT PRIMARY KEY,
852                collection_name TEXT NOT NULL,
853                executed_at INTEGER NOT NULL,
854                action_kind TEXT NOT NULL,
855                dry_run INTEGER NOT NULL DEFAULT 0,
856                deleted_mutations INTEGER NOT NULL,
857                rows_remaining INTEGER NOT NULL,
858                metadata_json TEXT NOT NULL DEFAULT '',
859                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
860            );
861
862            CREATE INDEX IF NOT EXISTS idx_operational_retention_runs_collection_time
863                ON operational_retention_runs(collection_name, executed_at DESC);
864            ",
865        )?;
866        Ok(())
867    }
868
869    fn ensure_external_content_columns(conn: &Connection) -> Result<(), SchemaError> {
870        let node_columns = Self::column_names(conn, "nodes")?;
871        if !node_columns.iter().any(|c| c == "content_ref") {
872            conn.execute("ALTER TABLE nodes ADD COLUMN content_ref TEXT", [])?;
873        }
874        conn.execute_batch(
875            r"
876            CREATE INDEX IF NOT EXISTS idx_nodes_content_ref
877                ON nodes(content_ref)
878                WHERE content_ref IS NOT NULL AND superseded_at IS NULL;
879            ",
880        )?;
881
882        let chunk_columns = Self::column_names(conn, "chunks")?;
883        if !chunk_columns.iter().any(|c| c == "content_hash") {
884            conn.execute("ALTER TABLE chunks ADD COLUMN content_hash TEXT", [])?;
885        }
886        Ok(())
887    }
888
889    /// Migration 16: migrate both `fts_nodes` and `fts_node_properties` from
890    /// the default FTS5 simple tokenizer to `unicode61 remove_diacritics 2
891    /// porter` so diacritic-insensitive and stem-aware matches work (e.g.
892    /// `cafe` matching `café`, `shipping` matching `ship`).
893    ///
894    /// FTS5 does not support re-tokenizing an existing index in place, so
895    /// both virtual tables are dropped and recreated with the new
896    /// `tokenize=...` clause. `fts_nodes` is rebuilt inline from the
897    /// canonical `chunks + nodes` join. `fts_node_properties` is left empty
898    /// here and repopulated from canonical state by the engine runtime after
899    /// bootstrap (the property FTS rebuild requires the per-kind
900    /// `fts_property_schemas` projection that lives in the engine crate).
901    ///
902    /// A malformed row encountered during the inline `INSERT ... SELECT`
903    /// causes the migration to abort: the rusqlite error propagates up
904    /// through `execute_batch` and rolls back the outer migration
905    /// transaction so the schema version is not advanced.
906    fn ensure_unicode_porter_fts_tokenizers(conn: &Connection) -> Result<(), SchemaError> {
907        conn.execute_batch(
908            r"
909            DROP TABLE IF EXISTS fts_nodes;
910            CREATE VIRTUAL TABLE fts_nodes USING fts5(
911                chunk_id UNINDEXED,
912                node_logical_id UNINDEXED,
913                kind UNINDEXED,
914                text_content,
915                tokenize = 'porter unicode61 remove_diacritics 2'
916            );
917
918            DROP TABLE IF EXISTS fts_node_properties;
919            CREATE VIRTUAL TABLE fts_node_properties USING fts5(
920                node_logical_id UNINDEXED,
921                kind UNINDEXED,
922                text_content,
923                tokenize = 'porter unicode61 remove_diacritics 2'
924            );
925
926            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
927            SELECT c.id, n.logical_id, n.kind, c.text_content
928            FROM chunks c
929            JOIN nodes n
930              ON n.logical_id = c.node_logical_id
931             AND n.superseded_at IS NULL;
932            ",
933        )?;
934        Ok(())
935    }
936
937    fn ensure_fts_property_schemas(conn: &Connection) -> Result<(), SchemaError> {
938        conn.execute_batch(
939            r"
940            CREATE TABLE IF NOT EXISTS fts_property_schemas (
941                kind TEXT PRIMARY KEY,
942                property_paths_json TEXT NOT NULL,
943                separator TEXT NOT NULL DEFAULT ' ',
944                format_version INTEGER NOT NULL DEFAULT 1,
945                created_at INTEGER NOT NULL DEFAULT (unixepoch())
946            );
947
948            CREATE VIRTUAL TABLE IF NOT EXISTS fts_node_properties USING fts5(
949                node_logical_id UNINDEXED,
950                kind UNINDEXED,
951                text_content
952            );
953            ",
954        )?;
955        Ok(())
956    }
957
958    fn column_names(conn: &Connection, table: &str) -> Result<Vec<String>, SchemaError> {
959        let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
960        let names = stmt
961            .query_map([], |row| row.get::<_, String>(1))?
962            .collect::<Result<Vec<_>, _>>()?;
963        Ok(names)
964    }
965
966    #[must_use]
967    pub fn current_version(&self) -> SchemaVersion {
968        self.migrations()
969            .last()
970            .map_or(SchemaVersion(0), |migration| migration.version)
971    }
972
973    #[must_use]
974    pub fn migrations(&self) -> &'static [Migration] {
975        MIGRATIONS
976    }
977
978    /// Set the recommended `SQLite` connection pragmas for fathomdb.
979    ///
980    /// # Errors
981    ///
982    /// Returns [`SchemaError`] if any PRAGMA fails to execute.
983    pub fn initialize_connection(&self, conn: &Connection) -> Result<(), SchemaError> {
984        conn.execute_batch(
985            r"
986            PRAGMA foreign_keys = ON;
987            PRAGMA journal_mode = WAL;
988            PRAGMA synchronous = NORMAL;
989            PRAGMA busy_timeout = 5000;
990            PRAGMA temp_store = MEMORY;
991            PRAGMA mmap_size = 3000000000;
992            PRAGMA journal_size_limit = 536870912;
993            ",
994        )?;
995        Ok(())
996    }
997
998    /// Initialize a **read-only** connection with PRAGMAs that are safe for
999    /// readers.
1000    ///
1001    /// Skips `journal_mode` (requires write; the writer already set WAL),
1002    /// `synchronous` (irrelevant for readers), and `journal_size_limit`
1003    /// (requires write).
1004    ///
1005    /// # Errors
1006    ///
1007    /// Returns [`SchemaError`] if any PRAGMA fails to execute.
1008    pub fn initialize_reader_connection(&self, conn: &Connection) -> Result<(), SchemaError> {
1009        conn.execute_batch(
1010            r"
1011            PRAGMA foreign_keys = ON;
1012            PRAGMA busy_timeout = 5000;
1013            PRAGMA temp_store = MEMORY;
1014            PRAGMA mmap_size = 3000000000;
1015            ",
1016        )?;
1017        Ok(())
1018    }
1019
1020    /// Ensure the sqlite-vec vector extension profile is registered and the
1021    /// virtual vec table exists.
1022    ///
1023    /// When the `sqlite-vec` feature is enabled this creates the virtual table
1024    /// and records the profile in `vector_profiles` (with `enabled = 1`).
1025    /// When the feature is absent the call always returns
1026    /// [`SchemaError::MissingCapability`].
1027    ///
1028    /// # Errors
1029    ///
1030    /// Returns [`SchemaError`] if the DDL fails or the feature is absent.
1031    #[cfg(feature = "sqlite-vec")]
1032    pub fn ensure_vector_profile(
1033        &self,
1034        conn: &Connection,
1035        profile: &str,
1036        table_name: &str,
1037        dimension: usize,
1038    ) -> Result<(), SchemaError> {
1039        conn.execute_batch(&format!(
1040            "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING vec0(\
1041                chunk_id TEXT PRIMARY KEY,\
1042                embedding float[{dimension}]\
1043            )"
1044        ))?;
1045        // Vector dimensions are small positive integers (typically <= a few
1046        // thousand); convert explicitly so clippy's cast_possible_wrap is happy.
1047        let dimension_i64 = i64::try_from(dimension).map_err(|_| {
1048            SchemaError::Sqlite(rusqlite::Error::ToSqlConversionFailure(
1049                format!("vector dimension {dimension} does not fit in i64").into(),
1050            ))
1051        })?;
1052        conn.execute(
1053            "INSERT OR REPLACE INTO vector_profiles \
1054             (profile, table_name, dimension, enabled) VALUES (?1, ?2, ?3, 1)",
1055            rusqlite::params![profile, table_name, dimension_i64],
1056        )?;
1057        Ok(())
1058    }
1059
1060    /// # Errors
1061    ///
1062    /// Always returns [`SchemaError::MissingCapability`] when the `sqlite-vec`
1063    /// feature is not compiled in.
1064    #[cfg(not(feature = "sqlite-vec"))]
1065    pub fn ensure_vector_profile(
1066        &self,
1067        _conn: &Connection,
1068        _profile: &str,
1069        _table_name: &str,
1070        _dimension: usize,
1071    ) -> Result<(), SchemaError> {
1072        Err(SchemaError::MissingCapability("sqlite-vec"))
1073    }
1074
1075    /// Create the internal migration-tracking table if it does not exist.
1076    ///
1077    /// # Errors
1078    ///
1079    /// Returns [`SchemaError`] if the DDL fails to execute.
1080    fn ensure_metadata_tables(conn: &Connection) -> Result<(), SchemaError> {
1081        conn.execute_batch(
1082            r"
1083            CREATE TABLE IF NOT EXISTS fathom_schema_migrations (
1084                version INTEGER PRIMARY KEY,
1085                description TEXT NOT NULL,
1086                applied_at INTEGER NOT NULL DEFAULT (unixepoch())
1087            );
1088            ",
1089        )?;
1090        Ok(())
1091    }
1092}
1093
1094#[cfg(test)]
1095#[allow(clippy::expect_used)]
1096mod tests {
1097    use rusqlite::Connection;
1098
1099    use super::SchemaManager;
1100
1101    #[test]
1102    fn bootstrap_applies_initial_schema() {
1103        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1104        let manager = SchemaManager::new();
1105
1106        let report = manager.bootstrap(&conn).expect("bootstrap report");
1107
1108        assert_eq!(
1109            report.applied_versions.len(),
1110            manager.current_version().0 as usize
1111        );
1112        assert!(report.sqlite_version.starts_with('3'));
1113        let table_count: i64 = conn
1114            .query_row(
1115                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'nodes'",
1116                [],
1117                |row| row.get(0),
1118            )
1119            .expect("nodes table exists");
1120        assert_eq!(table_count, 1);
1121    }
1122
1123    // --- Item 2: vector profile tests ---
1124
1125    #[test]
1126    fn vector_profile_not_enabled_without_feature() {
1127        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1128        let manager = SchemaManager::new();
1129        let report = manager.bootstrap(&conn).expect("bootstrap");
1130        assert!(
1131            !report.vector_profile_enabled,
1132            "vector_profile_enabled must be false on a fresh bootstrap"
1133        );
1134    }
1135
1136    #[test]
1137    fn vector_profile_skipped_when_dimension_absent() {
1138        // ensure_vector_profile is never called → enabled stays false
1139        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1140        let manager = SchemaManager::new();
1141        manager.bootstrap(&conn).expect("bootstrap");
1142
1143        let count: i64 = conn
1144            .query_row(
1145                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
1146                [],
1147                |row| row.get(0),
1148            )
1149            .expect("count");
1150        assert_eq!(
1151            count, 0,
1152            "no enabled profile without calling ensure_vector_profile"
1153        );
1154    }
1155
1156    #[test]
1157    fn bootstrap_report_reflects_actual_vector_state() {
1158        // After a fresh bootstrap with no vector profile, the report reflects reality.
1159        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1160        let manager = SchemaManager::new();
1161        let report = manager.bootstrap(&conn).expect("bootstrap");
1162
1163        let db_count: i64 = conn
1164            .query_row(
1165                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
1166                [],
1167                |row| row.get(0),
1168            )
1169            .expect("count");
1170        assert_eq!(
1171            report.vector_profile_enabled,
1172            db_count > 0,
1173            "BootstrapReport.vector_profile_enabled must match actual DB state"
1174        );
1175    }
1176
1177    #[test]
1178    fn bootstrap_backfills_vector_contract_format_and_provenance_metadata_columns() {
1179        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1180        conn.execute_batch(
1181            r#"
1182            CREATE TABLE provenance_events (
1183                id         TEXT PRIMARY KEY,
1184                event_type TEXT NOT NULL,
1185                subject    TEXT NOT NULL,
1186                source_ref TEXT,
1187                created_at INTEGER NOT NULL DEFAULT (unixepoch())
1188            );
1189            CREATE TABLE vector_embedding_contracts (
1190                profile TEXT PRIMARY KEY,
1191                table_name TEXT NOT NULL,
1192                model_identity TEXT NOT NULL,
1193                model_version TEXT NOT NULL,
1194                dimension INTEGER NOT NULL,
1195                normalization_policy TEXT NOT NULL,
1196                chunking_policy TEXT NOT NULL,
1197                preprocessing_policy TEXT NOT NULL,
1198                generator_command_json TEXT NOT NULL,
1199                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
1200                applied_at INTEGER NOT NULL DEFAULT 0,
1201                snapshot_hash TEXT NOT NULL DEFAULT ''
1202            );
1203            INSERT INTO vector_embedding_contracts (
1204                profile,
1205                table_name,
1206                model_identity,
1207                model_version,
1208                dimension,
1209                normalization_policy,
1210                chunking_policy,
1211                preprocessing_policy,
1212                generator_command_json,
1213                updated_at,
1214                applied_at,
1215                snapshot_hash
1216            ) VALUES (
1217                'default',
1218                'vec_nodes_active',
1219                'legacy-model',
1220                '0.9.0',
1221                4,
1222                'l2',
1223                'per_chunk',
1224                'trim',
1225                '["/bin/echo"]',
1226                100,
1227                100,
1228                'legacy'
1229            );
1230            "#,
1231        )
1232        .expect("seed legacy schema");
1233        let manager = SchemaManager::new();
1234
1235        let report = manager.bootstrap(&conn).expect("bootstrap");
1236
1237        assert!(
1238            report.applied_versions.iter().any(|version| version.0 >= 5),
1239            "bootstrap should apply hardening migrations"
1240        );
1241        let format_version: i64 = conn
1242            .query_row(
1243                "SELECT contract_format_version FROM vector_embedding_contracts WHERE profile = 'default'",
1244                [],
1245                |row| row.get(0),
1246            )
1247            .expect("contract_format_version");
1248        assert_eq!(format_version, 1);
1249        let metadata_column_count: i64 = conn
1250            .query_row(
1251                "SELECT count(*) FROM pragma_table_info('provenance_events') WHERE name = 'metadata_json'",
1252                [],
1253                |row| row.get(0),
1254            )
1255            .expect("metadata_json column count");
1256        assert_eq!(metadata_column_count, 1);
1257    }
1258
1259    #[test]
1260    fn bootstrap_creates_operational_store_tables() {
1261        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1262        let manager = SchemaManager::new();
1263
1264        manager.bootstrap(&conn).expect("bootstrap");
1265
1266        for table in [
1267            "operational_collections",
1268            "operational_mutations",
1269            "operational_current",
1270        ] {
1271            let count: i64 = conn
1272                .query_row(
1273                    "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = ?1",
1274                    [table],
1275                    |row| row.get(0),
1276                )
1277                .expect("table existence");
1278            assert_eq!(count, 1, "{table} should exist after bootstrap");
1279        }
1280        let mutation_order_columns: i64 = conn
1281            .query_row(
1282                "SELECT count(*) FROM pragma_table_info('operational_mutations') WHERE name = 'mutation_order'",
1283                [],
1284                |row| row.get(0),
1285            )
1286            .expect("mutation_order column exists");
1287        assert_eq!(mutation_order_columns, 1);
1288    }
1289
1290    #[test]
1291    fn bootstrap_is_idempotent_with_operational_store_tables() {
1292        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1293        let manager = SchemaManager::new();
1294
1295        manager.bootstrap(&conn).expect("first bootstrap");
1296        let report = manager.bootstrap(&conn).expect("second bootstrap");
1297
1298        assert!(
1299            report.applied_versions.is_empty(),
1300            "second bootstrap should apply no new migrations"
1301        );
1302        let count: i64 = conn
1303            .query_row(
1304                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'operational_collections'",
1305                [],
1306                |row| row.get(0),
1307        )
1308        .expect("operational_collections table exists");
1309        assert_eq!(count, 1);
1310    }
1311
1312    #[test]
1313    fn bootstrap_is_idempotent_for_recovered_operational_tables_without_migration_history() {
1314        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1315        conn.execute_batch(
1316            r#"
1317            CREATE TABLE operational_collections (
1318                name TEXT PRIMARY KEY,
1319                kind TEXT NOT NULL,
1320                schema_json TEXT NOT NULL,
1321                retention_json TEXT NOT NULL,
1322                format_version INTEGER NOT NULL DEFAULT 1,
1323                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
1324                disabled_at INTEGER
1325            );
1326
1327            CREATE TABLE operational_mutations (
1328                id TEXT PRIMARY KEY,
1329                collection_name TEXT NOT NULL,
1330                record_key TEXT NOT NULL,
1331                op_kind TEXT NOT NULL,
1332                payload_json TEXT NOT NULL,
1333                source_ref TEXT,
1334                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
1335                mutation_order INTEGER NOT NULL DEFAULT 0,
1336                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
1337            );
1338
1339            CREATE TABLE operational_current (
1340                collection_name TEXT NOT NULL,
1341                record_key TEXT NOT NULL,
1342                payload_json TEXT NOT NULL,
1343                updated_at INTEGER NOT NULL,
1344                last_mutation_id TEXT NOT NULL,
1345                PRIMARY KEY(collection_name, record_key),
1346                FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
1347                FOREIGN KEY(last_mutation_id) REFERENCES operational_mutations(id)
1348            );
1349
1350            INSERT INTO operational_collections (name, kind, schema_json, retention_json)
1351            VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}');
1352            INSERT INTO operational_mutations (
1353                id, collection_name, record_key, op_kind, payload_json, created_at, mutation_order
1354            ) VALUES (
1355                'mut-1', 'audit_log', 'entry-1', 'put', '{"ok":true}', 10, 0
1356            );
1357            "#,
1358        )
1359        .expect("seed recovered operational tables");
1360
1361        let manager = SchemaManager::new();
1362        let report = manager
1363            .bootstrap(&conn)
1364            .expect("bootstrap recovered schema");
1365
1366        assert!(
1367            report.applied_versions.iter().any(|version| version.0 == 8),
1368            "bootstrap should record operational mutation ordering hardening"
1369        );
1370        let mutation_order: i64 = conn
1371            .query_row(
1372                "SELECT mutation_order FROM operational_mutations WHERE id = 'mut-1'",
1373                [],
1374                |row| row.get(0),
1375            )
1376            .expect("mutation_order");
1377        assert_ne!(
1378            mutation_order, 0,
1379            "bootstrap should backfill recovered operational rows"
1380        );
1381        let count: i64 = conn
1382            .query_row(
1383                "SELECT count(*) FROM sqlite_master WHERE type = 'index' AND name = 'idx_operational_mutations_collection_key_order'",
1384                [],
1385                |row| row.get(0),
1386            )
1387            .expect("ordering index exists");
1388        assert_eq!(count, 1);
1389    }
1390
1391    #[test]
1392    fn bootstrap_adds_operational_filter_contract_and_index_table() {
1393        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1394        conn.execute_batch(
1395            r#"
1396            CREATE TABLE operational_collections (
1397                name TEXT PRIMARY KEY,
1398                kind TEXT NOT NULL,
1399                schema_json TEXT NOT NULL,
1400                retention_json TEXT NOT NULL,
1401                format_version INTEGER NOT NULL DEFAULT 1,
1402                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
1403                disabled_at INTEGER
1404            );
1405
1406            CREATE TABLE operational_mutations (
1407                id TEXT PRIMARY KEY,
1408                collection_name TEXT NOT NULL,
1409                record_key TEXT NOT NULL,
1410                op_kind TEXT NOT NULL,
1411                payload_json TEXT NOT NULL,
1412                source_ref TEXT,
1413                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
1414                mutation_order INTEGER NOT NULL DEFAULT 1,
1415                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
1416            );
1417
1418            INSERT INTO operational_collections (name, kind, schema_json, retention_json)
1419            VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}');
1420            "#,
1421        )
1422        .expect("seed recovered operational schema");
1423
1424        let manager = SchemaManager::new();
1425        let report = manager
1426            .bootstrap(&conn)
1427            .expect("bootstrap recovered schema");
1428
1429        assert!(
1430            report
1431                .applied_versions
1432                .iter()
1433                .any(|version| version.0 == 10),
1434            "bootstrap should record operational filtered read migration"
1435        );
1436        assert!(
1437            report
1438                .applied_versions
1439                .iter()
1440                .any(|version| version.0 == 11),
1441            "bootstrap should record operational validation migration"
1442        );
1443        let filter_fields_json: String = conn
1444            .query_row(
1445                "SELECT filter_fields_json FROM operational_collections WHERE name = 'audit_log'",
1446                [],
1447                |row| row.get(0),
1448            )
1449            .expect("filter_fields_json added");
1450        assert_eq!(filter_fields_json, "[]");
1451        let validation_json: String = conn
1452            .query_row(
1453                "SELECT validation_json FROM operational_collections WHERE name = 'audit_log'",
1454                [],
1455                |row| row.get(0),
1456            )
1457            .expect("validation_json added");
1458        assert_eq!(validation_json, "");
1459        let table_count: i64 = conn
1460            .query_row(
1461                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'operational_filter_values'",
1462                [],
1463                |row| row.get(0),
1464        )
1465        .expect("filter table exists");
1466        assert_eq!(table_count, 1);
1467    }
1468
1469    #[test]
1470    fn bootstrap_reapplies_migration_history_without_readding_filter_contract_columns() {
1471        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1472        let manager = SchemaManager::new();
1473        manager.bootstrap(&conn).expect("initial bootstrap");
1474
1475        conn.execute("DROP TABLE fathom_schema_migrations", [])
1476            .expect("drop migration history");
1477        SchemaManager::ensure_metadata_tables(&conn).expect("recreate migration metadata");
1478
1479        let report = manager
1480            .bootstrap(&conn)
1481            .expect("rebootstrap existing schema");
1482
1483        assert!(
1484            report
1485                .applied_versions
1486                .iter()
1487                .any(|version| version.0 == 10),
1488            "rebootstrap should re-record migration 10"
1489        );
1490        assert!(
1491            report
1492                .applied_versions
1493                .iter()
1494                .any(|version| version.0 == 11),
1495            "rebootstrap should re-record migration 11"
1496        );
1497        let filter_fields_json: String = conn
1498            .query_row(
1499                "SELECT filter_fields_json FROM operational_collections LIMIT 1",
1500                [],
1501                |row| row.get(0),
1502            )
1503            .unwrap_or_else(|_| "[]".to_string());
1504        assert_eq!(filter_fields_json, "[]");
1505        let validation_json: String = conn
1506            .query_row(
1507                "SELECT validation_json FROM operational_collections LIMIT 1",
1508                [],
1509                |row| row.get(0),
1510            )
1511            .unwrap_or_default();
1512        assert_eq!(validation_json, "");
1513    }
1514
1515    #[test]
1516    fn downgrade_detected_returns_version_mismatch() {
1517        use crate::SchemaError;
1518
1519        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1520        let manager = SchemaManager::new();
1521        manager.bootstrap(&conn).expect("initial bootstrap");
1522
1523        conn.execute(
1524            "INSERT INTO fathom_schema_migrations (version, description) VALUES (?1, ?2)",
1525            (999_i64, "future migration"),
1526        )
1527        .expect("insert future version");
1528
1529        let err = manager
1530            .bootstrap(&conn)
1531            .expect_err("should fail on downgrade");
1532        assert!(
1533            matches!(
1534                err,
1535                SchemaError::VersionMismatch {
1536                    database_version: 999,
1537                    ..
1538                }
1539            ),
1540            "expected VersionMismatch with database_version 999, got: {err}"
1541        );
1542    }
1543
1544    #[test]
1545    fn journal_size_limit_is_set() {
1546        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1547        let manager = SchemaManager::new();
1548        manager
1549            .initialize_connection(&conn)
1550            .expect("initialize_connection");
1551
1552        let limit: i64 = conn
1553            .query_row("PRAGMA journal_size_limit", [], |row| row.get(0))
1554            .expect("journal_size_limit pragma");
1555        assert_eq!(limit, 536_870_912);
1556    }
1557
1558    #[cfg(feature = "sqlite-vec")]
1559    #[test]
1560    fn vector_profile_created_when_feature_enabled() {
1561        // Register the sqlite-vec extension globally so the in-memory
1562        // connection can use the vec0 module.
1563        unsafe {
1564            rusqlite::ffi::sqlite3_auto_extension(Some(std::mem::transmute(
1565                sqlite_vec::sqlite3_vec_init as *const (),
1566            )));
1567        }
1568        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1569        let manager = SchemaManager::new();
1570        manager.bootstrap(&conn).expect("bootstrap");
1571
1572        manager
1573            .ensure_vector_profile(&conn, "default", "vec_nodes_active", 128)
1574            .expect("ensure_vector_profile");
1575
1576        let count: i64 = conn
1577            .query_row(
1578                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
1579                [],
1580                |row| row.get(0),
1581            )
1582            .expect("count");
1583        assert_eq!(
1584            count, 1,
1585            "vector profile must be enabled after ensure_vector_profile"
1586        );
1587
1588        // Verify the virtual table exists by querying it
1589        let _: i64 = conn
1590            .query_row("SELECT count(*) FROM vec_nodes_active", [], |row| {
1591                row.get(0)
1592            })
1593            .expect("vec_nodes_active table must exist after ensure_vector_profile");
1594    }
1595}