//! Schema bootstrap and migrations (fathomdb_schema/bootstrap.rs).

1use rusqlite::{Connection, OptionalExtension};
2
3use crate::{Migration, SchemaError, SchemaVersion};
4
/// Ordered, append-only migration list. `SchemaManager::bootstrap` applies
/// each entry at most once and records applied versions in
/// `fathom_schema_migrations`, so entries must never be edited or reordered —
/// only appended.
static MIGRATIONS: &[Migration] = &[
    // v1 — canonical graph tables (nodes/edges with soft-supersede semantics),
    // chunk storage, FTS5 search surface, vector profile registry, and the
    // runs/steps/actions runtime hierarchy.
    Migration::new(
        SchemaVersion(1),
        "initial canonical schema and runtime tables",
        r"
                CREATE TABLE IF NOT EXISTS nodes (
                    row_id TEXT PRIMARY KEY,
                    logical_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    properties BLOB NOT NULL,
                    created_at INTEGER NOT NULL,
                    superseded_at INTEGER,
                    source_ref TEXT,
                    confidence REAL
                );

                CREATE UNIQUE INDEX IF NOT EXISTS idx_nodes_active_logical_id
                    ON nodes(logical_id)
                    WHERE superseded_at IS NULL;
                CREATE INDEX IF NOT EXISTS idx_nodes_kind_active
                    ON nodes(kind, superseded_at);
                CREATE INDEX IF NOT EXISTS idx_nodes_source_ref
                    ON nodes(source_ref);

                CREATE TABLE IF NOT EXISTS edges (
                    row_id TEXT PRIMARY KEY,
                    logical_id TEXT NOT NULL,
                    source_logical_id TEXT NOT NULL,
                    target_logical_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    properties BLOB NOT NULL,
                    created_at INTEGER NOT NULL,
                    superseded_at INTEGER,
                    source_ref TEXT,
                    confidence REAL
                );

                CREATE UNIQUE INDEX IF NOT EXISTS idx_edges_active_logical_id
                    ON edges(logical_id)
                    WHERE superseded_at IS NULL;
                CREATE INDEX IF NOT EXISTS idx_edges_source_active
                    ON edges(source_logical_id, kind, superseded_at);
                CREATE INDEX IF NOT EXISTS idx_edges_target_active
                    ON edges(target_logical_id, kind, superseded_at);
                CREATE INDEX IF NOT EXISTS idx_edges_source_ref
                    ON edges(source_ref);

                CREATE TABLE IF NOT EXISTS chunks (
                    id TEXT PRIMARY KEY,
                    node_logical_id TEXT NOT NULL,
                    text_content TEXT NOT NULL,
                    byte_start INTEGER,
                    byte_end INTEGER,
                    created_at INTEGER NOT NULL
                );

                CREATE INDEX IF NOT EXISTS idx_chunks_node_logical_id
                    ON chunks(node_logical_id);

                CREATE VIRTUAL TABLE IF NOT EXISTS fts_nodes USING fts5(
                    chunk_id UNINDEXED,
                    node_logical_id UNINDEXED,
                    kind UNINDEXED,
                    text_content
                );

                CREATE TABLE IF NOT EXISTS vector_profiles (
                    profile TEXT PRIMARY KEY,
                    table_name TEXT NOT NULL,
                    dimension INTEGER NOT NULL,
                    enabled INTEGER NOT NULL DEFAULT 0
                );

                CREATE TABLE IF NOT EXISTS runs (
                    id TEXT PRIMARY KEY,
                    kind TEXT NOT NULL,
                    status TEXT NOT NULL,
                    properties BLOB NOT NULL,
                    created_at INTEGER NOT NULL,
                    completed_at INTEGER,
                    superseded_at INTEGER,
                    source_ref TEXT
                );

                CREATE TABLE IF NOT EXISTS steps (
                    id TEXT PRIMARY KEY,
                    run_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    status TEXT NOT NULL,
                    properties BLOB NOT NULL,
                    created_at INTEGER NOT NULL,
                    completed_at INTEGER,
                    superseded_at INTEGER,
                    source_ref TEXT,
                    FOREIGN KEY(run_id) REFERENCES runs(id)
                );

                CREATE TABLE IF NOT EXISTS actions (
                    id TEXT PRIMARY KEY,
                    step_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    status TEXT NOT NULL,
                    properties BLOB NOT NULL,
                    created_at INTEGER NOT NULL,
                    completed_at INTEGER,
                    superseded_at INTEGER,
                    source_ref TEXT,
                    FOREIGN KEY(step_id) REFERENCES steps(id)
                );

                CREATE INDEX IF NOT EXISTS idx_runs_source_ref
                    ON runs(source_ref);
                CREATE INDEX IF NOT EXISTS idx_steps_source_ref
                    ON steps(source_ref);
                CREATE INDEX IF NOT EXISTS idx_actions_source_ref
                    ON actions(source_ref);
                ",
    ),
    // v2 — audit trail of provenance events keyed by subject.
    Migration::new(
        SchemaVersion(2),
        "durable audit trail: provenance_events table",
        r"
                CREATE TABLE IF NOT EXISTS provenance_events (
                    id         TEXT PRIMARY KEY,
                    event_type TEXT NOT NULL,
                    subject    TEXT NOT NULL,
                    source_ref TEXT,
                    created_at INTEGER NOT NULL DEFAULT (unixepoch())
                );
                CREATE INDEX IF NOT EXISTS idx_provenance_events_subject
                    ON provenance_events (subject, event_type);
                ",
    ),
    // v3 — per-profile embedding contracts (model identity, chunking, etc.).
    Migration::new(
        SchemaVersion(3),
        "vector regeneration contracts",
        r"
                CREATE TABLE IF NOT EXISTS vector_embedding_contracts (
                    profile TEXT PRIMARY KEY,
                    table_name TEXT NOT NULL,
                    model_identity TEXT NOT NULL,
                    model_version TEXT NOT NULL,
                    dimension INTEGER NOT NULL,
                    normalization_policy TEXT NOT NULL,
                    chunking_policy TEXT NOT NULL,
                    preprocessing_policy TEXT NOT NULL,
                    generator_command_json TEXT NOT NULL,
                    updated_at INTEGER NOT NULL DEFAULT (unixepoch())
                );
                ",
    ),
    // v4 — ALTER-based migration; bootstrap routes this version through
    // `ensure_vector_regeneration_apply_metadata` rather than this SQL.
    Migration::new(
        SchemaVersion(4),
        "vector regeneration apply metadata",
        r"
                ALTER TABLE vector_embedding_contracts
                    ADD COLUMN applied_at INTEGER NOT NULL DEFAULT 0;
                ALTER TABLE vector_embedding_contracts
                    ADD COLUMN snapshot_hash TEXT NOT NULL DEFAULT '';
                UPDATE vector_embedding_contracts
                SET
                    applied_at = CASE
                        WHEN applied_at = 0 THEN updated_at
                        ELSE applied_at
                    END,
                    snapshot_hash = CASE
                        WHEN snapshot_hash = '' THEN 'legacy'
                        ELSE snapshot_hash
                    END;
                ",
    ),
    // v5 — ALTER-based; applied via `ensure_vector_contract_format_version`.
    Migration::new(
        SchemaVersion(5),
        "vector regeneration contract format version",
        r"
                ALTER TABLE vector_embedding_contracts
                    ADD COLUMN contract_format_version INTEGER NOT NULL DEFAULT 1;
                UPDATE vector_embedding_contracts
                SET contract_format_version = 1
                WHERE contract_format_version = 0;
                ",
    ),
    // v6 — ALTER-based; applied via `ensure_provenance_metadata`.
    Migration::new(
        SchemaVersion(6),
        "provenance metadata payloads",
        r"
                ALTER TABLE provenance_events
                    ADD COLUMN metadata_json TEXT NOT NULL DEFAULT '';
                ",
    ),
    // v7 — operational store: collection registry, append-only mutation log,
    // and the derived current-state projection.
    Migration::new(
        SchemaVersion(7),
        "operational store canonical and derived tables",
        r"
                CREATE TABLE IF NOT EXISTS operational_collections (
                    name TEXT PRIMARY KEY,
                    kind TEXT NOT NULL,
                    schema_json TEXT NOT NULL,
                    retention_json TEXT NOT NULL,
                    format_version INTEGER NOT NULL DEFAULT 1,
                    created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                    disabled_at INTEGER
                );

                CREATE INDEX IF NOT EXISTS idx_operational_collections_kind
                    ON operational_collections(kind, disabled_at);

                CREATE TABLE IF NOT EXISTS operational_mutations (
                    id TEXT PRIMARY KEY,
                    collection_name TEXT NOT NULL,
                    record_key TEXT NOT NULL,
                    op_kind TEXT NOT NULL,
                    payload_json TEXT NOT NULL,
                    source_ref TEXT,
                    created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
                );

                CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_created
                    ON operational_mutations(collection_name, record_key, created_at DESC, id DESC);
                CREATE INDEX IF NOT EXISTS idx_operational_mutations_source_ref
                    ON operational_mutations(source_ref);

                CREATE TABLE IF NOT EXISTS operational_current (
                    collection_name TEXT NOT NULL,
                    record_key TEXT NOT NULL,
                    payload_json TEXT NOT NULL,
                    updated_at INTEGER NOT NULL,
                    last_mutation_id TEXT NOT NULL,
                    PRIMARY KEY(collection_name, record_key),
                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
                    FOREIGN KEY(last_mutation_id) REFERENCES operational_mutations(id)
                );

                CREATE INDEX IF NOT EXISTS idx_operational_current_collection_updated
                    ON operational_current(collection_name, updated_at DESC);
                ",
    ),
    // v8 — ALTER-based; applied via `ensure_operational_mutation_order`.
    Migration::new(
        SchemaVersion(8),
        "operational mutation ordering hardening",
        r"
                ALTER TABLE operational_mutations
                    ADD COLUMN mutation_order INTEGER NOT NULL DEFAULT 0;
                UPDATE operational_mutations
                SET mutation_order = rowid
                WHERE mutation_order = 0;
                CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_order
                    ON operational_mutations(collection_name, record_key, mutation_order DESC);
                ",
    ),
    // v9 — applied via `ensure_node_access_metadata`.
    Migration::new(
        SchemaVersion(9),
        "node last_accessed metadata",
        r"
                CREATE TABLE IF NOT EXISTS node_access_metadata (
                    logical_id TEXT PRIMARY KEY,
                    last_accessed_at INTEGER NOT NULL,
                    updated_at INTEGER NOT NULL
                );

                CREATE INDEX IF NOT EXISTS idx_node_access_metadata_last_accessed
                    ON node_access_metadata(last_accessed_at DESC);
                ",
    ),
    // v10 — ALTER-based; applied via `ensure_operational_filter_contract`.
    Migration::new(
        SchemaVersion(10),
        "operational filtered read contracts and extracted values",
        r"
                ALTER TABLE operational_collections
                    ADD COLUMN filter_fields_json TEXT NOT NULL DEFAULT '[]';

                CREATE TABLE IF NOT EXISTS operational_filter_values (
                    mutation_id TEXT NOT NULL,
                    collection_name TEXT NOT NULL,
                    field_name TEXT NOT NULL,
                    string_value TEXT,
                    integer_value INTEGER,
                    PRIMARY KEY(mutation_id, field_name),
                    FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE,
                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
                );

                CREATE INDEX IF NOT EXISTS idx_operational_filter_values_text
                    ON operational_filter_values(collection_name, field_name, string_value, mutation_id);
                CREATE INDEX IF NOT EXISTS idx_operational_filter_values_integer
                    ON operational_filter_values(collection_name, field_name, integer_value, mutation_id);
                ",
    ),
    // v11 — ALTER-based; applied via `ensure_operational_validation_contract`.
    Migration::new(
        SchemaVersion(11),
        "operational payload validation contracts",
        r"
                ALTER TABLE operational_collections
                    ADD COLUMN validation_json TEXT NOT NULL DEFAULT '';
                ",
    ),
    // v12 — ALTER-based; applied via `ensure_operational_secondary_indexes`.
    Migration::new(
        SchemaVersion(12),
        "operational secondary indexes",
        r"
                ALTER TABLE operational_collections
                    ADD COLUMN secondary_indexes_json TEXT NOT NULL DEFAULT '[]';

                CREATE TABLE IF NOT EXISTS operational_secondary_index_entries (
                    collection_name TEXT NOT NULL,
                    index_name TEXT NOT NULL,
                    subject_kind TEXT NOT NULL,
                    mutation_id TEXT NOT NULL DEFAULT '',
                    record_key TEXT NOT NULL DEFAULT '',
                    sort_timestamp INTEGER,
                    slot1_text TEXT,
                    slot1_integer INTEGER,
                    slot2_text TEXT,
                    slot2_integer INTEGER,
                    slot3_text TEXT,
                    slot3_integer INTEGER,
                    PRIMARY KEY(collection_name, index_name, subject_kind, mutation_id, record_key),
                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
                    FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE
                );

                CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_text
                    ON operational_secondary_index_entries(
                        collection_name, index_name, subject_kind, slot1_text, sort_timestamp DESC, mutation_id, record_key
                    );
                CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_integer
                    ON operational_secondary_index_entries(
                        collection_name, index_name, subject_kind, slot1_integer, sort_timestamp DESC, mutation_id, record_key
                    );
                CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_text
                    ON operational_secondary_index_entries(
                        collection_name, index_name, subject_kind, slot1_text, slot2_text, slot3_text, sort_timestamp DESC, record_key
                    );
                CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_integer
                    ON operational_secondary_index_entries(
                        collection_name, index_name, subject_kind, slot1_integer, slot2_integer, slot3_integer, sort_timestamp DESC, record_key
                );
                ",
    ),
    // v13 — applied via `ensure_operational_retention_runs`.
    Migration::new(
        SchemaVersion(13),
        "operational retention run metadata",
        r"
                CREATE TABLE IF NOT EXISTS operational_retention_runs (
                    id TEXT PRIMARY KEY,
                    collection_name TEXT NOT NULL,
                    executed_at INTEGER NOT NULL,
                    action_kind TEXT NOT NULL,
                    dry_run INTEGER NOT NULL DEFAULT 0,
                    deleted_mutations INTEGER NOT NULL,
                    rows_remaining INTEGER NOT NULL,
                    metadata_json TEXT NOT NULL DEFAULT '',
                    FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
                );

                CREATE INDEX IF NOT EXISTS idx_operational_retention_runs_collection_time
                    ON operational_retention_runs(collection_name, executed_at DESC);
                ",
    ),
];
366
/// Summary returned by [`SchemaManager::bootstrap`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BootstrapReport {
    // Result of `SELECT sqlite_version()` on the bootstrapped connection.
    pub sqlite_version: String,
    // Versions applied during this bootstrap run; empty when already up to date.
    pub applied_versions: Vec<SchemaVersion>,
    // True when at least one `vector_profiles` row has `enabled = 1`.
    pub vector_profile_enabled: bool,
}
373
/// Stateless entry point for bootstrapping and migrating the fathomdb schema.
#[derive(Clone, Debug, Default)]
pub struct SchemaManager;
376
377impl SchemaManager {
378    #[must_use]
379    pub fn new() -> Self {
380        Self
381    }
382
    /// Bootstrap the database schema, applying any pending migrations.
    ///
    /// Sets connection pragmas, ensures the migration-metadata tables exist,
    /// refuses to run against a database newer than this engine, then applies
    /// each unapplied migration in order, one transaction per migration.
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError`] if any migration or metadata-table SQL fails,
    /// or [`SchemaError::VersionMismatch`] if the database has been migrated
    /// to a version newer than this engine supports.
    pub fn bootstrap(&self, conn: &Connection) -> Result<BootstrapReport, SchemaError> {
        self.initialize_connection(conn)?;
        Self::ensure_metadata_tables(conn)?;

        // Downgrade protection: compare the highest recorded migration
        // version against the engine's newest known migration.
        let max_applied: u32 = conn.query_row(
            "SELECT COALESCE(MAX(version), 0) FROM fathom_schema_migrations",
            [],
            |row| row.get(0),
        )?;
        let engine_version = self.current_version().0;
        trace_info!(
            current_version = max_applied,
            engine_version,
            "schema bootstrap: version check"
        );
        if max_applied > engine_version {
            trace_error!(
                database_version = max_applied,
                engine_version,
                "schema version mismatch: database is newer than engine"
            );
            return Err(SchemaError::VersionMismatch {
                database_version: max_applied,
                engine_version,
            });
        }

        let mut applied_versions = Vec::new();
        for migration in self.migrations() {
            // Skip anything already recorded in fathom_schema_migrations.
            let already_applied = conn
                .query_row(
                    "SELECT 1 FROM fathom_schema_migrations WHERE version = ?1",
                    [i64::from(migration.version.0)],
                    |row| row.get::<_, i64>(0),
                )
                .optional()?
                .is_some();

            if already_applied {
                continue;
            }

            // One transaction per migration: a failure rolls back only the
            // current migration, leaving earlier committed ones intact.
            let tx = conn.unchecked_transaction()?;
            // Versions listed below are applied through PRAGMA-guarded
            // `ensure_*` functions (which check `table_info` before each
            // ALTER) instead of `migration.sql`, so they stay safe against
            // partially-migrated databases; all others run their SQL batch.
            match migration.version {
                SchemaVersion(4) => Self::ensure_vector_regeneration_apply_metadata(&tx)?,
                SchemaVersion(5) => Self::ensure_vector_contract_format_version(&tx)?,
                SchemaVersion(6) => Self::ensure_provenance_metadata(&tx)?,
                SchemaVersion(8) => Self::ensure_operational_mutation_order(&tx)?,
                SchemaVersion(9) => Self::ensure_node_access_metadata(&tx)?,
                SchemaVersion(10) => Self::ensure_operational_filter_contract(&tx)?,
                SchemaVersion(11) => Self::ensure_operational_validation_contract(&tx)?,
                SchemaVersion(12) => Self::ensure_operational_secondary_indexes(&tx)?,
                SchemaVersion(13) => Self::ensure_operational_retention_runs(&tx)?,
                _ => tx.execute_batch(migration.sql)?,
            }
            // Record the migration inside the same transaction as its DDL.
            tx.execute(
                "INSERT INTO fathom_schema_migrations (version, description) VALUES (?1, ?2)",
                (i64::from(migration.version.0), migration.description),
            )?;
            tx.commit()?;
            trace_info!(
                version = migration.version.0,
                description = migration.description,
                "schema migration applied"
            );
            applied_versions.push(migration.version);
        }

        // Gather report facts after all migrations have been applied.
        let sqlite_version = conn.query_row("SELECT sqlite_version()", [], |row| row.get(0))?;
        let vector_profile_count: i64 = conn.query_row(
            "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
            [],
            |row| row.get(0),
        )?;
        Ok(BootstrapReport {
            sqlite_version,
            applied_versions,
            vector_profile_enabled: vector_profile_count > 0,
        })
    }
471
472    fn ensure_vector_regeneration_apply_metadata(conn: &Connection) -> Result<(), SchemaError> {
473        let mut stmt = conn.prepare("PRAGMA table_info(vector_embedding_contracts)")?;
474        let columns = stmt
475            .query_map([], |row| row.get::<_, String>(1))?
476            .collect::<Result<Vec<_>, _>>()?;
477        let has_applied_at = columns.iter().any(|column| column == "applied_at");
478        let has_snapshot_hash = columns.iter().any(|column| column == "snapshot_hash");
479
480        if !has_applied_at {
481            conn.execute(
482                "ALTER TABLE vector_embedding_contracts ADD COLUMN applied_at INTEGER NOT NULL DEFAULT 0",
483                [],
484            )?;
485        }
486        if !has_snapshot_hash {
487            conn.execute(
488                "ALTER TABLE vector_embedding_contracts ADD COLUMN snapshot_hash TEXT NOT NULL DEFAULT ''",
489                [],
490            )?;
491        }
492        conn.execute(
493            r"
494            UPDATE vector_embedding_contracts
495            SET
496                applied_at = CASE
497                    WHEN applied_at = 0 THEN updated_at
498                    ELSE applied_at
499                END,
500                snapshot_hash = CASE
501                    WHEN snapshot_hash = '' THEN 'legacy'
502                    ELSE snapshot_hash
503                END
504            ",
505            [],
506        )?;
507        Ok(())
508    }
509
510    fn ensure_vector_contract_format_version(conn: &Connection) -> Result<(), SchemaError> {
511        let mut stmt = conn.prepare("PRAGMA table_info(vector_embedding_contracts)")?;
512        let columns = stmt
513            .query_map([], |row| row.get::<_, String>(1))?
514            .collect::<Result<Vec<_>, _>>()?;
515        let has_contract_format_version = columns
516            .iter()
517            .any(|column| column == "contract_format_version");
518
519        if !has_contract_format_version {
520            conn.execute(
521                "ALTER TABLE vector_embedding_contracts ADD COLUMN contract_format_version INTEGER NOT NULL DEFAULT 1",
522                [],
523            )?;
524        }
525        conn.execute(
526            r"
527            UPDATE vector_embedding_contracts
528            SET contract_format_version = 1
529            WHERE contract_format_version = 0
530            ",
531            [],
532        )?;
533        Ok(())
534    }
535
536    fn ensure_provenance_metadata(conn: &Connection) -> Result<(), SchemaError> {
537        let mut stmt = conn.prepare("PRAGMA table_info(provenance_events)")?;
538        let columns = stmt
539            .query_map([], |row| row.get::<_, String>(1))?
540            .collect::<Result<Vec<_>, _>>()?;
541        let has_metadata_json = columns.iter().any(|column| column == "metadata_json");
542
543        if !has_metadata_json {
544            conn.execute(
545                "ALTER TABLE provenance_events ADD COLUMN metadata_json TEXT NOT NULL DEFAULT ''",
546                [],
547            )?;
548        }
549        Ok(())
550    }
551
552    fn ensure_operational_mutation_order(conn: &Connection) -> Result<(), SchemaError> {
553        let mut stmt = conn.prepare("PRAGMA table_info(operational_mutations)")?;
554        let columns = stmt
555            .query_map([], |row| row.get::<_, String>(1))?
556            .collect::<Result<Vec<_>, _>>()?;
557        let has_mutation_order = columns.iter().any(|column| column == "mutation_order");
558
559        if !has_mutation_order {
560            conn.execute(
561                "ALTER TABLE operational_mutations ADD COLUMN mutation_order INTEGER NOT NULL DEFAULT 0",
562                [],
563            )?;
564        }
565        conn.execute(
566            r"
567            UPDATE operational_mutations
568            SET mutation_order = rowid
569            WHERE mutation_order = 0
570            ",
571            [],
572        )?;
573        conn.execute(
574            r"
575            CREATE INDEX IF NOT EXISTS idx_operational_mutations_collection_key_order
576                ON operational_mutations(collection_name, record_key, mutation_order DESC)
577            ",
578            [],
579        )?;
580        Ok(())
581    }
582
583    fn ensure_node_access_metadata(conn: &Connection) -> Result<(), SchemaError> {
584        conn.execute_batch(
585            r"
586            CREATE TABLE IF NOT EXISTS node_access_metadata (
587                logical_id TEXT PRIMARY KEY,
588                last_accessed_at INTEGER NOT NULL,
589                updated_at INTEGER NOT NULL
590            );
591
592            CREATE INDEX IF NOT EXISTS idx_node_access_metadata_last_accessed
593                ON node_access_metadata(last_accessed_at DESC);
594            ",
595        )?;
596        Ok(())
597    }
598
599    fn ensure_operational_filter_contract(conn: &Connection) -> Result<(), SchemaError> {
600        let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
601        let columns = stmt
602            .query_map([], |row| row.get::<_, String>(1))?
603            .collect::<Result<Vec<_>, _>>()?;
604        let has_filter_fields_json = columns.iter().any(|column| column == "filter_fields_json");
605
606        if !has_filter_fields_json {
607            conn.execute(
608                "ALTER TABLE operational_collections ADD COLUMN filter_fields_json TEXT NOT NULL DEFAULT '[]'",
609                [],
610            )?;
611        }
612
613        conn.execute_batch(
614            r"
615            CREATE TABLE IF NOT EXISTS operational_filter_values (
616                mutation_id TEXT NOT NULL,
617                collection_name TEXT NOT NULL,
618                field_name TEXT NOT NULL,
619                string_value TEXT,
620                integer_value INTEGER,
621                PRIMARY KEY(mutation_id, field_name),
622                FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE,
623                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
624            );
625
626            CREATE INDEX IF NOT EXISTS idx_operational_filter_values_text
627                ON operational_filter_values(collection_name, field_name, string_value, mutation_id);
628            CREATE INDEX IF NOT EXISTS idx_operational_filter_values_integer
629                ON operational_filter_values(collection_name, field_name, integer_value, mutation_id);
630            ",
631        )?;
632        Ok(())
633    }
634
635    fn ensure_operational_validation_contract(conn: &Connection) -> Result<(), SchemaError> {
636        let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
637        let columns = stmt
638            .query_map([], |row| row.get::<_, String>(1))?
639            .collect::<Result<Vec<_>, _>>()?;
640        let has_validation_json = columns.iter().any(|column| column == "validation_json");
641
642        if !has_validation_json {
643            conn.execute(
644                "ALTER TABLE operational_collections ADD COLUMN validation_json TEXT NOT NULL DEFAULT ''",
645                [],
646            )?;
647        }
648
649        Ok(())
650    }
651
652    fn ensure_operational_secondary_indexes(conn: &Connection) -> Result<(), SchemaError> {
653        let mut stmt = conn.prepare("PRAGMA table_info(operational_collections)")?;
654        let columns = stmt
655            .query_map([], |row| row.get::<_, String>(1))?
656            .collect::<Result<Vec<_>, _>>()?;
657        let has_secondary_indexes_json = columns
658            .iter()
659            .any(|column| column == "secondary_indexes_json");
660
661        if !has_secondary_indexes_json {
662            conn.execute(
663                "ALTER TABLE operational_collections ADD COLUMN secondary_indexes_json TEXT NOT NULL DEFAULT '[]'",
664                [],
665            )?;
666        }
667
668        conn.execute_batch(
669            r"
670            CREATE TABLE IF NOT EXISTS operational_secondary_index_entries (
671                collection_name TEXT NOT NULL,
672                index_name TEXT NOT NULL,
673                subject_kind TEXT NOT NULL,
674                mutation_id TEXT NOT NULL DEFAULT '',
675                record_key TEXT NOT NULL DEFAULT '',
676                sort_timestamp INTEGER,
677                slot1_text TEXT,
678                slot1_integer INTEGER,
679                slot2_text TEXT,
680                slot2_integer INTEGER,
681                slot3_text TEXT,
682                slot3_integer INTEGER,
683                PRIMARY KEY(collection_name, index_name, subject_kind, mutation_id, record_key),
684                FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
685                FOREIGN KEY(mutation_id) REFERENCES operational_mutations(id) ON DELETE CASCADE
686            );
687
688            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_text
689                ON operational_secondary_index_entries(
690                    collection_name, index_name, subject_kind, slot1_text, sort_timestamp DESC, mutation_id, record_key
691                );
692            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_slot1_integer
693                ON operational_secondary_index_entries(
694                    collection_name, index_name, subject_kind, slot1_integer, sort_timestamp DESC, mutation_id, record_key
695                );
696            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_text
697                ON operational_secondary_index_entries(
698                    collection_name, index_name, subject_kind, slot1_text, slot2_text, slot3_text, sort_timestamp DESC, record_key
699                );
700            CREATE INDEX IF NOT EXISTS idx_operational_secondary_entries_composite_integer
701                ON operational_secondary_index_entries(
702                    collection_name, index_name, subject_kind, slot1_integer, slot2_integer, slot3_integer, sort_timestamp DESC, record_key
703                );
704            ",
705        )?;
706
707        Ok(())
708    }
709
    /// Create the `operational_retention_runs` audit table and its lookup
    /// index if they do not already exist.
    ///
    /// Each row records one retention sweep over an operational collection:
    /// which action ran (`action_kind`), whether it was a dry run, how many
    /// mutations were deleted, and how many rows remain afterwards.
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError`] if the DDL fails to execute.
    fn ensure_operational_retention_runs(conn: &Connection) -> Result<(), SchemaError> {
        conn.execute_batch(
            r"
            CREATE TABLE IF NOT EXISTS operational_retention_runs (
                id TEXT PRIMARY KEY,
                collection_name TEXT NOT NULL,
                executed_at INTEGER NOT NULL,
                action_kind TEXT NOT NULL,
                dry_run INTEGER NOT NULL DEFAULT 0,
                deleted_mutations INTEGER NOT NULL,
                rows_remaining INTEGER NOT NULL,
                metadata_json TEXT NOT NULL DEFAULT '',
                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
            );

            CREATE INDEX IF NOT EXISTS idx_operational_retention_runs_collection_time
                ON operational_retention_runs(collection_name, executed_at DESC);
            ",
        )?;
        Ok(())
    }
