// khive_db/migrations.rs

1use rusqlite::Connection;
2
3use crate::error::SqliteError;
4
5// =============================================================================
6// Legacy per-service migration API (preserved for backward compatibility)
7// =============================================================================
8
9pub struct Migration {
10    pub id: &'static str,
11    pub up_sql: &'static str,
12    pub down_sql: Option<&'static str>,
13    pub is_already_applied: Option<fn(&Connection) -> bool>,
14}
15
16pub struct ServiceSchemaPlan {
17    pub service: &'static str,
18    pub sqlite: &'static [Migration],
19    pub postgres: &'static [Migration],
20}
21
/// DDL for the legacy tracking table: one row per `(service, migration_id)`
/// pair, stamped with the time it was applied.
///
/// Built with `concat!` so the SQL is a single compact line with no embedded
/// newlines or indentation.
const SCHEMA_VERSION_TABLE: &str = concat!(
    "CREATE TABLE IF NOT EXISTS _schema_versions (",
    "service TEXT NOT NULL,",
    "migration_id TEXT NOT NULL,",
    "applied_at INTEGER NOT NULL,",
    "PRIMARY KEY (service, migration_id)",
    ");",
);
30
31pub fn apply_schema_plan(conn: &Connection, plan: &ServiceSchemaPlan) -> Result<(), SqliteError> {
32    conn.execute_batch(SCHEMA_VERSION_TABLE)?;
33
34    for migration in plan.sqlite {
35        // Check if custom predicate says it's already applied
36        if let Some(check) = migration.is_already_applied {
37            if check(conn) {
38                continue;
39            }
40        }
41
42        // Check if tracked as applied
43        let already: bool = conn.query_row(
44            "SELECT COUNT(*) > 0 FROM _schema_versions WHERE service = ?1 AND migration_id = ?2",
45            rusqlite::params![plan.service, migration.id],
46            |row| row.get(0),
47        )?;
48
49        if already {
50            continue;
51        }
52
53        // Apply
54        conn.execute_batch(migration.up_sql)?;
55
56        // Record
57        conn.execute(
58            "INSERT INTO _schema_versions (service, migration_id, applied_at) VALUES (?1, ?2, ?3)",
59            rusqlite::params![
60                plan.service,
61                migration.id,
62                chrono::Utc::now().timestamp_micros(),
63            ],
64        )?;
65    }
66
67    Ok(())
68}
69
// =============================================================================
// Versioned migration system (ADR-022)
// =============================================================================

