Skip to main content

fathomdb_schema/
bootstrap.rs

1use rusqlite::{Connection, OptionalExtension};
2
3use crate::{Migration, SchemaError, SchemaVersion};
4
// Append-only, ordered list of schema migrations. `SchemaManager::bootstrap`
// applies each entry at most once, inside its own transaction, and records it
// in `fathom_schema_migrations`.
//
// NOTE(review): versions 4-6 and 8-16 are NOT executed from the `sql` text
// below — `bootstrap` dispatches them to idempotent `ensure_*` hooks. For
// those entries the SQL documents intent and mirrors the hook, so e.g. the
// unguarded `ALTER TABLE` statements in v4/v5 are never run verbatim.
static MIGRATIONS: &[Migration] = &[
    // v1: canonical graph tables (nodes/edges) with append-only history —
    // one *active* row per logical_id is enforced by the partial unique
    // indexes on `superseded_at IS NULL` — plus chunk storage, FTS5 search,
    // the vector profile registry, and the runs/steps/actions hierarchy.
    Migration::new(
        SchemaVersion(1),
        "initial canonical schema and runtime tables",
        r"
                CREATE TABLE IF NOT EXISTS nodes (
                    row_id TEXT PRIMARY KEY,
                    logical_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    properties BLOB NOT NULL,
                    created_at INTEGER NOT NULL,
                    superseded_at INTEGER,
                    source_ref TEXT,
                    confidence REAL
                );

                CREATE UNIQUE INDEX IF NOT EXISTS idx_nodes_active_logical_id
                    ON nodes(logical_id)
                    WHERE superseded_at IS NULL;
                CREATE INDEX IF NOT EXISTS idx_nodes_kind_active
                    ON nodes(kind, superseded_at);
                CREATE INDEX IF NOT EXISTS idx_nodes_source_ref
                    ON nodes(source_ref);

                CREATE TABLE IF NOT EXISTS edges (
                    row_id TEXT PRIMARY KEY,
                    logical_id TEXT NOT NULL,
                    source_logical_id TEXT NOT NULL,
                    target_logical_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    properties BLOB NOT NULL,
                    created_at INTEGER NOT NULL,
                    superseded_at INTEGER,
                    source_ref TEXT,
                    confidence REAL
                );

                CREATE UNIQUE INDEX IF NOT EXISTS idx_edges_active_logical_id
                    ON edges(logical_id)
                    WHERE superseded_at IS NULL;
                CREATE INDEX IF NOT EXISTS idx_edges_source_active
                    ON edges(source_logical_id, kind, superseded_at);
                CREATE INDEX IF NOT EXISTS idx_edges_target_active
                    ON edges(target_logical_id, kind, superseded_at);
                CREATE INDEX IF NOT EXISTS idx_edges_source_ref
                    ON edges(source_ref);

                CREATE TABLE IF NOT EXISTS chunks (
                    id TEXT PRIMARY KEY,
                    node_logical_id TEXT NOT NULL,
                    text_content TEXT NOT NULL,
                    byte_start INTEGER,
                    byte_end INTEGER,
                    created_at INTEGER NOT NULL
                );

                CREATE INDEX IF NOT EXISTS idx_chunks_node_logical_id
                    ON chunks(node_logical_id);

                CREATE VIRTUAL TABLE IF NOT EXISTS fts_nodes USING fts5(
                    chunk_id UNINDEXED,
                    node_logical_id UNINDEXED,
                    kind UNINDEXED,
                    text_content
                );

                CREATE TABLE IF NOT EXISTS vector_profiles (
                    profile TEXT PRIMARY KEY,
                    table_name TEXT NOT NULL,
                    dimension INTEGER NOT NULL,
                    enabled INTEGER NOT NULL DEFAULT 0
                );

                CREATE TABLE IF NOT EXISTS runs (
                    id TEXT PRIMARY KEY,
                    kind TEXT NOT NULL,
                    status TEXT NOT NULL,
                    properties BLOB NOT NULL,
                    created_at INTEGER NOT NULL,
                    completed_at INTEGER,
                    superseded_at INTEGER,
                    source_ref TEXT
                );

                CREATE TABLE IF NOT EXISTS steps (
                    id TEXT PRIMARY KEY,
                    run_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    status TEXT NOT NULL,
                    properties BLOB NOT NULL,
                    created_at INTEGER NOT NULL,
                    completed_at INTEGER,
                    superseded_at INTEGER,
                    source_ref TEXT,
                    FOREIGN KEY(run_id) REFERENCES runs(id)
                );

                CREATE TABLE IF NOT EXISTS actions (
                    id TEXT PRIMARY KEY,
                    step_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    status TEXT NOT NULL,
                    properties BLOB NOT NULL,
                    created_at INTEGER NOT NULL,
                    completed_at INTEGER,
                    superseded_at INTEGER,
                    source_ref TEXT,
                    FOREIGN KEY(step_id) REFERENCES steps(id)
                );

                CREATE INDEX IF NOT EXISTS idx_runs_source_ref
                    ON runs(source_ref);
                CREATE INDEX IF NOT EXISTS idx_steps_source_ref
                    ON steps(source_ref);
                CREATE INDEX IF NOT EXISTS idx_actions_source_ref
                    ON actions(source_ref);
                ",
    ),
    // v2: append-only audit log keyed by (subject, event_type).
    Migration::new(
        SchemaVersion(2),
        "durable audit trail: provenance_events table",
        r"
                CREATE TABLE IF NOT EXISTS provenance_events (
                    id         TEXT PRIMARY KEY,
                    event_type TEXT NOT NULL,
                    subject    TEXT NOT NULL,
                    source_ref TEXT,
                    created_at INTEGER NOT NULL DEFAULT (unixepoch())
                );
                CREATE INDEX IF NOT EXISTS idx_provenance_events_subject
                    ON provenance_events (subject, event_type);
                ",
    ),
    // v3: one embedding-regeneration contract per vector profile.
    Migration::new(
        SchemaVersion(3),
        "vector regeneration contracts",
        r"
                CREATE TABLE IF NOT EXISTS vector_embedding_contracts (
                    profile TEXT PRIMARY KEY,
                    table_name TEXT NOT NULL,
                    model_identity TEXT NOT NULL,
                    model_version TEXT NOT NULL,
                    dimension INTEGER NOT NULL,
                    normalization_policy TEXT NOT NULL,
                    chunking_policy TEXT NOT NULL,
                    preprocessing_policy TEXT NOT NULL,
                    generator_command_json TEXT NOT NULL,
                    updated_at INTEGER NOT NULL DEFAULT (unixepoch())
                );
                ",
    ),
    // v4: applied via `ensure_vector_regeneration_apply_metadata`, which
    // guards each ALTER behind a PRAGMA column check before backfilling.
    Migration::new(
        SchemaVersion(4),
        "vector regeneration apply metadata",
        r"
                ALTER TABLE vector_embedding_contracts
                    ADD COLUMN applied_at INTEGER NOT NULL DEFAULT 0;
                ALTER TABLE vector_embedding_contracts
                    ADD COLUMN snapshot_hash TEXT NOT NULL DEFAULT '';
                UPDATE vector_embedding_contracts
                SET
                    applied_at = CASE
                        WHEN applied_at = 0 THEN updated_at
                        ELSE applied_at
                    END,
                    snapshot_hash = CASE
                        WHEN snapshot_hash = '' THEN 'legacy'
                        ELSE snapshot_hash
                    END;
                ",
    ),
    // v5: applied via `ensure_vector_contract_format_version`.
    Migration::new(
        SchemaVersion(5),
        "vector regeneration contract format version",
        r"
                ALTER TABLE vector_embedding_contracts
                    ADD COLUMN contract_format_version INTEGER NOT NULL DEFAULT 1;
                UPDATE vector_embedding_contracts
                SET contract_format_version = 1
                WHERE contract_format_version = 0;
                ",
    ),
    // v6: applied via `ensure_provenance_metadata`.
    Migration::new(
        SchemaVersion(6),
        "provenance metadata payloads",
        r"
                ALTER TABLE provenance_events
                    ADD COLUMN metadata_json TEXT NOT NULL DEFAULT '';
                ",
    ),
    // v7: operational store — collection registry, append-only mutation log,
    // and the derived `operational_current` projection (latest payload per
    // (collection, key), pinned to the mutation that produced it).
    Migration::new(
        SchemaVersion(7),
        "operational store canonical and derived tables",
        r"
                CREATE TABLE IF NOT EXISTS operational_collections (
                    name TEXT PRIMARY KEY,
                    kind TEXT NOT NULL,
                    schema_json TEXT NOT NULL,
                    retention_json TEXT NOT NULL,
                    format_version INTEGER NOT NULL DEFAULT 1,
                    created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                    disabled_at INTEGER
                );

                CREATE INDEX IF NOT EXISTS idx_operational_collections_kind
                    ON operational_collections(kind, disabled_at);

                CREATE TABLE IF NOT EXISTS operational_mutations (
                    id TEXT PRIMARY KEY,
                    collection_name TEXT NOT NULL,
                    record_key TEXT NOT NULL,
                    op_kind TEXT NOT NULL,
                    payload_json TEXT NOT NULL,
                    source_ref TEXT,
                    created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
                );

                CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_created
                    ON operational_mutations(collection_name, record_key, created_at DESC, id DESC);
                CREATE INDEX IF NOT EXISTS idx_operational_mutations_source_ref
                    ON operational_mutations(source_ref);

                CREATE TABLE IF NOT EXISTS operational_current (
                    collection_name TEXT NOT NULL,
                    record_key TEXT NOT NULL,
                    payload_json TEXT NOT NULL,
                    updated_at INTEGER NOT NULL,
                    last_mutation_id TEXT NOT NULL,
                    PRIMARY KEY(collection_name, record_key),
                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
                    FOREIGN KEY(last_mutation_id) REFERENCES operational_mutations(id)
                );

                CREATE INDEX IF NOT EXISTS idx_operational_current_collection_updated
                    ON operational_current(collection_name, updated_at DESC);
                ",
    ),
    // v8: applied via `ensure_operational_mutation_order`; `mutation_order`
    // is backfilled from rowid to give a stable total order per record.
    Migration::new(
        SchemaVersion(8),
        "operational mutation ordering hardening",
        r"
                ALTER TABLE operational_mutations
                    ADD COLUMN mutation_order INTEGER NOT NULL DEFAULT 0;
                UPDATE operational_mutations
                SET mutation_order = rowid
                WHERE mutation_order = 0;
                CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_order
                    ON operational_mutations(collection_name, record_key, mutation_order DESC);
                ",
    ),
    // v9: applied via `ensure_node_access_metadata` (same DDL).
    Migration::new(
        SchemaVersion(9),
        "node last_accessed metadata",
        r"
                CREATE TABLE IF NOT EXISTS node_access_metadata (
                    logical_id TEXT PRIMARY KEY,
                    last_accessed_at INTEGER NOT NULL,
                    updated_at INTEGER NOT NULL
                );

                CREATE INDEX IF NOT EXISTS idx_node_access_metadata_last_accessed
                    ON node_access_metadata(last_accessed_at DESC);
                ",
    ),
    // v10: applied via `ensure_operational_filter_contract`.
    Migration::new(
        SchemaVersion(10),
        "operational filtered read contracts and extracted values",
        r"
                ALTER TABLE operational_collections
                    ADD COLUMN filter_fields_json TEXT NOT NULL DEFAULT '[]';

                CREATE TABLE IF NOT EXISTS operational_filter_values (
                    mutation_id TEXT NOT NULL,
                    collection_name TEXT NOT NULL,
                    field_name TEXT NOT NULL,
                    string_value TEXT,
                    integer_value INTEGER,
                    PRIMARY KEY(mutation_id, field_name),
                    FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE,
                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
                );

                CREATE INDEX IF NOT EXISTS idx_operational_filter_values_text
                    ON operational_filter_values(collection_name, field_name, string_value, mutation_id);
                CREATE INDEX IF NOT EXISTS idx_operational_filter_values_integer
                    ON operational_filter_values(collection_name, field_name, integer_value, mutation_id);
                ",
    ),
    // v11: applied via `ensure_operational_validation_contract`.
    Migration::new(
        SchemaVersion(11),
        "operational payload validation contracts",
        r"
                ALTER TABLE operational_collections
                    ADD COLUMN validation_json TEXT NOT NULL DEFAULT '';
                ",
    ),
    // v12: applied via `ensure_operational_secondary_indexes`. Entries use a
    // fixed three-slot (text/integer) layout rather than one table per index.
    Migration::new(
        SchemaVersion(12),
        "operational secondary indexes",
        r"
                ALTER TABLE operational_collections
                    ADD COLUMN secondary_indexes_json TEXT NOT NULL DEFAULT '[]';

                CREATE TABLE IF NOT EXISTS operational_secondary_index_entries (
                    collection_name TEXT NOT NULL,
                    index_name TEXT NOT NULL,
                    subject_kind TEXT NOT NULL,
                    mutation_id TEXT NOT NULL DEFAULT '',
                    record_key TEXT NOT NULL DEFAULT '',
                    sort_timestamp INTEGER,
                    slot1_text TEXT,
                    slot1_integer INTEGER,
                    slot2_text TEXT,
                    slot2_integer INTEGER,
                    slot3_text TEXT,
                    slot3_integer INTEGER,
                    PRIMARY KEY(collection_name, index_name, subject_kind, mutation_id, record_key),
                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
                    FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE
                );

                CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_text
                    ON operational_secondary_index_entries(
                        collection_name, index_name, subject_kind, slot1_text, sort_timestamp DESC, mutation_id, record_key
                    );
                CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_integer
                    ON operational_secondary_index_entries(
                        collection_name, index_name, subject_kind, slot1_integer, sort_timestamp DESC, mutation_id, record_key
                    );
                CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_text
                    ON operational_secondary_index_entries(
                        collection_name, index_name, subject_kind, slot1_text, slot2_text, slot3_text, sort_timestamp DESC, record_key
                    );
                CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_integer
                    ON operational_secondary_index_entries(
                        collection_name, index_name, subject_kind, slot1_integer, slot2_integer, slot3_integer, sort_timestamp DESC, record_key
                );
                ",
    ),
    // v13: applied via `ensure_operational_retention_runs`.
    Migration::new(
        SchemaVersion(13),
        "operational retention run metadata",
        r"
                CREATE TABLE IF NOT EXISTS operational_retention_runs (
                    id TEXT PRIMARY KEY,
                    collection_name TEXT NOT NULL,
                    executed_at INTEGER NOT NULL,
                    action_kind TEXT NOT NULL,
                    dry_run INTEGER NOT NULL DEFAULT 0,
                    deleted_mutations INTEGER NOT NULL,
                    rows_remaining INTEGER NOT NULL,
                    metadata_json TEXT NOT NULL DEFAULT '',
                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
                );

                CREATE INDEX IF NOT EXISTS idx_operational_retention_runs_collection_time
                    ON operational_retention_runs(collection_name, executed_at DESC);
                ",
    ),
    // v14: applied via `ensure_external_content_columns`.
    Migration::new(
        SchemaVersion(14),
        "external content object columns",
        r"
                ALTER TABLE nodes ADD COLUMN content_ref TEXT;

                CREATE INDEX IF NOT EXISTS idx_nodes_content_ref
                    ON nodes(content_ref)
                    WHERE content_ref IS NOT NULL AND superseded_at IS NULL;

                ALTER TABLE chunks ADD COLUMN content_hash TEXT;
                ",
    ),
    // v15: applied via `ensure_fts_property_schemas`.
    Migration::new(
        SchemaVersion(15),
        "FTS property projection schemas",
        r"
                CREATE TABLE IF NOT EXISTS fts_property_schemas (
                    kind TEXT PRIMARY KEY,
                    property_paths_json TEXT NOT NULL,
                    separator TEXT NOT NULL DEFAULT ' ',
                    format_version INTEGER NOT NULL DEFAULT 1,
                    created_at INTEGER NOT NULL DEFAULT (unixepoch())
                );

                CREATE VIRTUAL TABLE IF NOT EXISTS fts_node_properties USING fts5(
                    node_logical_id UNINDEXED,
                    kind UNINDEXED,
                    text_content
                );
                ",
    ),
    // v16: intentionally empty SQL — the work happens in the hook.
    Migration::new(
        SchemaVersion(16),
        "rebuild fts_nodes and fts_node_properties on porter+unicode61 tokenizer",
        // DDL applied by `ensure_unicode_porter_fts_tokenizers`; the hook
        // drops, recreates, and rebuilds both tables from canonical state.
        // The FTS5 chained tokenizer syntax requires the wrapper (porter)
        // before the base tokenizer, so the applied `tokenize=` clause is
        // `'porter unicode61 remove_diacritics 2'`.
        "",
    ),
    Migration::new(
        SchemaVersion(17),
        "fts property position-map sidecar for recursive extraction",
        // Sidecar position map for property FTS. The recursive extraction
        // walk in the engine emits one row per scalar leaf contributing to a
        // given node's property FTS blob, carrying the half-open byte range
        // `[start_offset, end_offset)` within the blob and the JSON-path of
        // the originating leaf. Phase 5 uses this to attribute tokens back
        // to their source leaves.
        //
        // The existing `fts_property_schemas.property_paths_json` column is
        // reused unchanged at the DDL level; the engine-side JSON decoder
        // tolerates both the legacy shape (bare strings = scalar) and the
        // new shape (objects with `mode` = `scalar`|`recursive`, optional
        // `exclude_paths`). Backwards compatibility is guaranteed because a
        // bare JSON array of strings still deserialises into scalar-mode
        // entries.
        r"
                CREATE TABLE IF NOT EXISTS fts_node_property_positions (
                    node_logical_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    start_offset INTEGER NOT NULL,
                    end_offset INTEGER NOT NULL,
                    leaf_path TEXT NOT NULL
                );

                CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_node
                    ON fts_node_property_positions(node_logical_id, kind);

                CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_kind
                    ON fts_node_property_positions(kind);
                ",
    ),
    Migration::new(
        SchemaVersion(18),
        "add UNIQUE constraint on fts_node_property_positions (node_logical_id, kind, start_offset)",
        // P4-P2-4: the v17 sidecar DDL did not enforce uniqueness of the
        // `(node_logical_id, kind, start_offset)` tuple, so a buggy rebuild
        // path could silently double-insert a leaf and break attribution
        // lookups. Drop and recreate the table with the UNIQUE constraint,
        // preserving the existing indexes. The DDL leaves the table empty,
        // which the open-time rebuild guard in `ExecutionCoordinator::open`
        // detects (empty positions + recursive schemas present) and
        // repopulates from canonical state on the next open. The rebuild
        // path is idempotent and safe to run unconditionally.
        //
        // `fts_node_property_positions` is a regular SQLite table, not an
        // FTS5 virtual table, so UNIQUE constraints are supported.
        r"
                DROP TABLE IF EXISTS fts_node_property_positions;

                CREATE TABLE fts_node_property_positions (
                    node_logical_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    start_offset INTEGER NOT NULL,
                    end_offset INTEGER NOT NULL,
                    leaf_path TEXT NOT NULL,
                    UNIQUE(node_logical_id, kind, start_offset)
                );

                CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_node
                    ON fts_node_property_positions(node_logical_id, kind);

                CREATE INDEX IF NOT EXISTS idx_fts_node_property_positions_kind
                    ON fts_node_property_positions(kind);
                ",
    ),
];
475
/// Summary returned by [`SchemaManager::bootstrap`] after a successful run.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BootstrapReport {
    /// Result of `SELECT sqlite_version()` on the bootstrapped connection.
    pub sqlite_version: String,
    /// Versions applied during *this* bootstrap, in order; empty when the
    /// database was already at the engine's current version.
    pub applied_versions: Vec<SchemaVersion>,
    /// True when at least one `vector_profiles` row has `enabled = 1`.
    pub vector_profile_enabled: bool,
}
482
/// Stateless coordinator that bootstraps a SQLite database to the engine's
/// current schema by applying [`MIGRATIONS`] (see [`SchemaManager::bootstrap`]).
#[derive(Clone, Debug, Default)]
pub struct SchemaManager;
485
486impl SchemaManager {
    /// Creates a `SchemaManager`; equivalent to `Self::default()` (the type
    /// is a stateless unit struct).
    #[must_use]
    pub fn new() -> Self {
        Self
    }
491
    /// Bootstrap the database schema, applying any pending migrations.
    ///
    /// Each pending migration runs inside its own transaction and is recorded
    /// in `fathom_schema_migrations` before commit, so a mid-run failure
    /// leaves earlier migrations committed and the failed one unrecorded —
    /// safe to retry on the next bootstrap.
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError`] if any migration or metadata-table SQL fails,
    /// or [`SchemaError::VersionMismatch`] if the database has been migrated
    /// to a version newer than this engine supports.
    pub fn bootstrap(&self, conn: &Connection) -> Result<BootstrapReport, SchemaError> {
        self.initialize_connection(conn)?;
        Self::ensure_metadata_tables(conn)?;

        // Downgrade protection
        // Refuse to touch a database whose highest recorded migration exceeds
        // what this engine knows about (i.e. written by a newer engine).
        let max_applied: u32 = conn.query_row(
            "SELECT COALESCE(MAX(version), 0) FROM fathom_schema_migrations",
            [],
            |row| row.get(0),
        )?;
        let engine_version = self.current_version().0;
        trace_info!(
            current_version = max_applied,
            engine_version,
            "schema bootstrap: version check"
        );
        if max_applied > engine_version {
            trace_error!(
                database_version = max_applied,
                engine_version,
                "schema version mismatch: database is newer than engine"
            );
            return Err(SchemaError::VersionMismatch {
                database_version: max_applied,
                engine_version,
            });
        }

        // Apply pending migrations oldest-first, skipping any version already
        // recorded in the ledger.
        let mut applied_versions = Vec::new();
        for migration in self.migrations() {
            let already_applied = conn
                .query_row(
                    "SELECT 1 FROM fathom_schema_migrations WHERE version = ?1",
                    [i64::from(migration.version.0)],
                    |row| row.get::<_, i64>(0),
                )
                .optional()?
                .is_some();

            if already_applied {
                continue;
            }

            // `unchecked_transaction` lets us open a transaction through a
            // shared `&Connection`; the caller must not hold another open
            // transaction on this connection — TODO(review) confirm callers.
            let tx = conn.unchecked_transaction()?;
            // Versions with idempotent `ensure_*` hooks bypass the literal
            // migration SQL (see the NOTE on `MIGRATIONS`); everything else
            // executes the entry's SQL batch verbatim.
            match migration.version {
                SchemaVersion(4) => Self::ensure_vector_regeneration_apply_metadata(&tx)?,
                SchemaVersion(5) => Self::ensure_vector_contract_format_version(&tx)?,
                SchemaVersion(6) => Self::ensure_provenance_metadata(&tx)?,
                SchemaVersion(8) => Self::ensure_operational_mutation_order(&tx)?,
                SchemaVersion(9) => Self::ensure_node_access_metadata(&tx)?,
                SchemaVersion(10) => Self::ensure_operational_filter_contract(&tx)?,
                SchemaVersion(11) => Self::ensure_operational_validation_contract(&tx)?,
                SchemaVersion(12) => Self::ensure_operational_secondary_indexes(&tx)?,
                SchemaVersion(13) => Self::ensure_operational_retention_runs(&tx)?,
                SchemaVersion(14) => Self::ensure_external_content_columns(&tx)?,
                SchemaVersion(15) => Self::ensure_fts_property_schemas(&tx)?,
                SchemaVersion(16) => Self::ensure_unicode_porter_fts_tokenizers(&tx)?,
                _ => tx.execute_batch(migration.sql)?,
            }
            // Record the version inside the same transaction so the DDL and
            // its ledger entry commit (or roll back) atomically.
            tx.execute(
                "INSERT INTO fathom_schema_migrations (version, description) VALUES (?1, ?2)",
                (i64::from(migration.version.0), migration.description),
            )?;
            tx.commit()?;
            trace_info!(
                version = migration.version.0,
                description = migration.description,
                "schema migration applied"
            );
            applied_versions.push(migration.version);
        }

        // Assemble the report from the now-current database state.
        let sqlite_version = conn.query_row("SELECT sqlite_version()", [], |row| row.get(0))?;
        let vector_profile_count: i64 = conn.query_row(
            "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
            [],
            |row| row.get(0),
        )?;
        Ok(BootstrapReport {
            sqlite_version,
            applied_versions,
            vector_profile_enabled: vector_profile_count > 0,
        })
    }
583
584    fn ensure_vector_regeneration_apply_metadata(conn: &Connection) -> Result<(), SchemaError> {
585        let mut stmt = conn.prepare("PRAGMA table_info(vector_embedding_contracts)")?;
586        let columns = stmt
587            .query_map([], |row| row.get::<_, String>(1))?
588            .collect::<Result<Vec<_>, _>>()?;
589        let has_applied_at = columns.iter().any(|column| column == "applied_at");
590        let has_snapshot_hash = columns.iter().any(|column| column == "snapshot_hash");
591
592        if !has_applied_at {
593            conn.execute(
594                "ALTER TABLE vector_embedding_contracts ADD COLUMN applied_at INTEGER NOT NULL DEFAULT 0",
595                [],
596            )?;
597        }
598        if !has_snapshot_hash {
599            conn.execute(
600                "ALTER TABLE vector_embedding_contracts ADD COLUMN snapshot_hash TEXT NOT NULL DEFAULT ''",
601                [],
602            )?;
603        }
604        conn.execute(
605            r"
606            UPDATE vector_embedding_contracts
607            SET
608                applied_at = CASE
609                    WHEN applied_at = 0 THEN updated_at
610                    ELSE applied_at
611                END,
612                snapshot_hash = CASE
613                    WHEN snapshot_hash = '' THEN 'legacy'
614                    ELSE snapshot_hash
615                END
616            ",
617            [],
618        )?;
619        Ok(())
620    }
621
622    fn ensure_vector_contract_format_version(conn: &Connection) -> Result<(), SchemaError> {
623        let mut stmt = conn.prepare("PRAGMA table_info(vector_embedding_contracts)")?;
624        let columns = stmt
625            .query_map([], |row| row.get::<_, String>(1))?
626            .collect::<Result<Vec<_>, _>>()?;
627        let has_contract_format_version = columns
628            .iter()
629            .any(|column| column == "contract_format_version");
630
631        if !has_contract_format_version {
632            conn.execute(
633                "ALTER TABLE vector_embedding_contracts ADD COLUMN contract_format_version INTEGER NOT NULL DEFAULT 1",
634                [],
635            )?;
636        }
637        conn.execute(
638            r"
639            UPDATE vector_embedding_contracts
640            SET contract_format_version = 1
641            WHERE contract_format_version = 0
642            ",
643            [],
644        )?;
645        Ok(())
646    }
647
648    fn ensure_provenance_metadata(conn: &Connection) -> Result<(), SchemaError> {
649        let mut stmt = conn.prepare("PRAGMA table_info(provenance_events)")?;
650        let columns = stmt
651            .query_map([], |row| row.get::<_, String>(1))?
652            .collect::<Result<Vec<_>, _>>()?;
653        let has_metadata_json = columns.iter().any(|column| column == "metadata_json");
654
655        if !has_metadata_json {
656            conn.execute(
657                "ALTER TABLE provenance_events ADD COLUMN metadata_json TEXT NOT NULL DEFAULT ''",
658                [],
659            )?;
660        }
661        Ok(())
662    }
663
664    fn ensure_operational_mutation_order(conn: &Connection) -> Result<(), SchemaError> {
665        let mut stmt = conn.prepare("PRAGMA table_info(operational_mutations)")?;
666        let columns = stmt
667            .query_map([], |row| row.get::<_, String>(1))?
668            .collect::<Result<Vec<_>, _>>()?;
669        let has_mutation_order = columns.iter().any(|column| column == "mutation_order");
670
671        if !has_mutation_order {
672            conn.execute(
673                "ALTER TABLE operational_mutations ADD COLUMN mutation_order INTEGER NOT NULL DEFAULT 0",
674                [],
675            )?;
676        }
677        conn.execute(
678            r"
679            UPDATE operational_mutations
680            SET mutation_order = rowid
681            WHERE mutation_order = 0
682            ",
683            [],
684        )?;
685        conn.execute(
686            r"
687            CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_order
688                ON operational_mutations(collection_name, record_key, mutation_order DESC)
689            ",
690            [],
691        )?;
692        Ok(())
693    }
694
695    fn ensure_node_access_metadata(conn: &Connection) -> Result<(), SchemaError> {
696        conn.execute_batch(
697            r"
698            CREATE TABLE IF NOT EXISTS node_access_metadata (
699                logical_id TEXT PRIMARY KEY,
700                last_accessed_at INTEGER NOT NULL,
701                updated_at INTEGER NOT NULL
702            );
703
704            CREATE INDEX IF NOT EXISTS idx_node_access_metadata_last_accessed
705                ON node_access_metadata(last_accessed_at DESC);
706            ",
707        )?;
708        Ok(())
709    }
710
711    fn ensure_operational_filter_contract(conn: &Connection) -> Result<(), SchemaError> {
712        let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
713        let columns = stmt
714            .query_map([], |row| row.get::<_, String>(1))?
715            .collect::<Result<Vec<_>, _>>()?;
716        let has_filter_fields_json = columns.iter().any(|column| column == "filter_fields_json");
717
718        if !has_filter_fields_json {
719            conn.execute(
720                "ALTER TABLE operational_collections ADD COLUMN filter_fields_json TEXT NOT NULL DEFAULT '[]'",
721                [],
722            )?;
723        }
724
725        conn.execute_batch(
726            r"
727            CREATE TABLE IF NOT EXISTS operational_filter_values (
728                mutation_id TEXT NOT NULL,
729                collection_name TEXT NOT NULL,
730                field_name TEXT NOT NULL,
731                string_value TEXT,
732                integer_value INTEGER,
733                PRIMARY KEY(mutation_id, field_name),
734                FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE,
735                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
736            );
737
738            CREATE INDEX IF NOT EXISTS idx_operational_filter_values_text
739                ON operational_filter_values(collection_name, field_name, string_value, mutation_id);
740            CREATE INDEX IF NOT EXISTS idx_operational_filter_values_integer
741                ON operational_filter_values(collection_name, field_name, integer_value, mutation_id);
742            ",
743        )?;
744        Ok(())
745    }
746
747    fn ensure_operational_validation_contract(conn: &Connection) -> Result<(), SchemaError> {
748        let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
749        let columns = stmt
750            .query_map([], |row| row.get::<_, String>(1))?
751            .collect::<Result<Vec<_>, _>>()?;
752        let has_validation_json = columns.iter().any(|column| column == "validation_json");
753
754        if !has_validation_json {
755            conn.execute(
756                "ALTER TABLE operational_collections ADD COLUMN validation_json TEXT NOT NULL DEFAULT ''",
757                [],
758            )?;
759        }
760
761        Ok(())
762    }
763
764    fn ensure_operational_secondary_indexes(conn: &Connection) -> Result<(), SchemaError> {
765        let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
766        let columns = stmt
767            .query_map([], |row| row.get::<_, String>(1))?
768            .collect::<Result<Vec<_>, _>>()?;
769        let has_secondary_indexes_json = columns
770            .iter()
771            .any(|column| column == "secondary_indexes_json");
772
773        if !has_secondary_indexes_json {
774            conn.execute(
775                "ALTER TABLE operational_collections ADD COLUMN secondary_indexes_json TEXT NOT NULL DEFAULT '[]'",
776                [],
777            )?;
778        }
779
780        conn.execute_batch(
781            r"
782            CREATE TABLE IF NOT EXISTS operational_secondary_index_entries (
783                collection_name TEXT NOT NULL,
784                index_name TEXT NOT NULL,
785                subject_kind TEXT NOT NULL,
786                mutation_id TEXT NOT NULL DEFAULT '',
787                record_key TEXT NOT NULL DEFAULT '',
788                sort_timestamp INTEGER,
789                slot1_text TEXT,
790                slot1_integer INTEGER,
791                slot2_text TEXT,
792                slot2_integer INTEGER,
793                slot3_text TEXT,
794                slot3_integer INTEGER,
795                PRIMARY KEY(collection_name, index_name, subject_kind, mutation_id, record_key),
796                FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
797                FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE
798            );
799
800            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_text
801                ON operational_secondary_index_entries(
802                    collection_name, index_name, subject_kind, slot1_text, sort_timestamp DESC, mutation_id, record_key
803                );
804            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_integer
805                ON operational_secondary_index_entries(
806                    collection_name, index_name, subject_kind, slot1_integer, sort_timestamp DESC, mutation_id, record_key
807                );
808            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_text
809                ON operational_secondary_index_entries(
810                    collection_name, index_name, subject_kind, slot1_text, slot2_text, slot3_text, sort_timestamp DESC, record_key
811                );
812            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_integer
813                ON operational_secondary_index_entries(
814                    collection_name, index_name, subject_kind, slot1_integer, slot2_integer, slot3_integer, sort_timestamp DESC, record_key
815                );
816            ",
817        )?;
818
819        Ok(())
820    }
821
822    fn ensure_operational_retention_runs(conn: &Connection) -> Result<(), SchemaError> {
823        conn.execute_batch(
824            r"
825            CREATE TABLE IF NOT EXISTS operational_retention_runs (
826                id TEXT PRIMARY KEY,
827                collection_name TEXT NOT NULL,
828                executed_at INTEGER NOT NULL,
829                action_kind TEXT NOT NULL,
830                dry_run INTEGER NOT NULL DEFAULT 0,
831                deleted_mutations INTEGER NOT NULL,
832                rows_remaining INTEGER NOT NULL,
833                metadata_json TEXT NOT NULL DEFAULT '',
834                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
835            );
836
837            CREATE INDEX IF NOT EXISTS idx_operational_retention_runs_collection_time
838                ON operational_retention_runs(collection_name, executed_at DESC);
839            ",
840        )?;
841        Ok(())
842    }
843
844    fn ensure_external_content_columns(conn: &Connection) -> Result<(), SchemaError> {
845        let node_columns = Self::column_names(conn, "nodes")?;
846        if !node_columns.iter().any(|c| c == "content_ref") {
847            conn.execute("ALTER TABLE nodes ADD COLUMN content_ref TEXT", [])?;
848        }
849        conn.execute_batch(
850            r"
851            CREATE INDEX IF NOT EXISTS idx_nodes_content_ref
852                ON nodes(content_ref)
853                WHERE content_ref IS NOT NULL AND superseded_at IS NULL;
854            ",
855        )?;
856
857        let chunk_columns = Self::column_names(conn, "chunks")?;
858        if !chunk_columns.iter().any(|c| c == "content_hash") {
859            conn.execute("ALTER TABLE chunks ADD COLUMN content_hash TEXT", [])?;
860        }
861        Ok(())
862    }
863
864    /// Migration 16: migrate both `fts_nodes` and `fts_node_properties` from
865    /// the default FTS5 simple tokenizer to `unicode61 remove_diacritics 2
866    /// porter` so diacritic-insensitive and stem-aware matches work (e.g.
867    /// `cafe` matching `café`, `shipping` matching `ship`).
868    ///
869    /// FTS5 does not support re-tokenizing an existing index in place, so
870    /// both virtual tables are dropped and recreated with the new
871    /// `tokenize=...` clause. `fts_nodes` is rebuilt inline from the
872    /// canonical `chunks + nodes` join. `fts_node_properties` is left empty
873    /// here and repopulated from canonical state by the engine runtime after
874    /// bootstrap (the property FTS rebuild requires the per-kind
875    /// `fts_property_schemas` projection that lives in the engine crate).
876    ///
877    /// A malformed row encountered during the inline `INSERT ... SELECT`
878    /// causes the migration to abort: the rusqlite error propagates up
879    /// through `execute_batch` and rolls back the outer migration
880    /// transaction so the schema version is not advanced.
881    fn ensure_unicode_porter_fts_tokenizers(conn: &Connection) -> Result<(), SchemaError> {
882        conn.execute_batch(
883            r"
884            DROP TABLE IF EXISTS fts_nodes;
885            CREATE VIRTUAL TABLE fts_nodes USING fts5(
886                chunk_id UNINDEXED,
887                node_logical_id UNINDEXED,
888                kind UNINDEXED,
889                text_content,
890                tokenize = 'porter unicode61 remove_diacritics 2'
891            );
892
893            DROP TABLE IF EXISTS fts_node_properties;
894            CREATE VIRTUAL TABLE fts_node_properties USING fts5(
895                node_logical_id UNINDEXED,
896                kind UNINDEXED,
897                text_content,
898                tokenize = 'porter unicode61 remove_diacritics 2'
899            );
900
901            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
902            SELECT c.id, n.logical_id, n.kind, c.text_content
903            FROM chunks c
904            JOIN nodes n
905              ON n.logical_id = c.node_logical_id
906             AND n.superseded_at IS NULL;
907            ",
908        )?;
909        Ok(())
910    }
911
912    fn ensure_fts_property_schemas(conn: &Connection) -> Result<(), SchemaError> {
913        conn.execute_batch(
914            r"
915            CREATE TABLE IF NOT EXISTS fts_property_schemas (
916                kind TEXT PRIMARY KEY,
917                property_paths_json TEXT NOT NULL,
918                separator TEXT NOT NULL DEFAULT ' ',
919                format_version INTEGER NOT NULL DEFAULT 1,
920                created_at INTEGER NOT NULL DEFAULT (unixepoch())
921            );
922
923            CREATE VIRTUAL TABLE IF NOT EXISTS fts_node_properties USING fts5(
924                node_logical_id UNINDEXED,
925                kind UNINDEXED,
926                text_content
927            );
928            ",
929        )?;
930        Ok(())
931    }
932
933    fn column_names(conn: &Connection, table: &str) -> Result<Vec<String>, SchemaError> {
934        let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
935        let names = stmt
936            .query_map([], |row| row.get::<_, String>(1))?
937            .collect::<Result<Vec<_>, _>>()?;
938        Ok(names)
939    }
940
941    #[must_use]
942    pub fn current_version(&self) -> SchemaVersion {
943        self.migrations()
944            .last()
945            .map_or(SchemaVersion(0), |migration| migration.version)
946    }
947
    /// The full, ordered list of built-in migrations (the static `MIGRATIONS`
    /// table at the top of this file).
    #[must_use]
    pub fn migrations(&self) -> &'static [Migration] {
        MIGRATIONS
    }