731
732    #[must_use]
733    pub fn current_version(&self) -> SchemaVersion {
734        self.migrations()
735            .last()
736            .map_or(SchemaVersion(0), |migration| migration.version)
737    }
738
    /// The full, ordered list of migrations compiled into this build.
    #[must_use]
    pub fn migrations(&self) -> &'static [Migration] {
        MIGRATIONS
    }
743
    /// Set the recommended `SQLite` connection pragmas for fathomdb.
    ///
    /// Intended for the writer connection: enables foreign keys, WAL
    /// journaling with `NORMAL` synchronous mode, a 5-second busy timeout,
    /// in-memory temp storage, a memory-mapped I/O window, and a cap on
    /// the WAL file size.
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError`] if any PRAGMA fails to execute.
    pub fn initialize_connection(&self, conn: &Connection) -> Result<(), SchemaError> {
        // mmap_size = 3_000_000_000 bytes (~3 GB);
        // journal_size_limit = 536_870_912 bytes = 512 MiB.
        conn.execute_batch(
            r"
            PRAGMA foreign_keys = ON;
            PRAGMA journal_mode = WAL;
            PRAGMA synchronous = NORMAL;
            PRAGMA busy_timeout = 5000;
            PRAGMA temp_store = MEMORY;
            PRAGMA mmap_size = 3000000000;
            PRAGMA journal_size_limit = 536870912;
            ",
        )?;
        Ok(())
    }
763
    /// Initialize a **read-only** connection with PRAGMAs that are safe for
    /// readers.
    ///
    /// Skips `journal_mode` (requires write; the writer already set WAL),
    /// `synchronous` (irrelevant for readers), and `journal_size_limit`
    /// (requires write).
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError`] if any PRAGMA fails to execute.
    pub fn initialize_reader_connection(&self, conn: &Connection) -> Result<(), SchemaError> {
        // busy_timeout and mmap_size mirror the writer settings in
        // `initialize_connection` so reads behave consistently.
        conn.execute_batch(
            r"
            PRAGMA foreign_keys = ON;
            PRAGMA busy_timeout = 5000;
            PRAGMA temp_store = MEMORY;
            PRAGMA mmap_size = 3000000000;
            ",
        )?;
        Ok(())
    }