/// A single forward-only schema migration.
///
/// Migrations are applied in ascending `version` order, starting after the
/// highest version already recorded in the database. `run_migrations` runs
/// each one in its own transaction, so a failure rolls back that migration and
/// leaves the DB at the prior version.
///
/// All fields are `'static` data, so the type is cheap to copy and compare.
/// `Debug` allows it to appear in diagnostics and assertion messages.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct VersionedMigration {
    /// Monotonically increasing version number, starting at 1.
    pub version: u32,
    /// Short human-readable name for the migration (used in the audit table).
    pub name: &'static str,
    /// SQL to apply this migration. May contain multiple statements separated
    /// by semicolons; `execute_batch` runs them all.
    pub up: &'static str,
}
88
// V1: The complete initial schema for all four core tables.
//
// Creates `entities`, `graph_edges`, `notes` and `events` together with their
// indexes. Every statement uses `IF NOT EXISTS`, so objects that already exist
// (e.g. created earlier by a store's own DDL) are left as-is. Note that an
// existing table is not altered, even if its columns differ from this SQL.
//
// The trailing `\` on each line is a string-continuation escape: the newline
// and the next line's leading whitespace are dropped, so the resulting SQL is
// one compact line. The statements are `;`-separated and run by
// `execute_batch` inside `run_migrations`.
//
// Do not edit this SQL after release. `run_migrations` never re-runs a version
// that is already recorded in `_schema_migrations`, so any change here would
// make freshly created databases diverge from upgraded ones. Put schema changes
// in a new migration instead.
//
// NOTE(review): several indexes are left-prefixes of composite indexes on the
// same table and look redundant: `idx_entities_namespace` (prefix of
// `idx_entities_kind`/`idx_entities_name`), `idx_notes_namespace` (prefix of
// `idx_notes_kind`), and `idx_graph_edges_ns_source` / `idx_graph_edges_ns_target`
// (prefixes of `..._ns_src_rel` / `..._ns_tgt_rel`). Dropping them would need a
// new migration — confirm no query plan depends on them first.
const V1_UP: &str = "\
    CREATE TABLE IF NOT EXISTS entities (\
        id TEXT PRIMARY KEY,\
        namespace TEXT NOT NULL,\
        kind TEXT NOT NULL,\
        name TEXT NOT NULL,\
        description TEXT,\
        properties TEXT,\
        tags TEXT NOT NULL DEFAULT '[]',\
        created_at INTEGER NOT NULL,\
        updated_at INTEGER NOT NULL,\
        deleted_at INTEGER\
    );\
    CREATE INDEX IF NOT EXISTS idx_entities_namespace ON entities(namespace);\
    CREATE INDEX IF NOT EXISTS idx_entities_kind ON entities(namespace, kind);\
    CREATE INDEX IF NOT EXISTS idx_entities_name ON entities(namespace, name);\
    CREATE INDEX IF NOT EXISTS idx_entities_created ON entities(created_at DESC);\
    CREATE TABLE IF NOT EXISTS graph_edges (\
        namespace TEXT NOT NULL,\
        id TEXT NOT NULL,\
        source_id TEXT NOT NULL,\
        target_id TEXT NOT NULL,\
        relation TEXT NOT NULL,\
        weight REAL NOT NULL DEFAULT 1.0,\
        created_at INTEGER NOT NULL,\
        metadata TEXT,\
        PRIMARY KEY (namespace, id)\
    );\
    CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_source ON graph_edges(namespace, source_id);\
    CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_target ON graph_edges(namespace, target_id);\
    CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_relation ON graph_edges(namespace, relation);\
    CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_src_rel ON graph_edges(namespace, source_id, relation);\
    CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_tgt_rel ON graph_edges(namespace, target_id, relation);\
    CREATE TABLE IF NOT EXISTS notes (\
        id TEXT PRIMARY KEY,\
        namespace TEXT NOT NULL,\
        kind TEXT NOT NULL,\
        content TEXT NOT NULL DEFAULT '',\
        salience REAL NOT NULL DEFAULT 0.5,\
        decay_factor REAL NOT NULL DEFAULT 0.0,\
        expires_at INTEGER,\
        properties TEXT,\
        created_at INTEGER NOT NULL,\
        updated_at INTEGER NOT NULL,\
        deleted_at INTEGER\
    );\
    CREATE INDEX IF NOT EXISTS idx_notes_namespace ON notes(namespace);\
    CREATE INDEX IF NOT EXISTS idx_notes_kind ON notes(namespace, kind);\
    CREATE INDEX IF NOT EXISTS idx_notes_created ON notes(created_at DESC);\
    CREATE TABLE IF NOT EXISTS events (\
        id TEXT PRIMARY KEY,\
        namespace TEXT NOT NULL,\
        verb TEXT NOT NULL,\
        substrate TEXT NOT NULL,\
        actor TEXT NOT NULL,\
        outcome TEXT NOT NULL,\
        data TEXT,\
        duration_us INTEGER NOT NULL DEFAULT 0,\
        target_id TEXT,\
        created_at INTEGER NOT NULL\
    );\
    CREATE INDEX IF NOT EXISTS idx_events_namespace ON events(namespace);\
    CREATE INDEX IF NOT EXISTS idx_events_verb ON events(verb);\
    CREATE INDEX IF NOT EXISTS idx_events_substrate ON events(substrate);\
    CREATE INDEX IF NOT EXISTS idx_events_created ON events(created_at DESC);\
";
156
157/// All known migrations, ordered by ascending version.
158///
159/// To add a new migration: append a `VersionedMigration` entry with
160/// `version = <last_version + 1>`. The version sequence must be contiguous
161/// (1, 2, 3, ...); `run_migrations` returns an error on gaps.
162///
163/// V2 note: `NOTES_DDL` in `stores/note.rs` already includes `name TEXT` so that
164/// in-process schema creation (used by tests and `StorageBackend::notes()`) has the
165/// column from the start.  When `run_migrations` is called on a DB that was
166/// bootstrapped via `NOTES_DDL`, the V2 `ALTER TABLE` would fail with "duplicate
167/// column name".  The migration runner handles this by checking column existence
168/// before applying V2 — see `run_migrations`.
169pub const MIGRATIONS: &[VersionedMigration] = &[
170    VersionedMigration {
171        version: 1,
172        name: "initial_schema",
173        up: V1_UP,
174    },
175    VersionedMigration {
176        version: 2,
177        name: "add_name_to_notes",
178        up: "ALTER TABLE notes ADD COLUMN name TEXT;",
179    },
180    VersionedMigration {
181        version: 3,
182        name: "add_events_namespace_created_index",
183        up: "CREATE INDEX IF NOT EXISTS idx_events_ns_created ON events(namespace, created_at DESC);",
184    },
185];
186
/// DDL for the table recording which versioned migrations have been applied.
///
/// Each row holds a migration's `version` (the primary key), its `name`, and
/// the `applied_at` timestamp written by `run_migrations`. The timestamp is
/// `chrono::Utc::now().timestamp_micros()`, i.e. microseconds since the Unix
/// epoch. Built with `concat!` so the SQL is one compact line.
const MIGRATION_TRACKING_TABLE: &str = concat!(
    "CREATE TABLE IF NOT EXISTS _schema_migrations (",
    "version   INTEGER PRIMARY KEY,",
    "name      TEXT NOT NULL,",
    "applied_at INTEGER NOT NULL",
    ");",
);
194
195/// Apply all unapplied migrations from `MIGRATIONS` in order.
196///
197/// Returns the highest version now applied, or `0` if the DB is empty and no
198/// migrations exist.
199///
200/// # Idempotency
201///
202/// Safe to call multiple times. Already-applied migrations are skipped.
203///
204/// # Atomicity
205///
206/// Each migration runs in its own transaction. A failure rolls back that
207/// migration and leaves the DB at the prior version.
208///
209/// # Errors
210///
211/// Returns `SqliteError::InvalidData` if the `MIGRATIONS` array is not
212/// contiguous (1, 2, 3, ...).
213///
214/// Returns `SqliteError::Migration { version, error }` if any migration fails.
215pub fn run_migrations(conn: &mut Connection) -> Result<u32, SqliteError> {
216    for (i, m) in MIGRATIONS.iter().enumerate() {
217        let expected = (i + 1) as u32;
218        if m.version != expected {
219            return Err(SqliteError::InvalidData(format!(
220                "MIGRATIONS array is not contiguous: expected version {expected} at index {i}, \
221                 got version {}",
222                m.version
223            )));
224        }
225    }
226
227    conn.execute_batch(MIGRATION_TRACKING_TABLE)?;
228
229    // Determine the current version (highest applied).
230    let current_version: u32 = conn
231        .query_row(
232            "SELECT COALESCE(MAX(version), 0) FROM _schema_migrations",
233            [],
234            |row| row.get(0),
235        )
236        .unwrap_or(0);
237
238    let mut applied_version = current_version;
239
240    for migration in MIGRATIONS {
241        if migration.version <= current_version {
242            continue;
243        }
244
245        // V2 adds `name` to notes.  StorageBackend::notes() bootstraps the schema
246        // via NOTES_DDL (which already includes `name`), so the column may already
247        // exist even though the migration has never been recorded.  Treat "duplicate
248        // column name" from SQLite as idempotent for ALTER TABLE migrations.
249        if migration.version == 2 {
250            let col_exists: bool = conn
251                .query_row(
252                    "SELECT COUNT(*) > 0 FROM pragma_table_info('notes') WHERE name = 'name'",
253                    [],
254                    |row| row.get(0),
255                )
256                .unwrap_or(false);
257            if col_exists {
258                // Column already present — record the migration as applied and skip.
259                let now = chrono::Utc::now().timestamp_micros();
260                conn.execute(
261                    "INSERT OR IGNORE INTO _schema_migrations (version, name, applied_at) \
262                     VALUES (?1, ?2, ?3)",
263                    rusqlite::params![migration.version, migration.name, now],
264                )
265                .map_err(|e| SqliteError::Migration {
266                    version: migration.version,
267                    error: e.to_string(),
268                })?;
269                applied_version = migration.version;
270                continue;
271            }
272        }
273
274        let tx = conn.transaction().map_err(|e| SqliteError::Migration {
275            version: migration.version,
276            error: e.to_string(),
277        })?;
278
279        tx.execute_batch(migration.up)
280            .map_err(|e| SqliteError::Migration {
281                version: migration.version,
282                error: e.to_string(),
283            })?;
284
285        let now = chrono::Utc::now().timestamp_micros();
286        tx.execute(
287            "INSERT INTO _schema_migrations (version, name, applied_at) VALUES (?1, ?2, ?3)",
288            rusqlite::params![migration.version, migration.name, now],
289        )
290        .map_err(|e| SqliteError::Migration {
291            version: migration.version,
292            error: e.to_string(),
293        })?;
294
295        tx.commit().map_err(|e| SqliteError::Migration {
296            version: migration.version,
297            error: e.to_string(),
298        })?;
299
300        applied_version = migration.version;
301    }
302
303    Ok(applied_version)
304}
305
// =============================================================================
// Tests
// =============================================================================

