//! `fathomdb_schema/bootstrap.rs` — migration registry and schema bootstrap.
1use rusqlite::{Connection, OptionalExtension};
2
3use crate::{Migration, SchemaError, SchemaVersion};
4
// Ordered migration registry. Versions must stay contiguous and ascending:
// `SchemaManager::bootstrap` records each applied version in
// `fathom_schema_migrations` and refuses to run against a database stamped
// with a version newer than the engine supports.
//
// NOTE(review): for most versions >= 4, `bootstrap` dispatches to an
// idempotent `ensure_*` helper instead of executing the SQL below directly
// (see the `match migration.version` in `bootstrap`). The SQL here doubles
// as the canonical record of what those helpers perform — keep both in sync
// when editing either.
static MIGRATIONS: &[Migration] = &[
    Migration::new(
        SchemaVersion(1),
        "initial canonical schema and runtime tables",
        r"
                CREATE TABLE IF NOT EXISTS nodes (
                    row_id TEXT PRIMARY KEY,
                    logical_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    properties BLOB NOT NULL,
                    created_at INTEGER NOT NULL,
                    superseded_at INTEGER,
                    source_ref TEXT,
                    confidence REAL
                );

                CREATE UNIQUE INDEX IF NOT EXISTS idx_nodes_active_logical_id
                    ON nodes(logical_id)
                    WHERE superseded_at IS NULL;
                CREATE INDEX IF NOT EXISTS idx_nodes_kind_active
                    ON nodes(kind, superseded_at);
                CREATE INDEX IF NOT EXISTS idx_nodes_source_ref
                    ON nodes(source_ref);

                CREATE TABLE IF NOT EXISTS edges (
                    row_id TEXT PRIMARY KEY,
                    logical_id TEXT NOT NULL,
                    source_logical_id TEXT NOT NULL,
                    target_logical_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    properties BLOB NOT NULL,
                    created_at INTEGER NOT NULL,
                    superseded_at INTEGER,
                    source_ref TEXT,
                    confidence REAL
                );

                CREATE UNIQUE INDEX IF NOT EXISTS idx_edges_active_logical_id
                    ON edges(logical_id)
                    WHERE superseded_at IS NULL;
                CREATE INDEX IF NOT EXISTS idx_edges_source_active
                    ON edges(source_logical_id, kind, superseded_at);
                CREATE INDEX IF NOT EXISTS idx_edges_target_active
                    ON edges(target_logical_id, kind, superseded_at);
                CREATE INDEX IF NOT EXISTS idx_edges_source_ref
                    ON edges(source_ref);

                CREATE TABLE IF NOT EXISTS chunks (
                    id TEXT PRIMARY KEY,
                    node_logical_id TEXT NOT NULL,
                    text_content TEXT NOT NULL,
                    byte_start INTEGER,
                    byte_end INTEGER,
                    created_at INTEGER NOT NULL
                );

                CREATE INDEX IF NOT EXISTS idx_chunks_node_logical_id
                    ON chunks(node_logical_id);

                CREATE VIRTUAL TABLE IF NOT EXISTS fts_nodes USING fts5(
                    chunk_id UNINDEXED,
                    node_logical_id UNINDEXED,
                    kind UNINDEXED,
                    text_content
                );

                CREATE TABLE IF NOT EXISTS vector_profiles (
                    profile TEXT PRIMARY KEY,
                    table_name TEXT NOT NULL,
                    dimension INTEGER NOT NULL,
                    enabled INTEGER NOT NULL DEFAULT 0
                );

                CREATE TABLE IF NOT EXISTS runs (
                    id TEXT PRIMARY KEY,
                    kind TEXT NOT NULL,
                    status TEXT NOT NULL,
                    properties BLOB NOT NULL,
                    created_at INTEGER NOT NULL,
                    completed_at INTEGER,
                    superseded_at INTEGER,
                    source_ref TEXT
                );

                CREATE TABLE IF NOT EXISTS steps (
                    id TEXT PRIMARY KEY,
                    run_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    status TEXT NOT NULL,
                    properties BLOB NOT NULL,
                    created_at INTEGER NOT NULL,
                    completed_at INTEGER,
                    superseded_at INTEGER,
                    source_ref TEXT,
                    FOREIGN KEY(run_id) REFERENCES runs(id)
                );

                CREATE TABLE IF NOT EXISTS actions (
                    id TEXT PRIMARY KEY,
                    step_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    status TEXT NOT NULL,
                    properties BLOB NOT NULL,
                    created_at INTEGER NOT NULL,
                    completed_at INTEGER,
                    superseded_at INTEGER,
                    source_ref TEXT,
                    FOREIGN KEY(step_id) REFERENCES steps(id)
                );

                CREATE INDEX IF NOT EXISTS idx_runs_source_ref
                    ON runs(source_ref);
                CREATE INDEX IF NOT EXISTS idx_steps_source_ref
                    ON steps(source_ref);
                CREATE INDEX IF NOT EXISTS idx_actions_source_ref
                    ON actions(source_ref);
                ",
    ),
    Migration::new(
        SchemaVersion(2),
        "durable audit trail: provenance_events table",
        r"
                CREATE TABLE IF NOT EXISTS provenance_events (
                    id         TEXT PRIMARY KEY,
                    event_type TEXT NOT NULL,
                    subject    TEXT NOT NULL,
                    source_ref TEXT,
                    created_at INTEGER NOT NULL DEFAULT (unixepoch())
                );
                CREATE INDEX IF NOT EXISTS idx_provenance_events_subject
                    ON provenance_events (subject, event_type);
                ",
    ),
    Migration::new(
        SchemaVersion(3),
        "vector regeneration contracts",
        r"
                CREATE TABLE IF NOT EXISTS vector_embedding_contracts (
                    profile TEXT PRIMARY KEY,
                    table_name TEXT NOT NULL,
                    model_identity TEXT NOT NULL,
                    model_version TEXT NOT NULL,
                    dimension INTEGER NOT NULL,
                    normalization_policy TEXT NOT NULL,
                    chunking_policy TEXT NOT NULL,
                    preprocessing_policy TEXT NOT NULL,
                    generator_command_json TEXT NOT NULL,
                    updated_at INTEGER NOT NULL DEFAULT (unixepoch())
                );
                ",
    ),
    // Versions 4+ below are applied through ensure_* helpers in bootstrap().
    Migration::new(
        SchemaVersion(4),
        "vector regeneration apply metadata",
        r"
                ALTER TABLE vector_embedding_contracts
                    ADD COLUMN applied_at INTEGER NOT NULL DEFAULT 0;
                ALTER TABLE vector_embedding_contracts
                    ADD COLUMN snapshot_hash TEXT NOT NULL DEFAULT '';
                UPDATE vector_embedding_contracts
                SET
                    applied_at = CASE
                        WHEN applied_at = 0 THEN updated_at
                        ELSE applied_at
                    END,
                    snapshot_hash = CASE
                        WHEN snapshot_hash = '' THEN 'legacy'
                        ELSE snapshot_hash
                    END;
                ",
    ),
    Migration::new(
        SchemaVersion(5),
        "vector regeneration contract format version",
        r"
                ALTER TABLE vector_embedding_contracts
                    ADD COLUMN contract_format_version INTEGER NOT NULL DEFAULT 1;
                UPDATE vector_embedding_contracts
                SET contract_format_version = 1
                WHERE contract_format_version = 0;
                ",
    ),
    Migration::new(
        SchemaVersion(6),
        "provenance metadata payloads",
        r"
                ALTER TABLE provenance_events
                    ADD COLUMN metadata_json TEXT NOT NULL DEFAULT '';
                ",
    ),
    // Version 7 has no ensure_* helper: bootstrap() runs this batch directly.
    Migration::new(
        SchemaVersion(7),
        "operational store canonical and derived tables",
        r"
                CREATE TABLE IF NOT EXISTS operational_collections (
                    name TEXT PRIMARY KEY,
                    kind TEXT NOT NULL,
                    schema_json TEXT NOT NULL,
                    retention_json TEXT NOT NULL,
                    format_version INTEGER NOT NULL DEFAULT 1,
                    created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                    disabled_at INTEGER
                );

                CREATE INDEX IF NOT EXISTS idx_operational_collections_kind
                    ON operational_collections(kind, disabled_at);

                CREATE TABLE IF NOT EXISTS operational_mutations (
                    id TEXT PRIMARY KEY,
                    collection_name TEXT NOT NULL,
                    record_key TEXT NOT NULL,
                    op_kind TEXT NOT NULL,
                    payload_json TEXT NOT NULL,
                    source_ref TEXT,
                    created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
                );

                CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_created
                    ON operational_mutations(collection_name, record_key, created_at DESC, id DESC);
                CREATE INDEX IF NOT EXISTS idx_operational_mutations_source_ref
                    ON operational_mutations(source_ref);

                CREATE TABLE IF NOT EXISTS operational_current (
                    collection_name TEXT NOT NULL,
                    record_key TEXT NOT NULL,
                    payload_json TEXT NOT NULL,
                    updated_at INTEGER NOT NULL,
                    last_mutation_id TEXT NOT NULL,
                    PRIMARY KEY(collection_name, record_key),
                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
                    FOREIGN KEY(last_mutation_id) REFERENCES operational_mutations(id)
                );

                CREATE INDEX IF NOT EXISTS idx_operational_current_collection_updated
                    ON operational_current(collection_name, updated_at DESC);
                ",
    ),
    Migration::new(
        SchemaVersion(8),
        "operational mutation ordering hardening",
        r"
                ALTER TABLE operational_mutations
                    ADD COLUMN mutation_order INTEGER NOT NULL DEFAULT 0;
                UPDATE operational_mutations
                SET mutation_order = rowid
                WHERE mutation_order = 0;
                CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_order
                    ON operational_mutations(collection_name, record_key, mutation_order DESC);
                ",
    ),
    Migration::new(
        SchemaVersion(9),
        "node last_accessed metadata",
        r"
                CREATE TABLE IF NOT EXISTS node_access_metadata (
                    logical_id TEXT PRIMARY KEY,
                    last_accessed_at INTEGER NOT NULL,
                    updated_at INTEGER NOT NULL
                );

                CREATE INDEX IF NOT EXISTS idx_node_access_metadata_last_accessed
                    ON node_access_metadata(last_accessed_at DESC);
                ",
    ),
    Migration::new(
        SchemaVersion(10),
        "operational filtered read contracts and extracted values",
        r"
                ALTER TABLE operational_collections
                    ADD COLUMN filter_fields_json TEXT NOT NULL DEFAULT '[]';

                CREATE TABLE IF NOT EXISTS operational_filter_values (
                    mutation_id TEXT NOT NULL,
                    collection_name TEXT NOT NULL,
                    field_name TEXT NOT NULL,
                    string_value TEXT,
                    integer_value INTEGER,
                    PRIMARY KEY(mutation_id, field_name),
                    FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE,
                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
                );

                CREATE INDEX IF NOT EXISTS idx_operational_filter_values_text
                    ON operational_filter_values(collection_name, field_name, string_value, mutation_id);
                CREATE INDEX IF NOT EXISTS idx_operational_filter_values_integer
                    ON operational_filter_values(collection_name, field_name, integer_value, mutation_id);
                ",
    ),
    Migration::new(
        SchemaVersion(11),
        "operational payload validation contracts",
        r"
                ALTER TABLE operational_collections
                    ADD COLUMN validation_json TEXT NOT NULL DEFAULT '';
                ",
    ),
    Migration::new(
        SchemaVersion(12),
        "operational secondary indexes",
        r"
                ALTER TABLE operational_collections
                    ADD COLUMN secondary_indexes_json TEXT NOT NULL DEFAULT '[]';

                CREATE TABLE IF NOT EXISTS operational_secondary_index_entries (
                    collection_name TEXT NOT NULL,
                    index_name TEXT NOT NULL,
                    subject_kind TEXT NOT NULL,
                    mutation_id TEXT NOT NULL DEFAULT '',
                    record_key TEXT NOT NULL DEFAULT '',
                    sort_timestamp INTEGER,
                    slot1_text TEXT,
                    slot1_integer INTEGER,
                    slot2_text TEXT,
                    slot2_integer INTEGER,
                    slot3_text TEXT,
                    slot3_integer INTEGER,
                    PRIMARY KEY(collection_name, index_name, subject_kind, mutation_id, record_key),
                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
                    FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE
                );

                CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_text
                    ON operational_secondary_index_entries(
                        collection_name, index_name, subject_kind, slot1_text, sort_timestamp DESC, mutation_id, record_key
                    );
                CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_integer
                    ON operational_secondary_index_entries(
                        collection_name, index_name, subject_kind, slot1_integer, sort_timestamp DESC, mutation_id, record_key
                    );
                CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_text
                    ON operational_secondary_index_entries(
                        collection_name, index_name, subject_kind, slot1_text, slot2_text, slot3_text, sort_timestamp DESC, record_key
                    );
                CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_integer
                    ON operational_secondary_index_entries(
                        collection_name, index_name, subject_kind, slot1_integer, slot2_integer, slot3_integer, sort_timestamp DESC, record_key
                );
                ",
    ),
    Migration::new(
        SchemaVersion(13),
        "operational retention run metadata",
        r"
                CREATE TABLE IF NOT EXISTS operational_retention_runs (
                    id TEXT PRIMARY KEY,
                    collection_name TEXT NOT NULL,
                    executed_at INTEGER NOT NULL,
                    action_kind TEXT NOT NULL,
                    dry_run INTEGER NOT NULL DEFAULT 0,
                    deleted_mutations INTEGER NOT NULL,
                    rows_remaining INTEGER NOT NULL,
                    metadata_json TEXT NOT NULL DEFAULT '',
                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
                );

                CREATE INDEX IF NOT EXISTS idx_operational_retention_runs_collection_time
                    ON operational_retention_runs(collection_name, executed_at DESC);
                ",
    ),
    Migration::new(
        SchemaVersion(14),
        "external content object columns",
        r"
                ALTER TABLE nodes ADD COLUMN content_ref TEXT;

                CREATE INDEX IF NOT EXISTS idx_nodes_content_ref
                    ON nodes(content_ref)
                    WHERE content_ref IS NOT NULL AND superseded_at IS NULL;

                ALTER TABLE chunks ADD COLUMN content_hash TEXT;
                ",
    ),
    Migration::new(
        SchemaVersion(15),
        "FTS property projection schemas",
        r"
                CREATE TABLE IF NOT EXISTS fts_property_schemas (
                    kind TEXT PRIMARY KEY,
                    property_paths_json TEXT NOT NULL,
                    separator TEXT NOT NULL DEFAULT ' ',
                    format_version INTEGER NOT NULL DEFAULT 1,
                    created_at INTEGER NOT NULL DEFAULT (unixepoch())
                );

                CREATE VIRTUAL TABLE IF NOT EXISTS fts_node_properties USING fts5(
                    node_logical_id UNINDEXED,
                    kind UNINDEXED,
                    text_content
                );
                ",
    ),
];
398
/// Outcome summary returned by [`SchemaManager::bootstrap`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BootstrapReport {
    /// Version string reported by `SELECT sqlite_version()`.
    pub sqlite_version: String,
    /// Migrations applied during this bootstrap run (empty when the
    /// database was already up to date).
    pub applied_versions: Vec<SchemaVersion>,
    /// True when at least one `vector_profiles` row has `enabled = 1`.
    pub vector_profile_enabled: bool,
}
405
/// Stateless driver that applies the migration registry to a SQLite
/// connection, tracking progress in the `fathom_schema_migrations` table.
#[derive(Clone, Debug, Default)]
pub struct SchemaManager;
408
409impl SchemaManager {
410    #[must_use]
411    pub fn new() -> Self {
412        Self
413    }
414
    /// Bootstrap the database schema, applying any pending migrations.
    ///
    /// Initialises the connection and metadata tables, then applies every
    /// registry migration not yet recorded in `fathom_schema_migrations`,
    /// each inside its own transaction.
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError`] if any migration or metadata-table SQL fails,
    /// or [`SchemaError::VersionMismatch`] if the database has been migrated
    /// to a version newer than this engine supports.
    pub fn bootstrap(&self, conn: &Connection) -> Result<BootstrapReport, SchemaError> {
        self.initialize_connection(conn)?;
        Self::ensure_metadata_tables(conn)?;

        // Downgrade protection: refuse to touch a database whose recorded
        // version exceeds what this engine's registry knows about.
        let max_applied: u32 = conn.query_row(
            "SELECT COALESCE(MAX(version), 0) FROM fathom_schema_migrations",
            [],
            |row| row.get(0),
        )?;
        let engine_version = self.current_version().0;
        trace_info!(
            current_version = max_applied,
            engine_version,
            "schema bootstrap: version check"
        );
        if max_applied > engine_version {
            trace_error!(
                database_version = max_applied,
                engine_version,
                "schema version mismatch: database is newer than engine"
            );
            return Err(SchemaError::VersionMismatch {
                database_version: max_applied,
                engine_version,
            });
        }

        let mut applied_versions = Vec::new();
        for migration in self.migrations() {
            // Skip any migration already recorded as applied.
            let already_applied = conn
                .query_row(
                    "SELECT 1 FROM fathom_schema_migrations WHERE version = ?1",
                    [i64::from(migration.version.0)],
                    |row| row.get::<_, i64>(0),
                )
                .optional()?
                .is_some();

            if already_applied {
                continue;
            }

            // Each migration plus its bookkeeping row commits atomically.
            let tx = conn.unchecked_transaction()?;
            // Versions with an ensure_* helper run idempotent, column-probing
            // SQL instead of the registry's raw batch, so a partially-migrated
            // database does not fail on duplicate columns/tables.
            match migration.version {
                SchemaVersion(4) => Self::ensure_vector_regeneration_apply_metadata(&tx)?,
                SchemaVersion(5) => Self::ensure_vector_contract_format_version(&tx)?,
                SchemaVersion(6) => Self::ensure_provenance_metadata(&tx)?,
                SchemaVersion(8) => Self::ensure_operational_mutation_order(&tx)?,
                SchemaVersion(9) => Self::ensure_node_access_metadata(&tx)?,
                SchemaVersion(10) => Self::ensure_operational_filter_contract(&tx)?,
                SchemaVersion(11) => Self::ensure_operational_validation_contract(&tx)?,
                SchemaVersion(12) => Self::ensure_operational_secondary_indexes(&tx)?,
                SchemaVersion(13) => Self::ensure_operational_retention_runs(&tx)?,
                SchemaVersion(14) => Self::ensure_external_content_columns(&tx)?,
                SchemaVersion(15) => Self::ensure_fts_property_schemas(&tx)?,
                _ => tx.execute_batch(migration.sql)?,
            }
            tx.execute(
                "INSERT INTO fathom_schema_migrations (version, description) VALUES (?1, ?2)",
                (i64::from(migration.version.0), migration.description),
            )?;
            tx.commit()?;
            trace_info!(
                version = migration.version.0,
                description = migration.description,
                "schema migration applied"
            );
            applied_versions.push(migration.version);
        }

        // Gather environment facts for the report.
        let sqlite_version = conn.query_row("SELECT sqlite_version()", [], |row| row.get(0))?;
        let vector_profile_count: i64 = conn.query_row(
            "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
            [],
            |row| row.get(0),
        )?;
        Ok(BootstrapReport {
            sqlite_version,
            applied_versions,
            vector_profile_enabled: vector_profile_count > 0,
        })
    }
505
506    fn ensure_vector_regeneration_apply_metadata(conn: &Connection) -> Result<(), SchemaError> {
507        let mut stmt = conn.prepare("PRAGMA table_info(vector_embedding_contracts)")?;
508        let columns = stmt
509            .query_map([], |row| row.get::<_, String>(1))?
510            .collect::<Result<Vec<_>, _>>()?;
511        let has_applied_at = columns.iter().any(|column| column == "applied_at");
512        let has_snapshot_hash = columns.iter().any(|column| column == "snapshot_hash");
513
514        if !has_applied_at {
515            conn.execute(
516                "ALTER TABLE vector_embedding_contracts ADD COLUMN applied_at INTEGER NOT NULL DEFAULT 0",
517                [],
518            )?;
519        }
520        if !has_snapshot_hash {
521            conn.execute(
522                "ALTER TABLE vector_embedding_contracts ADD COLUMN snapshot_hash TEXT NOT NULL DEFAULT ''",
523                [],
524            )?;
525        }
526        conn.execute(
527            r"
528            UPDATE vector_embedding_contracts
529            SET
530                applied_at = CASE
531                    WHEN applied_at = 0 THEN updated_at
532                    ELSE applied_at
533                END,
534                snapshot_hash = CASE
535                    WHEN snapshot_hash = '' THEN 'legacy'
536                    ELSE snapshot_hash
537                END
538            ",
539            [],
540        )?;
541        Ok(())
542    }
543
544    fn ensure_vector_contract_format_version(conn: &Connection) -> Result<(), SchemaError> {
545        let mut stmt = conn.prepare("PRAGMA table_info(vector_embedding_contracts)")?;
546        let columns = stmt
547            .query_map([], |row| row.get::<_, String>(1))?
548            .collect::<Result<Vec<_>, _>>()?;
549        let has_contract_format_version = columns
550            .iter()
551            .any(|column| column == "contract_format_version");
552
553        if !has_contract_format_version {
554            conn.execute(
555                "ALTER TABLE vector_embedding_contracts ADD COLUMN contract_format_version INTEGER NOT NULL DEFAULT 1",
556                [],
557            )?;
558        }
559        conn.execute(
560            r"
561            UPDATE vector_embedding_contracts
562            SET contract_format_version = 1
563            WHERE contract_format_version = 0
564            ",
565            [],
566        )?;
567        Ok(())
568    }
569
570    fn ensure_provenance_metadata(conn: &Connection) -> Result<(), SchemaError> {
571        let mut stmt = conn.prepare("PRAGMA table_info(provenance_events)")?;
572        let columns = stmt
573            .query_map([], |row| row.get::<_, String>(1))?
574            .collect::<Result<Vec<_>, _>>()?;
575        let has_metadata_json = columns.iter().any(|column| column == "metadata_json");
576
577        if !has_metadata_json {
578            conn.execute(
579                "ALTER TABLE provenance_events ADD COLUMN metadata_json TEXT NOT NULL DEFAULT ''",
580                [],
581            )?;
582        }
583        Ok(())
584    }
585
586    fn ensure_operational_mutation_order(conn: &Connection) -> Result<(), SchemaError> {
587        let mut stmt = conn.prepare("PRAGMA table_info(operational_mutations)")?;
588        let columns = stmt
589            .query_map([], |row| row.get::<_, String>(1))?
590            .collect::<Result<Vec<_>, _>>()?;
591        let has_mutation_order = columns.iter().any(|column| column == "mutation_order");
592
593        if !has_mutation_order {
594            conn.execute(
595                "ALTER TABLE operational_mutations ADD COLUMN mutation_order INTEGER NOT NULL DEFAULT 0",
596                [],
597            )?;
598        }
599        conn.execute(
600            r"
601            UPDATE operational_mutations
602            SET mutation_order = rowid
603            WHERE mutation_order = 0
604            ",
605            [],
606        )?;
607        conn.execute(
608            r"
609            CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_order
610                ON operational_mutations(collection_name, record_key, mutation_order DESC)
611            ",
612            [],
613        )?;
614        Ok(())
615    }
616
617    fn ensure_node_access_metadata(conn: &Connection) -> Result<(), SchemaError> {
618        conn.execute_batch(
619            r"
620            CREATE TABLE IF NOT EXISTS node_access_metadata (
621                logical_id TEXT PRIMARY KEY,
622                last_accessed_at INTEGER NOT NULL,
623                updated_at INTEGER NOT NULL
624            );
625
626            CREATE INDEX IF NOT EXISTS idx_node_access_metadata_last_accessed
627                ON node_access_metadata(last_accessed_at DESC);
628            ",
629        )?;
630        Ok(())
631    }
632
633    fn ensure_operational_filter_contract(conn: &Connection) -> Result<(), SchemaError> {
634        let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
635        let columns = stmt
636            .query_map([], |row| row.get::<_, String>(1))?
637            .collect::<Result<Vec<_>, _>>()?;
638        let has_filter_fields_json = columns.iter().any(|column| column == "filter_fields_json");
639
640        if !has_filter_fields_json {
641            conn.execute(
642                "ALTER TABLE operational_collections ADD COLUMN filter_fields_json TEXT NOT NULL DEFAULT '[]'",
643                [],
644            )?;
645        }
646
647        conn.execute_batch(
648            r"
649            CREATE TABLE IF NOT EXISTS operational_filter_values (
650                mutation_id TEXT NOT NULL,
651                collection_name TEXT NOT NULL,
652                field_name TEXT NOT NULL,
653                string_value TEXT,
654                integer_value INTEGER,
655                PRIMARY KEY(mutation_id, field_name),
656                FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE,
657                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
658            );
659
660            CREATE INDEX IF NOT EXISTS idx_operational_filter_values_text
661                ON operational_filter_values(collection_name, field_name, string_value, mutation_id);
662            CREATE INDEX IF NOT EXISTS idx_operational_filter_values_integer
663                ON operational_filter_values(collection_name, field_name, integer_value, mutation_id);
664            ",
665        )?;
666        Ok(())
667    }
668
669    fn ensure_operational_validation_contract(conn: &Connection) -> Result<(), SchemaError> {
670        let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
671        let columns = stmt
672            .query_map([], |row| row.get::<_, String>(1))?
673            .collect::<Result<Vec<_>, _>>()?;
674        let has_validation_json = columns.iter().any(|column| column == "validation_json");
675
676        if !has_validation_json {
677            conn.execute(
678                "ALTER TABLE operational_collections ADD COLUMN validation_json TEXT NOT NULL DEFAULT ''",
679                [],
680            )?;
681        }
682
683        Ok(())
684    }
685
686    fn ensure_operational_secondary_indexes(conn: &Connection) -> Result<(), SchemaError> {
687        let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
688        let columns = stmt
689            .query_map([], |row| row.get::<_, String>(1))?
690            .collect::<Result<Vec<_>, _>>()?;
691        let has_secondary_indexes_json = columns
692            .iter()
693            .any(|column| column == "secondary_indexes_json");
694
695        if !has_secondary_indexes_json {
696            conn.execute(
697                "ALTER TABLE operational_collections ADD COLUMN secondary_indexes_json TEXT NOT NULL DEFAULT '[]'",
698                [],
699            )?;
700        }
701
702        conn.execute_batch(
703            r"
704            CREATE TABLE IF NOT EXISTS operational_secondary_index_entries (
705                collection_name TEXT NOT NULL,
706                index_name TEXT NOT NULL,
707                subject_kind TEXT NOT NULL,
708                mutation_id TEXT NOT NULL DEFAULT '',
709                record_key TEXT NOT NULL DEFAULT '',
710                sort_timestamp INTEGER,
711                slot1_text TEXT,
712                slot1_integer INTEGER,
713                slot2_text TEXT,
714                slot2_integer INTEGER,
715                slot3_text TEXT,
716                slot3_integer INTEGER,
717                PRIMARY KEY(collection_name, index_name, subject_kind, mutation_id, record_key),
718                FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
719                FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE
720            );
721
722            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_text
723                ON operational_secondary_index_entries(
724                    collection_name, index_name, subject_kind, slot1_text, sort_timestamp DESC, mutation_id, record_key
725                );
726            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_integer
727                ON operational_secondary_index_entries(
728                    collection_name, index_name, subject_kind, slot1_integer, sort_timestamp DESC, mutation_id, record_key
729                );
730            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_text
731                ON operational_secondary_index_entries(
732                    collection_name, index_name, subject_kind, slot1_text, slot2_text, slot3_text, sort_timestamp DESC, record_key
733                );
734            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_integer
735                ON operational_secondary_index_entries(
736                    collection_name, index_name, subject_kind, slot1_integer, slot2_integer, slot3_integer, sort_timestamp DESC, record_key
737                );
738            ",
739        )?;
740
741        Ok(())
742    }
743
744    fn ensure_operational_retention_runs(conn: &Connection) -> Result<(), SchemaError> {
745        conn.execute_batch(
746            r"
747            CREATE TABLE IF NOT EXISTS operational_retention_runs (
748                id TEXT PRIMARY KEY,
749                collection_name TEXT NOT NULL,
750                executed_at INTEGER NOT NULL,
751                action_kind TEXT NOT NULL,
752                dry_run INTEGER NOT NULL DEFAULT 0,
753                deleted_mutations INTEGER NOT NULL,
754                rows_remaining INTEGER NOT NULL,
755                metadata_json TEXT NOT NULL DEFAULT '',
756                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
757            );
758
759            CREATE INDEX IF NOT EXISTS idx_operational_retention_runs_collection_time
760                ON operational_retention_runs(collection_name, executed_at DESC);
761            ",
762        )?;
763        Ok(())
764    }
765
766    fn ensure_external_content_columns(conn: &Connection) -> Result<(), SchemaError> {
767        let node_columns = Self::column_names(conn, "nodes")?;
768        if !node_columns.iter().any(|c| c == "content_ref") {
769            conn.execute("ALTER TABLE nodes ADD COLUMN content_ref TEXT", [])?;
770        }
771        conn.execute_batch(
772            r"
773            CREATE INDEX IF NOT EXISTS idx_nodes_content_ref
774                ON nodes(content_ref)
775                WHERE content_ref IS NOT NULL AND superseded_at IS NULL;
776            ",
777        )?;
778
779        let chunk_columns = Self::column_names(conn, "chunks")?;
780        if !chunk_columns.iter().any(|c| c == "content_hash") {
781            conn.execute("ALTER TABLE chunks ADD COLUMN content_hash TEXT", [])?;
782        }
783        Ok(())
784    }
785
786    fn ensure_fts_property_schemas(conn: &Connection) -> Result<(), SchemaError> {
787        conn.execute_batch(
788            r"
789            CREATE TABLE IF NOT EXISTS fts_property_schemas (
790                kind TEXT PRIMARY KEY,
791                property_paths_json TEXT NOT NULL,
792                separator TEXT NOT NULL DEFAULT ' ',
793                format_version INTEGER NOT NULL DEFAULT 1,
794                created_at INTEGER NOT NULL DEFAULT (unixepoch())
795            );
796
797            CREATE VIRTUAL TABLE IF NOT EXISTS fts_node_properties USING fts5(
798                node_logical_id UNINDEXED,
799                kind UNINDEXED,
800                text_content
801            );
802            ",
803        )?;
804        Ok(())
805    }
806
807    fn column_names(conn: &Connection, table: &str) -> Result<Vec<String>, SchemaError> {
808        let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
809        let names = stmt
810            .query_map([], |row| row.get::<_, String>(1))?
811            .collect::<Result<Vec<_>, _>>()?;
812        Ok(names)
813    }
814
815    #[must_use]
816    pub fn current_version(&self) -> SchemaVersion {
817        self.migrations()
818            .last()
819            .map_or(SchemaVersion(0), |migration| migration.version)
820    }
821
    /// The full ordered list of schema migrations, oldest first.
    #[must_use]
    pub fn migrations(&self) -> &'static [Migration] {
        MIGRATIONS
    }
826
    /// Set the recommended `SQLite` connection pragmas for fathomdb.
    ///
    /// Applies WAL journaling with `synchronous = NORMAL`, a 5 s busy
    /// timeout, in-memory temp storage, a ~3 GB mmap window, and a 512 MiB
    /// WAL size cap. Intended for writer connections; read-only connections
    /// should use [`Self::initialize_reader_connection`] instead.
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError`] if any PRAGMA fails to execute.
    pub fn initialize_connection(&self, conn: &Connection) -> Result<(), SchemaError> {
        conn.execute_batch(
            r"
            PRAGMA foreign_keys = ON;
            PRAGMA journal_mode = WAL;
            PRAGMA synchronous = NORMAL;
            PRAGMA busy_timeout = 5000;
            PRAGMA temp_store = MEMORY;
            PRAGMA mmap_size = 3000000000;
            PRAGMA journal_size_limit = 536870912;
            ",
        )?;
        Ok(())
    }