785
    /// Ensure the sqlite-vec vector extension profile is registered and the
    /// virtual vec table exists.
    ///
    /// When the `sqlite-vec` feature is enabled this creates the virtual table
    /// and records the profile in `vector_profiles` (with `enabled = 1`).
    /// When the feature is absent the call always returns
    /// [`SchemaError::MissingCapability`].
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError`] if the DDL fails or the feature is absent.
    #[cfg(feature = "sqlite-vec")]
    pub fn ensure_vector_profile(
        &self,
        conn: &Connection,
        profile: &str,
        table_name: &str,
        dimension: usize,
    ) -> Result<(), SchemaError> {
        // NOTE(review): `table_name` and `dimension` are interpolated directly
        // into the DDL because SQLite cannot bind identifiers. Callers must
        // only pass trusted identifiers here — confirm no user-controlled
        // value can reach this parameter.
        conn.execute_batch(&format!(
            "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING vec0(\
                chunk_id TEXT PRIMARY KEY,\
                embedding float[{dimension}]\
            )"
        ))?;
        // The profile registration, by contrast, uses bound parameters.
        // `dimension as i64` would wrap only for dimension > i64::MAX, which
        // no realistic embedding size reaches.
        conn.execute(
            "INSERT OR REPLACE INTO vector_profiles \
             (profile, table_name, dimension, enabled) VALUES (?1, ?2, ?3, 1)",
            rusqlite::params![profile, table_name, dimension as i64],
        )?;
        Ok(())
    }
818
    /// Stub used when the `sqlite-vec` feature is not compiled in; mirrors
    /// the signature of the feature-gated implementation.
    ///
    /// # Errors
    ///
    /// Always returns [`SchemaError::MissingCapability`] when the `sqlite-vec`
    /// feature is not compiled in.
    #[cfg(not(feature = "sqlite-vec"))]
    pub fn ensure_vector_profile(
        &self,
        _conn: &Connection,
        _profile: &str,
        _table_name: &str,
        _dimension: usize,
    ) -> Result<(), SchemaError> {
        Err(SchemaError::MissingCapability("sqlite-vec"))
    }
833
    /// Create the internal migration-tracking table if it does not exist.
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError`] if the DDL fails to execute.
    fn ensure_metadata_tables(conn: &Connection) -> Result<(), SchemaError> {
        // applied_at defaults to the current unix timestamp via unixepoch(),
        // so inserts only need to supply (version, description).
        conn.execute_batch(
            r"
            CREATE TABLE IF NOT EXISTS fathom_schema_migrations (
                version INTEGER PRIMARY KEY,
                description TEXT NOT NULL,
                applied_at INTEGER NOT NULL DEFAULT (unixepoch())
            );
            ",
        )?;
        Ok(())
    }