#[cfg(test)]
mod tests {
    use super::*;

    fn open_memory() -> Connection {
        Connection::open_in_memory().expect("in-memory connection")
    }

    /// Highest version declared in `MIGRATIONS`, i.e. what a fresh DB migrates to.
    fn latest_version() -> u32 {
        MIGRATIONS.last().map_or(0, |m| m.version)
    }

    /// Run a query that yields a single integer and return it, panicking on failure.
    fn query_i64(conn: &Connection, sql: &str) -> i64 {
        conn.query_row(sql, [], |row| row.get(0))
            .unwrap_or_else(|e| panic!("query `{sql}` failed: {e}"))
    }

    #[test]
    fn fresh_db_migrates_to_latest() {
        let mut conn = open_memory();
        let version = run_migrations(&mut conn).expect("migrations should succeed");
        assert_eq!(version, 3);

        // The tracking table has rows for V1, V2, and V3.
        let tracked = query_i64(
            &conn,
            "SELECT COUNT(*) FROM _schema_migrations WHERE version IN (1, 2, 3)",
        );
        assert_eq!(tracked, 3);

        // V1 created the entities table.
        let entities = query_i64(
            &conn,
            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='entities'",
        );
        assert_eq!(entities, 1);

        // V2 added the name column to notes.
        let name_col = query_i64(
            &conn,
            "SELECT COUNT(*) FROM pragma_table_info('notes') WHERE name = 'name'",
        );
        assert_eq!(name_col, 1, "V2 must add name column to notes");

        // V3 created the composite events index.
        let ns_created_idx = query_i64(
            &conn,
            "SELECT COUNT(*) FROM sqlite_master \
             WHERE type='index' AND name='idx_events_ns_created'",
        );
        assert_eq!(ns_created_idx, 1, "V3 must create idx_events_ns_created");
    }