952
953    /// Set the recommended `SQLite` connection pragmas for fathomdb.
954    ///
955    /// # Errors
956    ///
957    /// Returns [`SchemaError`] if any PRAGMA fails to execute.
958    pub fn initialize_connection(&self, conn: &Connection) -> Result<(), SchemaError> {
959        conn.execute_batch(
960            r"
961            PRAGMA foreign_keys = ON;
962            PRAGMA journal_mode = WAL;
963            PRAGMA synchronous = NORMAL;
964            PRAGMA busy_timeout = 5000;
965            PRAGMA temp_store = MEMORY;
966            PRAGMA mmap_size = 3000000000;
967            PRAGMA journal_size_limit = 536870912;
968            ",
969        )?;
970        Ok(())
971    }
972
973    /// Initialize a **read-only** connection with PRAGMAs that are safe for
974    /// readers.
975    ///
976    /// Skips `journal_mode` (requires write; the writer already set WAL),
977    /// `synchronous` (irrelevant for readers), and `journal_size_limit`
978    /// (requires write).
979    ///
980    /// # Errors
981    ///
982    /// Returns [`SchemaError`] if any PRAGMA fails to execute.
983    pub fn initialize_reader_connection(&self, conn: &Connection) -> Result<(), SchemaError> {
984        conn.execute_batch(
985            r"
986            PRAGMA foreign_keys = ON;
987            PRAGMA busy_timeout = 5000;
988            PRAGMA temp_store = MEMORY;
989            PRAGMA mmap_size = 3000000000;
990            ",
991        )?;
992        Ok(())
993    }
994
995    /// Ensure the sqlite-vec vector extension profile is registered and the
996    /// virtual vec table exists.
997    ///
998    /// When the `sqlite-vec` feature is enabled this creates the virtual table
999    /// and records the profile in `vector_profiles` (with `enabled = 1`).
1000    /// When the feature is absent the call always returns
1001    /// [`SchemaError::MissingCapability`].
1002    ///
1003    /// # Errors
1004    ///
1005    /// Returns [`SchemaError`] if the DDL fails or the feature is absent.
1006    #[cfg(feature = "sqlite-vec")]
1007    pub fn ensure_vector_profile(
1008        &self,
1009        conn: &Connection,
1010        profile: &str,
1011        table_name: &str,
1012        dimension: usize,
1013    ) -> Result<(), SchemaError> {
1014        conn.execute_batch(&format!(
1015            "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING vec0(\
1016                chunk_id TEXT PRIMARY KEY,\
1017                embedding float[{dimension}]\
1018            )"
1019        ))?;
1020        // Vector dimensions are small positive integers (typically <= a few
1021        // thousand); convert explicitly so clippy's cast_possible_wrap is happy.
1022        let dimension_i64 = i64::try_from(dimension).map_err(|_| {
1023            SchemaError::Sqlite(rusqlite::Error::ToSqlConversionFailure(
1024                format!("vector dimension {dimension} does not fit in i64").into(),
1025            ))
1026        })?;
1027        conn.execute(
1028            "INSERT OR REPLACE INTO vector_profiles \
1029             (profile, table_name, dimension, enabled) VALUES (?1, ?2, ?3, 1)",
1030            rusqlite::params![profile, table_name, dimension_i64],
1031        )?;
1032        Ok(())
1033    }
1034
    /// Feature-gated stub: the `sqlite-vec` capability is not compiled in, so
    /// vector profiles can never be registered on this build.
    ///
    /// # Errors
    ///
    /// Always returns [`SchemaError::MissingCapability`] when the `sqlite-vec`
    /// feature is not compiled in.
    #[cfg(not(feature = "sqlite-vec"))]
    pub fn ensure_vector_profile(
        &self,
        _conn: &Connection,
        _profile: &str,
        _table_name: &str,
        _dimension: usize,
    ) -> Result<(), SchemaError> {
        Err(SchemaError::MissingCapability("sqlite-vec"))
    }