851}
852
853#[cfg(test)]
854#[allow(clippy::expect_used)]
855mod tests {
856    use rusqlite::Connection;
857
858    use super::SchemaManager;
859
860    #[test]
861    fn bootstrap_applies_initial_schema() {
862        let conn = Connection::open_in_memory().expect("in-memory sqlite");
863        let manager = SchemaManager::new();
864
865        let report = manager.bootstrap(&conn).expect("bootstrap report");
866
867        assert_eq!(
868            report.applied_versions.len(),
869            manager.current_version().0 as usize
870        );
871        assert!(report.sqlite_version.starts_with('3'));
872        let table_count: i64 = conn
873            .query_row(
874                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'nodes'",
875                [],
876                |row| row.get(0),
877            )
878            .expect("nodes table exists");
879        assert_eq!(table_count, 1);
880    }
881
    // --- Vector profile tests ---
883
884    #[test]
885    fn vector_profile_not_enabled_without_feature() {
886        let conn = Connection::open_in_memory().expect("in-memory sqlite");
887        let manager = SchemaManager::new();
888        let report = manager.bootstrap(&conn).expect("bootstrap");
889        assert!(
890            !report.vector_profile_enabled,
891            "vector_profile_enabled must be false on a fresh bootstrap"
892        );
893    }
894
895    #[test]
896    fn vector_profile_skipped_when_dimension_absent() {
897        // ensure_vector_profile is never called → enabled stays false
898        let conn = Connection::open_in_memory().expect("in-memory sqlite");
899        let manager = SchemaManager::new();
900        manager.bootstrap(&conn).expect("bootstrap");
901
902        let count: i64 = conn
903            .query_row(
904                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
905                [],
906                |row| row.get(0),
907            )
908            .expect("count");
909        assert_eq!(
910            count, 0,
911            "no enabled profile without calling ensure_vector_profile"
912        );
913    }
914
915    #[test]
916    fn bootstrap_report_reflects_actual_vector_state() {
917        // After a fresh bootstrap with no vector profile, the report reflects reality.
918        let conn = Connection::open_in_memory().expect("in-memory sqlite");
919        let manager = SchemaManager::new();
920        let report = manager.bootstrap(&conn).expect("bootstrap");
921
922        let db_count: i64 = conn
923            .query_row(
924                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
925                [],
926                |row| row.get(0),
927            )
928            .expect("count");
929        assert_eq!(
930            report.vector_profile_enabled,
931            db_count > 0,
932            "BootstrapReport.vector_profile_enabled must match actual DB state"
933        );
934    }
935
    #[test]
    fn bootstrap_backfills_vector_contract_format_and_provenance_metadata_columns() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        // Seed a "legacy" database shape: provenance_events without a
        // metadata_json column and vector_embedding_contracts without a
        // contract_format_version column, plus one pre-existing contract row,
        // so bootstrap must ALTER/backfill rather than CREATE from scratch.
        conn.execute_batch(
            r#"
            CREATE TABLE provenance_events (
                id         TEXT PRIMARY KEY,
                event_type TEXT NOT NULL,
                subject    TEXT NOT NULL,
                source_ref TEXT,
                created_at INTEGER NOT NULL DEFAULT (unixepoch())
            );
            CREATE TABLE vector_embedding_contracts (
                profile TEXT PRIMARY KEY,
                table_name TEXT NOT NULL,
                model_identity TEXT NOT NULL,
                model_version TEXT NOT NULL,
                dimension INTEGER NOT NULL,
                normalization_policy TEXT NOT NULL,
                chunking_policy TEXT NOT NULL,
                preprocessing_policy TEXT NOT NULL,
                generator_command_json TEXT NOT NULL,
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                applied_at INTEGER NOT NULL DEFAULT 0,
                snapshot_hash TEXT NOT NULL DEFAULT ''
            );
            INSERT INTO vector_embedding_contracts (
                profile,
                table_name,
                model_identity,
                model_version,
                dimension,
                normalization_policy,
                chunking_policy,
                preprocessing_policy,
                generator_command_json,
                updated_at,
                applied_at,
                snapshot_hash
            ) VALUES (
                'default',
                'vec_nodes_active',
                'legacy-model',
                '0.9.0',
                4,
                'l2',
                'per_chunk',
                'trim',
                '["/bin/echo"]',
                100,
                100,
                'legacy'
            );
            "#,
        )
        .expect("seed legacy schema");
        let manager = SchemaManager::new();

        let report = manager.bootstrap(&conn).expect("bootstrap");

        assert!(
            report.applied_versions.iter().any(|version| version.0 >= 5),
            "bootstrap should apply hardening migrations"
        );
        // The legacy contract row must be backfilled with format version 1.
        let format_version: i64 = conn
            .query_row(
                "SELECT contract_format_version FROM vector_embedding_contracts WHERE profile = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("contract_format_version");
        assert_eq!(format_version, 1);
        // And the metadata_json column is added to provenance_events exactly once.
        let metadata_column_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM pragma_table_info('provenance_events') WHERE name = 'metadata_json'",
                [],
                |row| row.get(0),
            )
            .expect("metadata_json column count");
        assert_eq!(metadata_column_count, 1);
    }