    #[test]
    fn run_migrations_twice_is_idempotent() {
        let mut conn = open_memory();
        let first = run_migrations(&mut conn).expect("first run");
        let second = run_migrations(&mut conn).expect("second run");
        assert_eq!(first, 3);
        assert_eq!(second, 3);

        // Still exactly three rows in the tracking table (V1 + V2 + V3).
        assert_eq!(query_i64(&conn, "SELECT COUNT(*) FROM _schema_migrations"), 3);
    }

    #[test]
    fn failed_migration_rolls_back() {
        // One past the newest real migration, so this never collides with a
        // genuine version as MIGRATIONS grows.
        let bad_version = latest_version() + 1;
        let bad = VersionedMigration {
            version: bad_version,
            name: "bad_migration",
            // The first statement succeeds before the second fails, so this also
            // checks that DDL already executed inside the migration is undone.
            up: "CREATE TABLE rollback_probe (id INTEGER); THIS IS NOT VALID SQL;",
        };

        let mut conn = open_memory();

        // Bring the DB up to the latest real version first.
        run_migrations(&mut conn).expect("real migrations should apply cleanly");

        // Now manually drive the bad migration to check rollback behaviour.
        let result = apply_single_migration(&mut conn, &bad);
        assert!(result.is_err(), "bad migration should return error");

        // The failed migration must not be recorded...
        let bad_rows = query_i64(
            &conn,
            &format!("SELECT COUNT(*) FROM _schema_migrations WHERE version = {bad_version}"),
        );
        assert_eq!(bad_rows, 0, "failed migration must not be recorded after rollback");

        // ...and the DDL it executed before failing must be gone.
        let probe = query_i64(
            &conn,
            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='rollback_probe'",
        );
        assert_eq!(probe, 0, "partial DDL of the failed migration must be rolled back");

        // Every real migration is still recorded.
        let applied = query_i64(&conn, "SELECT COUNT(*) FROM _schema_migrations");
        assert_eq!(
            applied,
            i64::from(latest_version()),
            "earlier migrations must still be recorded"
        );
    }