1049
1050    /// Create the internal migration-tracking table if it does not exist.
1051    ///
1052    /// # Errors
1053    ///
1054    /// Returns [`SchemaError`] if the DDL fails to execute.
1055    fn ensure_metadata_tables(conn: &Connection) -> Result<(), SchemaError> {
1056        conn.execute_batch(
1057            r"
1058            CREATE TABLE IF NOT EXISTS fathom_schema_migrations (
1059                version INTEGER PRIMARY KEY,
1060                description TEXT NOT NULL,
1061                applied_at INTEGER NOT NULL DEFAULT (unixepoch())
1062            );
1063            ",
1064        )?;
1065        Ok(())
1066    }
1067}
1068
1069#[cfg(test)]
1070#[allow(clippy::expect_used)]
1071mod tests {
1072    use rusqlite::Connection;
1073
1074    use super::SchemaManager;
1075
1076    #[test]
1077    fn bootstrap_applies_initial_schema() {
1078        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1079        let manager = SchemaManager::new();
1080
1081        let report = manager.bootstrap(&conn).expect("bootstrap report");
1082
1083        assert_eq!(
1084            report.applied_versions.len(),
1085            manager.current_version().0 as usize
1086        );
1087        assert!(report.sqlite_version.starts_with('3'));
1088        let table_count: i64 = conn
1089            .query_row(
1090                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'nodes'",
1091                [],
1092                |row| row.get(0),
1093            )
1094            .expect("nodes table exists");
1095        assert_eq!(table_count, 1);
1096    }
1097
1098    // --- Item 2: vector profile tests ---
1099
1100    #[test]
1101    fn vector_profile_not_enabled_without_feature() {
1102        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1103        let manager = SchemaManager::new();
1104        let report = manager.bootstrap(&conn).expect("bootstrap");
1105        assert!(
1106            !report.vector_profile_enabled,
1107            "vector_profile_enabled must be false on a fresh bootstrap"
1108        );
1109    }
1110
1111    #[test]
1112    fn vector_profile_skipped_when_dimension_absent() {
1113        // ensure_vector_profile is never called → enabled stays false
1114        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1115        let manager = SchemaManager::new();
1116        manager.bootstrap(&conn).expect("bootstrap");
1117
1118        let count: i64 = conn
1119            .query_row(
1120                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
1121                [],
1122                |row| row.get(0),
1123            )
1124            .expect("count");
1125        assert_eq!(
1126            count, 0,
1127            "no enabled profile without calling ensure_vector_profile"
1128        );
1129    }
1130
1131    #[test]
1132    fn bootstrap_report_reflects_actual_vector_state() {
1133        // After a fresh bootstrap with no vector profile, the report reflects reality.
1134        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1135        let manager = SchemaManager::new();
1136        let report = manager.bootstrap(&conn).expect("bootstrap");
1137
1138        let db_count: i64 = conn
1139            .query_row(
1140                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
1141                [],
1142                |row| row.get(0),
1143            )
1144            .expect("count");
1145        assert_eq!(
1146            report.vector_profile_enabled,
1147            db_count > 0,
1148            "BootstrapReport.vector_profile_enabled must match actual DB state"
1149        );
1150    }
1151
    #[test]
    fn bootstrap_backfills_vector_contract_format_and_provenance_metadata_columns() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        // Seed a hand-built "legacy" schema: provenance_events without a
        // metadata_json column and vector_embedding_contracts without a
        // contract_format_version column, plus one pre-existing contract row
        // for the backfill to act on.
        conn.execute_batch(
            r#"
            CREATE TABLE provenance_events (
                id         TEXT PRIMARY KEY,
                event_type TEXT NOT NULL,
                subject    TEXT NOT NULL,
                source_ref TEXT,
                created_at INTEGER NOT NULL DEFAULT (unixepoch())
            );
            CREATE TABLE vector_embedding_contracts (
                profile TEXT PRIMARY KEY,
                table_name TEXT NOT NULL,
                model_identity TEXT NOT NULL,
                model_version TEXT NOT NULL,
                dimension INTEGER NOT NULL,
                normalization_policy TEXT NOT NULL,
                chunking_policy TEXT NOT NULL,
                preprocessing_policy TEXT NOT NULL,
                generator_command_json TEXT NOT NULL,
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                applied_at INTEGER NOT NULL DEFAULT 0,
                snapshot_hash TEXT NOT NULL DEFAULT ''
            );
            INSERT INTO vector_embedding_contracts (
                profile,
                table_name,
                model_identity,
                model_version,
                dimension,
                normalization_policy,
                chunking_policy,
                preprocessing_policy,
                generator_command_json,
                updated_at,
                applied_at,
                snapshot_hash
            ) VALUES (
                'default',
                'vec_nodes_active',
                'legacy-model',
                '0.9.0',
                4,
                'l2',
                'per_chunk',
                'trim',
                '["/bin/echo"]',
                100,
                100,
                'legacy'
            );
            "#,
        )
        .expect("seed legacy schema");
        let manager = SchemaManager::new();

        let report = manager.bootstrap(&conn).expect("bootstrap");

        assert!(
            report.applied_versions.iter().any(|version| version.0 >= 5),
            "bootstrap should apply hardening migrations"
        );
        // The backfill must add contract_format_version to the legacy table
        // and default the pre-existing row to 1.
        let format_version: i64 = conn
            .query_row(
                "SELECT contract_format_version FROM vector_embedding_contracts WHERE profile = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("contract_format_version");
        assert_eq!(format_version, 1);
        // provenance_events must gain exactly one metadata_json column.
        let metadata_column_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM pragma_table_info('provenance_events') WHERE name = 'metadata_json'",
                [],
                |row| row.get(0),
            )
            .expect("metadata_json column count");
        assert_eq!(metadata_column_count, 1);
    }