1017
1018    #[test]
1019    fn bootstrap_creates_operational_store_tables() {
1020        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1021        let manager = SchemaManager::new();
1022
1023        manager.bootstrap(&conn).expect("bootstrap");
1024
1025        for table in [
1026            "operational_collections",
1027            "operational_mutations",
1028            "operational_current",
1029        ] {
1030            let count: i64 = conn
1031                .query_row(
1032                    "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = ?1",
1033                    [table],
1034                    |row| row.get(0),
1035                )
1036                .expect("table existence");
1037            assert_eq!(count, 1, "{table} should exist after bootstrap");
1038        }
1039        let mutation_order_columns: i64 = conn
1040            .query_row(
1041                "SELECT count(*) FROM pragma_table_info('operational_mutations') WHERE name = 'mutation_order'",
1042                [],
1043                |row| row.get(0),
1044            )
1045            .expect("mutation_order column exists");
1046        assert_eq!(mutation_order_columns, 1);
1047    }
1048
1049    #[test]
1050    fn bootstrap_is_idempotent_with_operational_store_tables() {
1051        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1052        let manager = SchemaManager::new();
1053
1054        manager.bootstrap(&conn).expect("first bootstrap");
1055        let report = manager.bootstrap(&conn).expect("second bootstrap");
1056
1057        assert!(
1058            report.applied_versions.is_empty(),
1059            "second bootstrap should apply no new migrations"
1060        );
1061        let count: i64 = conn
1062            .query_row(
1063                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'operational_collections'",
1064                [],
1065                |row| row.get(0),
1066        )
1067        .expect("operational_collections table exists");
1068        assert_eq!(count, 1);
1069    }
1070
    #[test]
    fn bootstrap_is_idempotent_for_recovered_operational_tables_without_migration_history() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        // Seed hand-restored operational tables (e.g. recovered from a backup)
        // that predate the ordering hardening: mutation_order is stuck at the
        // placeholder value 0 and fathom_schema_migrations does not exist.
        conn.execute_batch(
            r#"
            CREATE TABLE operational_collections (
                name TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                schema_json TEXT NOT NULL,
                retention_json TEXT NOT NULL,
                format_version INTEGER NOT NULL DEFAULT 1,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                disabled_at INTEGER
            );

            CREATE TABLE operational_mutations (
                id TEXT PRIMARY KEY,
                collection_name TEXT NOT NULL,
                record_key TEXT NOT NULL,
                op_kind TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                source_ref TEXT,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                mutation_order INTEGER NOT NULL DEFAULT 0,
                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
            );

            CREATE TABLE operational_current (
                collection_name TEXT NOT NULL,
                record_key TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                updated_at INTEGER NOT NULL,
                last_mutation_id TEXT NOT NULL,
                PRIMARY KEY(collection_name, record_key),
                FOREIGN KEY(collection_name) REFERENCES operational_collections(name),
                FOREIGN KEY(last_mutation_id) REFERENCES operational_mutations(id)
            );

            INSERT INTO operational_collections (name, kind, schema_json, retention_json)
            VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}');
            INSERT INTO operational_mutations (
                id, collection_name, record_key, op_kind, payload_json, created_at, mutation_order
            ) VALUES (
                'mut-1', 'audit_log', 'entry-1', 'put', '{"ok":true}', 10, 0
            );
            "#,
        )
        .expect("seed recovered operational tables");

        let manager = SchemaManager::new();
        let report = manager
            .bootstrap(&conn)
            .expect("bootstrap recovered schema");

        assert!(
            report.applied_versions.iter().any(|version| version.0 == 8),
            "bootstrap should record operational mutation ordering hardening"
        );
        // The ordering migration must replace the placeholder 0 with a real
        // ordering value for pre-existing rows.
        let mutation_order: i64 = conn
            .query_row(
                "SELECT mutation_order FROM operational_mutations WHERE id = 'mut-1'",
                [],
                |row| row.get(0),
            )
            .expect("mutation_order");
        assert_ne!(
            mutation_order, 0,
            "bootstrap should backfill recovered operational rows"
        );
        // And the supporting ordering index must be created.
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type = 'index' AND name = 'idx_operational_mutations_collection_key_order'",
                [],
                |row| row.get(0),
            )
            .expect("ordering index exists");
        assert_eq!(count, 1);
    }