    #[test]
    fn store_ddl_then_migrations_is_idempotent() {
        use crate::stores::note::ensure_notes_schema;

        let mut conn = open_memory();

        // Simulate the StorageBackend path: store DDL creates notes table
        // WITH the name column (NOTES_DDL includes it for test convenience).
        ensure_notes_schema(&conn).expect("store DDL should create notes");

        // Verify name column exists from DDL.
        let has_name = query_i64(
            &conn,
            "SELECT COUNT(*) FROM pragma_table_info('notes') WHERE name = 'name'",
        ) > 0;
        assert!(has_name, "NOTES_DDL should include name column");

        // Now run versioned migrations — V2 should detect the existing column
        // and skip the ALTER TABLE without error. V3 adds the composite index.
        let version = run_migrations(&mut conn).expect("migrations after store DDL");
        assert_eq!(version, 3);

        // V2 should be recorded as applied (skipped but tracked).
        let v2_count = query_i64(&conn, "SELECT COUNT(*) FROM _schema_migrations WHERE version = 2");
        assert_eq!(
            v2_count, 1,
            "V2 must be recorded even when column pre-exists"
        );
    }

    /// Helper: apply a single migration in a transaction, recording it in the
    /// tracking table. Used by the rollback test only.
    fn apply_single_migration(
        conn: &mut Connection,
        migration: &VersionedMigration,
    ) -> Result<(), SqliteError> {
        let to_err = |e: rusqlite::Error| SqliteError::Migration {
            version: migration.version,
            error: e.to_string(),
        };

        // Dropping `tx` on an early return rolls everything back.
        let tx = conn.transaction().map_err(to_err)?;
        tx.execute_batch(migration.up).map_err(to_err)?;

        let now = chrono::Utc::now().timestamp_micros();
        tx.execute(
            "INSERT INTO _schema_migrations (version, name, applied_at) VALUES (?1, ?2, ?3)",
            rusqlite::params![migration.version, migration.name, now],
        )
        .map_err(to_err)?;

        tx.commit().map_err(to_err)
    }
}