846
    /// Initialize a **read-only** connection with PRAGMAs that are safe for
    /// readers.
    ///
    /// Skips `journal_mode` (requires write; the writer already set WAL),
    /// `synchronous` (irrelevant for readers), and `journal_size_limit`
    /// (requires write). The remaining pragmas mirror
    /// [`Self::initialize_connection`].
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError`] if any PRAGMA fails to execute.
    pub fn initialize_reader_connection(&self, conn: &Connection) -> Result<(), SchemaError> {
        conn.execute_batch(
            r"
            PRAGMA foreign_keys = ON;
            PRAGMA busy_timeout = 5000;
            PRAGMA temp_store = MEMORY;
            PRAGMA mmap_size = 3000000000;
            ",
        )?;
        Ok(())
    }
868
869    /// Ensure the sqlite-vec vector extension profile is registered and the
870    /// virtual vec table exists.
871    ///
872    /// When the `sqlite-vec` feature is enabled this creates the virtual table
873    /// and records the profile in `vector_profiles` (with `enabled = 1`).
874    /// When the feature is absent the call always returns
875    /// [`SchemaError::MissingCapability`].
876    ///
877    /// # Errors
878    ///
879    /// Returns [`SchemaError`] if the DDL fails or the feature is absent.
880    #[cfg(feature = "sqlite-vec")]
881    pub fn ensure_vector_profile(
882        &self,
883        conn: &Connection,
884        profile: &str,
885        table_name: &str,
886        dimension: usize,
887    ) -> Result<(), SchemaError> {
888        conn.execute_batch(&format!(
889            "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING vec0(\
890                chunk_id TEXT PRIMARY KEY,\
891                embedding float[{dimension}]\
892            )"
893        ))?;
894        conn.execute(
895            "INSERT OR REPLACE INTO vector_profiles \
896             (profile, table_name, dimension, enabled) VALUES (?1, ?2, ?3, 1)",
897            rusqlite::params![profile, table_name, dimension as i64],
898        )?;
899        Ok(())
900    }
901
    /// Stub compiled when the `sqlite-vec` feature is disabled; signals the
    /// missing capability instead of touching the database.
    ///
    /// # Errors
    ///
    /// Always returns [`SchemaError::MissingCapability`] when the `sqlite-vec`
    /// feature is not compiled in.
    #[cfg(not(feature = "sqlite-vec"))]
    pub fn ensure_vector_profile(
        &self,
        _conn: &Connection,
        _profile: &str,
        _table_name: &str,
        _dimension: usize,
    ) -> Result<(), SchemaError> {
        Err(SchemaError::MissingCapability("sqlite-vec"))
    }
916
    /// Create the internal migration-tracking table if it does not exist.
    ///
    /// Each applied migration is recorded with its version, description, and
    /// an `applied_at` timestamp defaulting to the current unix epoch.
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError`] if the DDL fails to execute.
    fn ensure_metadata_tables(conn: &Connection) -> Result<(), SchemaError> {
        conn.execute_batch(
            r"
            CREATE TABLE IF NOT EXISTS fathom_schema_migrations (
                version INTEGER PRIMARY KEY,
                description TEXT NOT NULL,
                applied_at INTEGER NOT NULL DEFAULT (unixepoch())
            );
            ",
        )?;
        Ok(())
    }