1149
    #[test]
    fn bootstrap_adds_operational_filter_contract_and_index_table() {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        // Seed an operational schema that predates the filtered-read and
        // validation migrations: operational_collections has neither
        // filter_fields_json nor validation_json, and the filter-value table
        // does not exist yet.
        conn.execute_batch(
            r#"
            CREATE TABLE operational_collections (
                name TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                schema_json TEXT NOT NULL,
                retention_json TEXT NOT NULL,
                format_version INTEGER NOT NULL DEFAULT 1,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                disabled_at INTEGER
            );

            CREATE TABLE operational_mutations (
                id TEXT PRIMARY KEY,
                collection_name TEXT NOT NULL,
                record_key TEXT NOT NULL,
                op_kind TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                source_ref TEXT,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                mutation_order INTEGER NOT NULL DEFAULT 1,
                FOREIGN KEY(collection_name) REFERENCES operational_collections(name)
            );

            INSERT INTO operational_collections (name, kind, schema_json, retention_json)
            VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}');
            "#,
        )
        .expect("seed recovered operational schema");

        let manager = SchemaManager::new();
        let report = manager
            .bootstrap(&conn)
            .expect("bootstrap recovered schema");

        assert!(
            report
                .applied_versions
                .iter()
                .any(|version| version.0 == 10),
            "bootstrap should record operational filtered read migration"
        );
        assert!(
            report
                .applied_versions
                .iter()
                .any(|version| version.0 == 11),
            "bootstrap should record operational validation migration"
        );
        // The filtered-read migration adds filter_fields_json defaulting to
        // an empty JSON list.
        let filter_fields_json: String = conn
            .query_row(
                "SELECT filter_fields_json FROM operational_collections WHERE name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("filter_fields_json added");
        assert_eq!(filter_fields_json, "[]");
        // The validation migration adds validation_json defaulting to the
        // empty string.
        let validation_json: String = conn
            .query_row(
                "SELECT validation_json FROM operational_collections WHERE name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("validation_json added");
        assert_eq!(validation_json, "");
        // And the operational_filter_values table must now exist.
        let table_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'operational_filter_values'",
                [],
                |row| row.get(0),
        )
        .expect("filter table exists");
        assert_eq!(table_count, 1);
    }