1233
1234    #[test]
1235    fn bootstrap_creates_operational_store_tables() {
1236        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1237        let manager = SchemaManager::new();
1238
1239        manager.bootstrap(&conn).expect("bootstrap");
1240
1241        for table in [
1242            "operational_collections",
1243            "operational_mutations",
1244            "operational_current",
1245        ] {
1246            let count: i64 = conn
1247                .query_row(
1248                    "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = ?1",
1249                    [table],
1250                    |row| row.get(0),
1251                )
1252                .expect("table existence");
1253            assert_eq!(count, 1, "{table} should exist after bootstrap");
1254        }
1255        let mutation_order_columns: i64 = conn
1256            .query_row(
1257                "SELECT count(*) FROM pragma_table_info('operational_mutations') WHERE name = 'mutation_order'",
1258                [],
1259                |row| row.get(0),
1260            )
1261            .expect("mutation_order column exists");
1262        assert_eq!(mutation_order_columns, 1);
1263    }
1264
1265    #[test]
1266    fn bootstrap_is_idempotent_with_operational_store_tables() {
1267        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1268        let manager = SchemaManager::new();
1269
1270        manager.bootstrap(&conn).expect("first bootstrap");
1271        let report = manager.bootstrap(&conn).expect("second bootstrap");
1272
1273        assert!(
1274            report.applied_versions.is_empty(),
1275            "second bootstrap should apply no new migrations"
1276        );
1277        let count: i64 = conn
1278            .query_row(
1279                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'operational_collections'",
1280                [],
1281                |row| row.get(0),
1282        )
1283        .expect("operational_collections table exists");
1284        assert_eq!(count, 1);
1285    }
1286
    #[test]
    fn bootstrap_is_idempotent_for_recovered_operational_tables_without_migration_history() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        // Seed operational tables by hand — as if recovered from a backup —
        // WITHOUT any fathom_schema_migrations history, including one
        // mutation row still at the mutation_order = 0 sentinel.
        conn.execute_batch(
            r#"
            CREATE TABLE operational_collections (
                name TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                schema_json TEXT NOT NULL,
                retention_json TEXT NOT NULL,
                format_version INTEGER NOT NULL DEFAULT 1,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                disabled_at INTEGER
            );

            CREATE TABLE operational_mutations (
                id TEXT PRIMARY KEY,
                collection_name TEXT NOT NULL,
                record_key TEXT NOT NULL,
                op_kind TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                source_ref TEXT,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                mutation_order INTEGER NOT NULL DEFAULT 0,
                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
            );

            CREATE TABLE operational_current (
                collection_name TEXT NOT NULL,
                record_key TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                updated_at INTEGER NOT NULL,
                last_mutation_id TEXT NOT NULL,
                PRIMARY KEY(collection_name, record_key),
                FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
                FOREIGN KEY(last_mutation_id) REFERENCES operational_mutations(id)
            );

            INSERT INTO operational_collections (name, kind, schema_json, retention_json)
            VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}');
            INSERT INTO operational_mutations (
                id, collection_name, record_key, op_kind, payload_json, created_at, mutation_order
            ) VALUES (
                'mut-1', 'audit_log', 'entry-1', 'put', '{"ok":true}', 10, 0
            );
            "#,
        )
        .expect("seed recovered operational tables");

        let manager = SchemaManager::new();
        let report = manager
            .bootstrap(&conn)
            .expect("bootstrap recovered schema");

        // Migration 8 (mutation ordering hardening) must still be recorded
        // even though the tables already existed.
        assert!(
            report.applied_versions.iter().any(|version| version.0 == 8),
            "bootstrap should record operational mutation ordering hardening"
        );
        // The sentinel mutation_order = 0 row must be backfilled from rowid.
        let mutation_order: i64 = conn
            .query_row(
                "SELECT mutation_order FROM operational_mutations WHERE id = 'mut-1'",
                [],
                |row| row.get(0),
            )
            .expect("mutation_order");
        assert_ne!(
            mutation_order, 0,
            "bootstrap should backfill recovered operational rows"
        );
        // The ordering index must be created alongside the backfill.
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type = 'index' AND name = 'idx_operational_mutations_collection_key_order'",
                [],
                |row| row.get(0),
            )
            .expect("ordering index exists");
        assert_eq!(count, 1);
    }
