Skip to main content

mempill_sqlite/
migrations.rs

1//! Schema migration runner for mempill-sqlite.
2//!
3//! Applies versioned DDL to a rusqlite [`Connection`] in a deterministic, idempotent manner.
4//! Schema version is tracked via SQLite's built-in `user_version` PRAGMA.
5//!
6//! # Intended PRAGMA environment (applied at connection open in connection.rs)
7//! - `PRAGMA journal_mode=WAL;`  — write-ahead log for concurrent reads during writes
8//! - `PRAGMA synchronous=FULL;`  — full durability (mandatory; WAL+NORMAL can lose writes on power loss)
9//! - `PRAGMA foreign_keys=ON;`   — enforce FK constraints defined in DDL
10
11use rusqlite::{Connection, Result};
12
13/// The target schema version this runner brings the database to.
14/// Increment this constant (and add a new migration step) for every future DDL change.
15pub const CURRENT_SCHEMA_VERSION: u32 = 2;
16
17/// Embedded DDL — the 4-table append-only schema (§5).
18const V1_INITIAL_SQL: &str = include_str!("schema/v1_initial.sql");
19
20/// Embedded index definitions (§5).
21const INDEXES_SQL: &str = include_str!("schema/indexes.sql");
22
23/// Embedded DDL — oracle adjudication queue (pending_adjudications table).
24const V2_PENDING_ADJUDICATIONS_SQL: &str = include_str!("schema/v2_pending_adjudications.sql");
25
26/// Migration error wrapper.
27#[derive(Debug, thiserror::Error)]
28pub enum MigrationError {
29    /// A rusqlite error occurred during schema migration.
30    #[error("SQLite error during migration: {0}")]
31    Sqlite(#[from] rusqlite::Error),
32}
33
34/// Apply all pending migrations to `conn` up to [`CURRENT_SCHEMA_VERSION`].
35///
36/// Idempotent: calling this function on a fully-migrated database is a no-op.
37/// Each migration step runs inside its own transaction so a partial failure leaves the
38/// database at a consistent version boundary (each migration step is fully atomic).
39///
40/// Connection lifecycle and PRAGMA initialisation (`journal_mode=WAL`, `synchronous=FULL`,
41/// `foreign_keys=ON`) are the caller's responsibility (implemented in `connection.rs`).
42pub fn apply_migrations(conn: &Connection) -> Result<(), MigrationError> {
43    let current = user_version(conn)?;
44
45    if current < 1 {
46        apply_v1(conn)?;
47    }
48
49    if current < 2 {
50        apply_v2(conn)?;
51    }
52
53    Ok(())
54}
55
56/// Read the SQLite `user_version` PRAGMA (0 = fresh/uninitialized database).
57fn user_version(conn: &Connection) -> Result<u32, MigrationError> {
58    let v: u32 = conn.query_row("PRAGMA user_version", [], |row| row.get(0))?;
59    Ok(v)
60}
61
62/// Set the SQLite `user_version` PRAGMA.
63///
64/// This PRAGMA write is intentionally NOT inside the DDL transaction because SQLite
65/// does not allow PRAGMA user_version inside a transaction on all versions. We set it
66/// after the DDL transaction commits, so a crash between DDL commit and PRAGMA write is
67/// safe: the DDL tables already exist and `CREATE TABLE IF NOT EXISTS` makes the next
68/// migration run a no-op even if user_version is still 0.
69fn set_user_version(conn: &Connection, version: u32) -> Result<(), MigrationError> {
70    conn.execute_batch(&format!("PRAGMA user_version = {version};"))?;
71    Ok(())
72}
73
74/// Migration v1: create the 4 append-only tables and all structural indexes.
75fn apply_v1(conn: &Connection) -> Result<(), MigrationError> {
76    conn.execute_batch(V1_INITIAL_SQL)?;
77    conn.execute_batch(INDEXES_SQL)?;
78    set_user_version(conn, 1)?;
79    Ok(())
80}
81
82/// Migration v2: create the oracle adjudication queue table and its indexes.
83fn apply_v2(conn: &Connection) -> Result<(), MigrationError> {
84    conn.execute_batch(V2_PENDING_ADJUDICATIONS_SQL)?;
85    set_user_version(conn, 2)?;
86    Ok(())
87}
88
89// ── Tests ──────────────────────────────────────────────────────────────────────
90
91#[cfg(test)]
92mod tests {
93    use super::*;
94    use rusqlite::Connection;
95
96    fn open_memory() -> Connection {
97        Connection::open_in_memory().expect("in-memory database should open")
98    }
99
100    /// Helper: collect the column names for a given table from sqlite_master PRAGMA.
101    fn column_names(conn: &Connection, table: &str) -> Vec<String> {
102        let mut stmt = conn
103            .prepare(&format!("PRAGMA table_info({table})"))
104            .unwrap();
105        stmt.query_map([], |row| row.get::<_, String>(1))
106            .unwrap()
107            .map(|r| r.unwrap())
108            .collect()
109    }
110
111    /// Helper: check whether an index exists in sqlite_master.
112    fn index_exists(conn: &Connection, index_name: &str) -> bool {
113        let count: u32 = conn
114            .query_row(
115                "SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND name=?1",
116                [index_name],
117                |row| row.get(0),
118            )
119            .unwrap_or(0);
120        count > 0
121    }
122
123    /// Helper: check whether a table exists in sqlite_master.
124    fn table_exists(conn: &Connection, table_name: &str) -> bool {
125        let count: u32 = conn
126            .query_row(
127                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1",
128                [table_name],
129                |row| row.get(0),
130            )
131            .unwrap_or(0);
132        count > 0
133    }
134
135    #[test]
136    fn all_four_tables_exist_after_migration() {
137        let conn = open_memory();
138        apply_migrations(&conn).expect("migrations should succeed");
139
140        assert!(table_exists(&conn, "claims"), "claims table must exist");
141        assert!(
142            table_exists(&conn, "validity_assertions"),
143            "validity_assertions table must exist"
144        );
145        assert!(
146            table_exists(&conn, "ledger_entries"),
147            "ledger_entries table must exist"
148        );
149        assert!(
150            table_exists(&conn, "claim_edges"),
151            "claim_edges table must exist"
152        );
153    }
154
155    #[test]
156    fn claims_table_has_expected_columns() {
157        let conn = open_memory();
158        apply_migrations(&conn).expect("migrations should succeed");
159
160        let cols = column_names(&conn, "claims");
161        for expected in &[
162            "claim_id",
163            "agent_id",
164            "subject",
165            "predicate",
166            "value",
167            "cardinality",
168            "provenance_label",
169            "nearest_external_anchor_id",
170            "derivation_depth",
171            "tx_time",
172            "valid_time_start",
173            "valid_time_end",
174            "valid_time_confidence",
175            "value_confidence",
176            "criticality",
177            "derived_from",
178            "metadata",
179            "snapshot_schema_version",
180            "embedding_model_id",
181        ] {
182            assert!(
183                cols.contains(&expected.to_string()),
184                "claims table missing column: {expected}"
185            );
186        }
187    }
188
189    #[test]
190    fn validity_assertions_table_has_expected_columns() {
191        let conn = open_memory();
192        apply_migrations(&conn).expect("migrations should succeed");
193
194        let cols = column_names(&conn, "validity_assertions");
195        for expected in &[
196            "assertion_id",
197            "agent_id",
198            "target_claim_id",
199            "assertion_kind",
200            "bound_at",
201            "reopen_at",
202            "provenance_label",
203            "value_confidence",
204            "valid_time_confidence",
205            "asserted_at",
206        ] {
207            assert!(
208                cols.contains(&expected.to_string()),
209                "validity_assertions table missing column: {expected}"
210            );
211        }
212    }
213
214    #[test]
215    fn ledger_entries_table_has_expected_columns() {
216        let conn = open_memory();
217        apply_migrations(&conn).expect("migrations should succeed");
218
219        let cols = column_names(&conn, "ledger_entries");
220        for expected in &[
221            "entry_id",
222            "agent_id",
223            "claim_id",
224            "event_kind",
225            "disposition",
226            "rationale",
227            "recorded_at",
228        ] {
229            assert!(
230                cols.contains(&expected.to_string()),
231                "ledger_entries table missing column: {expected}"
232            );
233        }
234    }
235
236    #[test]
237    fn claim_edges_table_has_expected_columns() {
238        let conn = open_memory();
239        apply_migrations(&conn).expect("migrations should succeed");
240
241        let cols = column_names(&conn, "claim_edges");
242        for expected in &[
243            "edge_id",
244            "agent_id",
245            "from_claim_id",
246            "to_claim_id",
247            "edge_kind",
248            "created_at",
249        ] {
250            assert!(
251                cols.contains(&expected.to_string()),
252                "claim_edges table missing column: {expected}"
253            );
254        }
255    }
256
257    #[test]
258    fn structural_subject_line_index_exists() {
259        let conn = open_memory();
260        apply_migrations(&conn).expect("migrations should succeed");
261
262        assert!(
263            index_exists(&conn, "idx_claims_subject_line"),
264            "primary structural subject-line index must exist"
265        );
266    }
267
268    #[test]
269    fn all_indexes_exist() {
270        let conn = open_memory();
271        apply_migrations(&conn).expect("migrations should succeed");
272
273        let expected_indexes = [
274            "idx_claims_subject_line",
275            "idx_validity_assertions_target",
276            "idx_ledger_agent_time",
277            "idx_edges_from",
278            "idx_edges_to",
279            "idx_claims_provenance",
280        ];
281        for idx in &expected_indexes {
282            assert!(
283                index_exists(&conn, idx),
284                "index missing after migration: {idx}"
285            );
286        }
287    }
288
289    #[test]
290    fn apply_migrations_is_idempotent() {
291        let conn = open_memory();
292        apply_migrations(&conn).expect("first migration should succeed");
293        apply_migrations(&conn).expect("second migration must not error (idempotent)");
294        apply_migrations(&conn).expect("third migration must not error (idempotent)");
295
296        // Tables and indexes must still be present after repeated runs.
297        assert!(table_exists(&conn, "claims"));
298        assert!(table_exists(&conn, "claim_edges"));
299        assert!(index_exists(&conn, "idx_claims_subject_line"));
300    }
301
302    #[test]
303    fn reserved_columns_exist_on_claims() {
304        let conn = open_memory();
305        apply_migrations(&conn).expect("migrations should succeed");
306
307        let cols = column_names(&conn, "claims");
308        assert!(
309            cols.contains(&"metadata".to_string()),
310            "reserved column 'metadata' must exist on claims"
311        );
312        assert!(
313            cols.contains(&"snapshot_schema_version".to_string()),
314            "reserved column 'snapshot_schema_version' must exist on claims"
315        );
316        assert!(
317            cols.contains(&"embedding_model_id".to_string()),
318            "reserved column 'embedding_model_id' must exist on claims"
319        );
320    }
321
322    #[test]
323    fn schema_version_is_set_after_migration() {
324        let conn = open_memory();
325        apply_migrations(&conn).expect("migrations should succeed");
326
327        let v = user_version(&conn).expect("user_version should be readable");
328        assert_eq!(
329            v, CURRENT_SCHEMA_VERSION,
330            "user_version PRAGMA must equal CURRENT_SCHEMA_VERSION after migration"
331        );
332    }
333
334    #[test]
335    fn pending_adjudications_table_exists_after_migration() {
336        let conn = open_memory();
337        apply_migrations(&conn).expect("migrations should succeed");
338        assert!(
339            table_exists(&conn, "pending_adjudications"),
340            "pending_adjudications table must exist after v2 migration"
341        );
342    }
343
344    #[test]
345    fn pending_adjudications_table_has_expected_columns() {
346        let conn = open_memory();
347        apply_migrations(&conn).expect("migrations should succeed");
348
349        let cols = column_names(&conn, "pending_adjudications");
350        for expected in &[
351            "handle_id",
352            "agent_id",
353            "subject",
354            "predicate",
355            "challenger_claim_ref",
356            "incumbent_claim_ref",
357            "request_payload",
358            "queued_at",
359            "expires_at",
360            "status",
361        ] {
362            assert!(
363                cols.contains(&expected.to_string()),
364                "pending_adjudications table missing column: {expected}"
365            );
366        }
367    }
368
369    #[test]
370    fn pending_adjudications_indexes_exist_after_migration() {
371        let conn = open_memory();
372        apply_migrations(&conn).expect("migrations should succeed");
373
374        // Agent-id lookup index (oracle poller).
375        assert!(
376            index_exists(&conn, "idx_pending_adj_agent_id"),
377            "idx_pending_adj_agent_id must exist after v2 migration"
378        );
379        // Partial TTL index (WHERE expires_at IS NOT NULL AND status = 'pending').
380        assert!(
381            index_exists(&conn, "idx_pending_adj_expires_at"),
382            "idx_pending_adj_expires_at must exist after v2 migration"
383        );
384    }
385
386    #[test]
387    fn apply_migrations_v2_is_idempotent() {
388        let conn = open_memory();
389        apply_migrations(&conn).expect("first migration should succeed");
390        apply_migrations(&conn).expect("second migration must not error (idempotent)");
391        apply_migrations(&conn).expect("third migration must not error (idempotent)");
392
393        assert!(table_exists(&conn, "pending_adjudications"));
394        assert!(index_exists(&conn, "idx_pending_adj_agent_id"));
395        assert!(index_exists(&conn, "idx_pending_adj_expires_at"));
396    }
397}