1227
1228    #[test]
1229    fn bootstrap_reapplies_migration_history_without_readding_filter_contract_columns() {
1230        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1231        let manager = SchemaManager::new();
1232        manager.bootstrap(&conn).expect("initial bootstrap");
1233
1234        conn.execute("DROP TABLE fathom_schema_migrations", [])
1235            .expect("drop migration history");
1236        SchemaManager::ensure_metadata_tables(&conn).expect("recreate migration metadata");
1237
1238        let report = manager
1239            .bootstrap(&conn)
1240            .expect("rebootstrap existing schema");
1241
1242        assert!(
1243            report
1244                .applied_versions
1245                .iter()
1246                .any(|version| version.0 == 10),
1247            "rebootstrap should re-record migration 10"
1248        );
1249        assert!(
1250            report
1251                .applied_versions
1252                .iter()
1253                .any(|version| version.0 == 11),
1254            "rebootstrap should re-record migration 11"
1255        );
1256        let filter_fields_json: String = conn
1257            .query_row(
1258                "SELECT filter_fields_json FROM operational_collections LIMIT 1",
1259                [],
1260                |row| row.get(0),
1261            )
1262            .unwrap_or_else(|_| "[]".to_string());
1263        assert_eq!(filter_fields_json, "[]");
1264        let validation_json: String = conn
1265            .query_row(
1266                "SELECT validation_json FROM operational_collections LIMIT 1",
1267                [],
1268                |row| row.get(0),
1269            )
1270            .unwrap_or_default();
1271        assert_eq!(validation_json, "");
1272    }
1273
1274    #[test]
1275    fn downgrade_detected_returns_version_mismatch() {
1276        use crate::SchemaError;
1277
1278        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1279        let manager = SchemaManager::new();
1280        manager.bootstrap(&conn).expect("initial bootstrap");
1281
1282        conn.execute(
1283            "INSERT INTO fathom_schema_migrations (version, description) VALUES (?1, ?2)",
1284            (999_i64, "future migration"),
1285        )
1286        .expect("insert future version");
1287
1288        let err = manager
1289            .bootstrap(&conn)
1290            .expect_err("should fail on downgrade");
1291        assert!(
1292            matches!(
1293                err,
1294                SchemaError::VersionMismatch {
1295                    database_version: 999,
1296                    ..
1297                }
1298            ),
1299            "expected VersionMismatch with database_version 999, got: {err}"
1300        );
1301    }
1302
1303    #[test]
1304    fn journal_size_limit_is_set() {
1305        let conn = Connection::open_in_memory().expect("in-memory sqlite");
1306        let manager = SchemaManager::new();
1307        manager
1308            .initialize_connection(&conn)
1309            .expect("initialize_connection");
1310
1311        let limit: i64 = conn
1312            .query_row("PRAGMA journal_size_limit", [], |row| row.get(0))
1313            .expect("journal_size_limit pragma");
1314        assert_eq!(limit, 536_870_912);
1315    }
1316
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn vector_profile_created_when_feature_enabled() {
        // Register the sqlite-vec extension globally so the in-memory
        // connection can use the vec0 module.
        // SAFETY: presumably sqlite3_vec_init matches the entry-point
        // signature sqlite3_auto_extension expects, making the transmute
        // sound — TODO confirm against the sqlite-vec crate documentation.
        // Note this registration is process-global and affects every
        // connection opened afterwards.
        unsafe {
            rusqlite::ffi::sqlite3_auto_extension(Some(std::mem::transmute(
                sqlite_vec::sqlite3_vec_init as *const (),
            )));
        }
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager.bootstrap(&conn).expect("bootstrap");

        manager
            .ensure_vector_profile(&conn, "default", "vec_nodes_active", 128)
            .expect("ensure_vector_profile");

        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vector_profiles WHERE enabled = 1",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            count, 1,
            "vector profile must be enabled after ensure_vector_profile"
        );

        // Verify the virtual table exists by querying it
        let _: i64 = conn
            .query_row("SELECT count(*) FROM vec_nodes_active", [], |row| {
                row.get(0)
            })
            .expect("vec_nodes_active table must exist after ensure_vector_profile");
    }
1354}