1365
    #[test]
    fn bootstrap_adds_operational_filter_contract_and_index_table() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        // Seed recovered operational tables that predate the filter-contract
        // (migration 10) and validation (migration 11) columns.
        conn.execute_batch(
            r#"
            CREATE TABLE operational_collections (
                name TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                schema_json TEXT NOT NULL,
                retention_json TEXT NOT NULL,
                format_version INTEGER NOT NULL DEFAULT 1,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                disabled_at INTEGER
            );

            CREATE TABLE operational_mutations (
                id TEXT PRIMARY KEY,
                collection_name TEXT NOT NULL,
                record_key TEXT NOT NULL,
                op_kind TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                source_ref TEXT,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                mutation_order INTEGER NOT NULL DEFAULT 1,
                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
            );

            INSERT INTO operational_collections (name, kind, schema_json, retention_json)
            VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}');
            "#,
        )
        .expect("seed recovered operational schema");

        let manager = SchemaManager::new();
        let report = manager
            .bootstrap(&conn)
            .expect("bootstrap recovered schema");

        assert!(
            report
                .applied_versions
                .iter()
                .any(|version| version.0 == 10),
            "bootstrap should record operational filtered read migration"
        );
        assert!(
            report
                .applied_versions
                .iter()
                .any(|version| version.0 == 11),
            "bootstrap should record operational validation migration"
        );
        // Added columns must carry their documented defaults on the
        // pre-existing 'audit_log' row: '[]' for filter_fields_json...
        let filter_fields_json: String = conn
            .query_row(
                "SELECT filter_fields_json FROM operational_collections WHERE name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("filter_fields_json added");
        assert_eq!(filter_fields_json, "[]");
        // ...and '' (no validation configured) for validation_json.
        let validation_json: String = conn
            .query_row(
                "SELECT validation_json FROM operational_collections WHERE name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("validation_json added");
        assert_eq!(validation_json, "");
        // The filter side table must also be created.
        let table_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'operational_filter_values'",
                [],
                |row| row.get(0),
        )
        .expect("filter table exists");
        assert_eq!(table_count, 1);
    }