934}
935
// Unit tests drive SchemaManager against throwaway in-memory SQLite
// databases; no files or external services are touched.
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use rusqlite::Connection;

    use super::SchemaManager;

    // Fresh database: every known migration applies and core tables appear.
    #[test]
    fn bootstrap_applies_initial_schema() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();

        let report = manager.bootstrap(&conn).expect("bootstrap report");

        // One applied version per known migration on an empty database.
        assert_eq!(
            report.applied_versions.len(),
            manager.current_version().0 as usize
        );
        assert!(report.sqlite_version.starts_with('3'));
        let table_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'nodes'",
                [],
                |row| row.get(0),
            )
            .expect("nodes table exists");
        assert_eq!(table_count, 1);
    }

    // --- Item 2: vector profile tests ---

    // NOTE(review): despite the name, this test is not cfg-gated on the
    // sqlite-vec feature; it relies on bootstrap never enabling a profile by
    // itself. Consider renaming to reflect that — TODO confirm intent.
    #[test]
    fn vector_profile_not_enabled_without_feature() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        let report = manager.bootstrap(&conn).expect("bootstrap");
        assert!(
            !report.vector_profile_enabled,
            "vector_profile_enabled must be false on a fresh bootstrap"
        );
    }

    #[test]
    fn vector_profile_skipped_when_dimension_absent() {
        // ensure_vector_profile is never called → enabled stays false
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager.bootstrap(&conn).expect("bootstrap");

        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            count, 0,
            "no enabled profile without calling ensure_vector_profile"
        );
    }

    #[test]
    fn bootstrap_report_reflects_actual_vector_state() {
        // After a fresh bootstrap with no vector profile, the report reflects reality.
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        let report = manager.bootstrap(&conn).expect("bootstrap");

        let db_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            report.vector_profile_enabled,
            db_count > 0,
            "BootstrapReport.vector_profile_enabled must match actual DB state"
        );
    }

    // Seeds a hand-built pre-migration ("legacy") schema, then verifies that
    // bootstrap backfills the newer columns without disturbing the seeded row.
    #[test]
    fn bootstrap_backfills_vector_contract_format_and_provenance_metadata_columns() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        conn.execute_batch(
            r#"
            CREATE TABLE provenance_events (
                id         TEXT PRIMARY KEY,
                event_type TEXT NOT NULL,
                subject    TEXT NOT NULL,
                source_ref TEXT,
                created_at INTEGER NOT NULL DEFAULT (unixepoch())
            );
            CREATE TABLE vector_embedding_contracts (
                profile TEXT PRIMARY KEY,
                table_name TEXT NOT NULL,
                model_identity TEXT NOT NULL,
                model_version TEXT NOT NULL,
                dimension INTEGER NOT NULL,
                normalization_policy TEXT NOT NULL,
                chunking_policy TEXT NOT NULL,
                preprocessing_policy TEXT NOT NULL,
                generator_command_json TEXT NOT NULL,
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                applied_at INTEGER NOT NULL DEFAULT 0,
                snapshot_hash TEXT NOT NULL DEFAULT ''
            );
            INSERT INTO vector_embedding_contracts (
                profile,
                table_name,
                model_identity,
                model_version,
                dimension,
                normalization_policy,
                chunking_policy,
                preprocessing_policy,
                generator_command_json,
                updated_at,
                applied_at,
                snapshot_hash
            ) VALUES (
                'default',
                'vec_nodes_active',
                'legacy-model',
                '0.9.0',
                4,
                'l2',
                'per_chunk',
                'trim',
                '["/bin/echo"]',
                100,
                100,
                'legacy'
            );
            "#,
        )
        .expect("seed legacy schema");
        let manager = SchemaManager::new();

        let report = manager.bootstrap(&conn).expect("bootstrap");

        assert!(
            report.applied_versions.iter().any(|version| version.0 >= 5),
            "bootstrap should apply hardening migrations"
        );
        let format_version: i64 = conn
            .query_row(
                "SELECT contract_format_version FROM vector_embedding_contracts WHERE profile = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("contract_format_version");
        assert_eq!(format_version, 1);
        let metadata_column_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM pragma_table_info('provenance_events') WHERE name = 'metadata_json'",
                [],
                |row| row.get(0),
            )
            .expect("metadata_json column count");
        assert_eq!(metadata_column_count, 1);
    }

    #[test]
    fn bootstrap_creates_operational_store_tables() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();

        manager.bootstrap(&conn).expect("bootstrap");

        for table in [
            "operational_collections",
            "operational_mutations",
            "operational_current",
        ] {
            let count: i64 = conn
                .query_row(
                    "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = ?1",
                    [table],
                    |row| row.get(0),
                )
                .expect("table existence");
            assert_eq!(count, 1, "{table} should exist after bootstrap");
        }
        let mutation_order_columns: i64 = conn
            .query_row(
                "SELECT count(*) FROM pragma_table_info('operational_mutations') WHERE name = 'mutation_order'",
                [],
                |row| row.get(0),
            )
            .expect("mutation_order column exists");
        assert_eq!(mutation_order_columns, 1);
    }

    #[test]
    fn bootstrap_is_idempotent_with_operational_store_tables() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();

        manager.bootstrap(&conn).expect("first bootstrap");
        let report = manager.bootstrap(&conn).expect("second bootstrap");

        assert!(
            report.applied_versions.is_empty(),
            "second bootstrap should apply no new migrations"
        );
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'operational_collections'",
                [],
                |row| row.get(0),
        )
        .expect("operational_collections table exists");
        assert_eq!(count, 1);
    }

    // Simulates a database whose operational tables were restored by hand
    // (no fathom_schema_migrations history); bootstrap must still converge.
    #[test]
    fn bootstrap_is_idempotent_for_recovered_operational_tables_without_migration_history() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        conn.execute_batch(
            r#"
            CREATE TABLE operational_collections (
                name TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                schema_json TEXT NOT NULL,
                retention_json TEXT NOT NULL,
                format_version INTEGER NOT NULL DEFAULT 1,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                disabled_at INTEGER
            );

            CREATE TABLE operational_mutations (
                id TEXT PRIMARY KEY,
                collection_name TEXT NOT NULL,
                record_key TEXT NOT NULL,
                op_kind TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                source_ref TEXT,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                mutation_order INTEGER NOT NULL DEFAULT 0,
                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
            );

            CREATE TABLE operational_current (
                collection_name TEXT NOT NULL,
                record_key TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                updated_at INTEGER NOT NULL,
                last_mutation_id TEXT NOT NULL,
                PRIMARY KEY(collection_name, record_key),
                FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
                FOREIGN KEY(last_mutation_id) REFERENCES operational_mutations(id)
            );

            INSERT INTO operational_collections (name, kind, schema_json, retention_json)
            VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}');
            INSERT INTO operational_mutations (
                id, collection_name, record_key, op_kind, payload_json, created_at, mutation_order
            ) VALUES (
                'mut-1', 'audit_log', 'entry-1', 'put', '{"ok":true}', 10, 0
            );
            "#,
        )
        .expect("seed recovered operational tables");

        let manager = SchemaManager::new();
        let report = manager
            .bootstrap(&conn)
            .expect("bootstrap recovered schema");

        assert!(
            report.applied_versions.iter().any(|version| version.0 == 8),
            "bootstrap should record operational mutation ordering hardening"
        );
        let mutation_order: i64 = conn
            .query_row(
                "SELECT mutation_order FROM operational_mutations WHERE id = 'mut-1'",
                [],
                |row| row.get(0),
            )
            .expect("mutation_order");
        assert_ne!(
            mutation_order, 0,
            "bootstrap should backfill recovered operational rows"
        );
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type = 'index' AND name = 'idx_operational_mutations_collection_key_order'",
                [],
                |row| row.get(0),
            )
            .expect("ordering index exists");
        assert_eq!(count, 1);
    }

    // Recovered schema without the filter-contract columns: migrations 10/11
    // must add them (with their defaults) and create the filter-values table.
    #[test]
    fn bootstrap_adds_operational_filter_contract_and_index_table() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        conn.execute_batch(
            r#"
            CREATE TABLE operational_collections (
                name TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                schema_json TEXT NOT NULL,
                retention_json TEXT NOT NULL,
                format_version INTEGER NOT NULL DEFAULT 1,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                disabled_at INTEGER
            );

            CREATE TABLE operational_mutations (
                id TEXT PRIMARY KEY,
                collection_name TEXT NOT NULL,
                record_key TEXT NOT NULL,
                op_kind TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                source_ref TEXT,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                mutation_order INTEGER NOT NULL DEFAULT 1,
                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
            );

            INSERT INTO operational_collections (name, kind, schema_json, retention_json)
            VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}');
            "#,
        )
        .expect("seed recovered operational schema");

        let manager = SchemaManager::new();
        let report = manager
            .bootstrap(&conn)
            .expect("bootstrap recovered schema");

        assert!(
            report
                .applied_versions
                .iter()
                .any(|version| version.0 == 10),
            "bootstrap should record operational filtered read migration"
        );
        assert!(
            report
                .applied_versions
                .iter()
                .any(|version| version.0 == 11),
            "bootstrap should record operational validation migration"
        );
        let filter_fields_json: String = conn
            .query_row(
                "SELECT filter_fields_json FROM operational_collections WHERE name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("filter_fields_json added");
        assert_eq!(filter_fields_json, "[]");
        let validation_json: String = conn
            .query_row(
                "SELECT validation_json FROM operational_collections WHERE name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("validation_json added");
        assert_eq!(validation_json, "");
        let table_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'operational_filter_values'",
                [],
                |row| row.get(0),
        )
        .expect("filter table exists");
        assert_eq!(table_count, 1);
    }

    // Dropping the migration history and re-running must re-record versions
    // without failing on the already-present filter-contract columns.
    #[test]
    fn bootstrap_reapplies_migration_history_without_readding_filter_contract_columns() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager.bootstrap(&conn).expect("initial bootstrap");

        conn.execute("DROP TABLE fathom_schema_migrations", [])
            .expect("drop migration history");
        SchemaManager::ensure_metadata_tables(&conn).expect("recreate migration metadata");

        let report = manager
            .bootstrap(&conn)
            .expect("rebootstrap existing schema");

        assert!(
            report
                .applied_versions
                .iter()
                .any(|version| version.0 == 10),
            "rebootstrap should re-record migration 10"
        );
        assert!(
            report
                .applied_versions
                .iter()
                .any(|version| version.0 == 11),
            "rebootstrap should re-record migration 11"
        );
        // Falls back to "[]" when no collection rows exist to query.
        let filter_fields_json: String = conn
            .query_row(
                "SELECT filter_fields_json FROM operational_collections LIMIT 1",
                [],
                |row| row.get(0),
            )
            .unwrap_or_else(|_| "[]".to_string());
        assert_eq!(filter_fields_json, "[]");
        let validation_json: String = conn
            .query_row(
                "SELECT validation_json FROM operational_collections LIMIT 1",
                [],
                |row| row.get(0),
            )
            .unwrap_or_default();
        assert_eq!(validation_json, "");
    }

    // A recorded version newer than the binary knows about must be rejected.
    #[test]
    fn downgrade_detected_returns_version_mismatch() {
        use crate::SchemaError;

        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager.bootstrap(&conn).expect("initial bootstrap");

        conn.execute(
            "INSERT INTO fathom_schema_migrations (version, description) VALUES (?1, ?2)",
            (999_i64, "future migration"),
        )
        .expect("insert future version");

        let err = manager
            .bootstrap(&conn)
            .expect_err("should fail on downgrade");
        assert!(
            matches!(
                err,
                SchemaError::VersionMismatch {
                    database_version: 999,
                    ..
                }
            ),
            "expected VersionMismatch with database_version 999, got: {err}"
        );
    }

    #[test]
    fn journal_size_limit_is_set() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager
            .initialize_connection(&conn)
            .expect("initialize_connection");

        // 512 MiB, matching the value in initialize_connection.
        let limit: i64 = conn
            .query_row("PRAGMA journal_size_limit", [], |row| row.get(0))
            .expect("journal_size_limit pragma");
        assert_eq!(limit, 536_870_912);
    }

    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn vector_profile_created_when_feature_enabled() {
        // Register the sqlite-vec extension globally so the in-memory
        // connection can use the vec0 module.
        // SAFETY: sqlite3_vec_init is assumed ABI-compatible with the init
        // hook sqlite3_auto_extension expects (the standard sqlite-vec
        // registration pattern); registration happens before the connection
        // that uses vec0 is opened.
        unsafe {
            rusqlite::ffi::sqlite3_auto_extension(Some(std::mem::transmute(
                sqlite_vec::sqlite3_vec_init as *const (),
            )));
        }
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager.bootstrap(&conn).expect("bootstrap");

        manager
            .ensure_vector_profile(&conn, "default", "vec_nodes_active", 128)
            .expect("ensure_vector_profile");

        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            count, 1,
            "vector profile must be enabled after ensure_vector_profile"
        );

        // Verify the virtual table exists by querying it
        let _: i64 = conn
            .query_row("SELECT count(*) FROM vec_nodes_active", [], |row| {
                row.get(0)
            })
            .expect("vec_nodes_active table must exist after ensure_vector_profile");
    }
}