1443
1444    #[test]
1445    fn bootstrap_reapplies_migration_history_without_readding_filter_contract_columns() {
1446        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1447        let manager = SchemaManager::new();
1448        manager.bootstrap(&conn).expect("initial bootstrap");
1449
1450        conn.execute("DROP TABLE fathom_schema_migrations", [])
1451            .expect("drop migration history");
1452        SchemaManager::ensure_metadata_tables(&conn).expect("recreate migration metadata");
1453
1454        let report = manager
1455            .bootstrap(&conn)
1456            .expect("rebootstrap existing schema");
1457
1458        assert!(
1459            report
1460                .applied_versions
1461                .iter()
1462                .any(|version| version.0 == 10),
1463            "rebootstrap should re-record migration 10"
1464        );
1465        assert!(
1466            report
1467                .applied_versions
1468                .iter()
1469                .any(|version| version.0 == 11),
1470            "rebootstrap should re-record migration 11"
1471        );
1472        let filter_fields_json: String = conn
1473            .query_row(
1474                "SELECT filter_fields_json FROM operational_collections LIMIT 1",
1475                [],
1476                |row| row.get(0),
1477            )
1478            .unwrap_or_else(|_| "[]".to_string());
1479        assert_eq!(filter_fields_json, "[]");
1480        let validation_json: String = conn
1481            .query_row(
1482                "SELECT validation_json FROM operational_collections LIMIT 1",
1483                [],
1484                |row| row.get(0),
1485            )
1486            .unwrap_or_default();
1487        assert_eq!(validation_json, "");
1488    }
1489
1490    #[test]
1491    fn downgrade_detected_returns_version_mismatch() {
1492        use crate::SchemaError;
1493
1494        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1495        let manager = SchemaManager::new();
1496        manager.bootstrap(&conn).expect("initial bootstrap");
1497
1498        conn.execute(
1499            "INSERT INTO fathom_schema_migrations (version, description) VALUES (?1, ?2)",
1500            (999_i64, "future migration"),
1501        )
1502        .expect("insert future version");
1503
1504        let err = manager
1505            .bootstrap(&conn)
1506            .expect_err("should fail on downgrade");
1507        assert!(
1508            matches!(
1509                err,
1510                SchemaError::VersionMismatch {
1511                    database_version: 999,
1512                    ..
1513                }
1514            ),
1515            "expected VersionMismatch with database_version 999, got: {err}"
1516        );
1517    }
1518
1519    #[test]
1520    fn journal_size_limit_is_set() {
1521        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1522        let manager = SchemaManager::new();
1523        manager
1524            .initialize_connection(&conn)
1525            .expect("initialize_connection");
1526
1527        let limit: i64 = conn
1528            .query_row("PRAGMA journal_size_limit", [], |row| row.get(0))
1529            .expect("journal_size_limit pragma");
1530        assert_eq!(limit, 536_870_912);
1531    }
1532
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn vector_profile_created_when_feature_enabled() {
        // Register the sqlite-vec extension globally so the in-memory
        // connection can use the vec0 module.
        //
        // SAFETY: sqlite3_auto_extension takes a C entry-point function
        // pointer; sqlite_vec::sqlite3_vec_init is that entry point and the
        // transmute only reinterprets its pointer type to match the FFI
        // signature. Registration is process-global, so it must happen
        // before the connection below is opened.
        unsafe {
            rusqlite::ffi::sqlite3_auto_extension(Some(std::mem::transmute(
                sqlite_vec::sqlite3_vec_init as *const (),
            )));
        }
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager.bootstrap(&conn).expect("bootstrap");

        // Provision the "default" profile backed by the vec_nodes_active
        // virtual table with 128-dimensional vectors.
        manager
            .ensure_vector_profile(&conn, "default", "vec_nodes_active", 128)
            .expect("ensure_vector_profile");

        // Exactly one profile row should exist and be flagged enabled.
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            count, 1,
            "vector profile must be enabled after ensure_vector_profile"
        );

        // Verify the virtual table exists by querying it
        let _: i64 = conn
            .query_row("SELECT count(*) FROM vec_nodes_active", [], |row| {
                row.get(0)
            })
            .expect("vec_nodes_active table must exist after ensure_vector_profile");
    }
1570}