Skip to main content

khive_db/
migrations.rs

1use rusqlite::Connection;
2
3use crate::error::SqliteError;
4
5// =============================================================================
6// Legacy per-service migration API (preserved for backward compatibility)
7// =============================================================================
8
/// One step of a legacy per-service migration plan, applied by
/// `apply_schema_plan`.
pub struct Migration {
    /// Identifier stored in `_schema_versions.migration_id`; together with the
    /// plan's `service` it determines whether this step has already run.
    pub id: &'static str,
    /// SQL executed via `execute_batch` when the step is applied; may contain
    /// several `;`-separated statements.
    pub up_sql: &'static str,
    /// Reverse SQL. Not read by `apply_schema_plan`.
    pub down_sql: Option<&'static str>,
    /// Optional predicate consulted before the tracking table. When it returns
    /// `true`, `apply_schema_plan` skips this step *without* recording it in
    /// `_schema_versions`, so the predicate is re-evaluated on every call.
    pub is_already_applied: Option<fn(&Connection) -> bool>,
}
15
/// The legacy migration plan for a single service, one migration list per
/// database backend.
pub struct ServiceSchemaPlan {
    /// Service name. Used as the `service` key in `_schema_versions`, whose
    /// primary key is `(service, migration_id)`, so migration ids only need to
    /// be unique within one service.
    pub service: &'static str,
    /// SQLite migrations, applied in slice order by `apply_schema_plan`.
    pub sqlite: &'static [Migration],
    /// Postgres migrations. Not read in this module — presumably consumed by a
    /// Postgres backend elsewhere (TODO confirm).
    pub postgres: &'static [Migration],
}
21
/// DDL for the legacy `_schema_versions` tracking table used by
/// `apply_schema_plan`.
///
/// One row per applied `(service, migration_id)` pair; `applied_at` is written
/// by `apply_schema_plan` as UTC microseconds (`timestamp_micros`). The
/// `IF NOT EXISTS` clause makes it safe to execute on every call.
const SCHEMA_VERSION_TABLE: &str = "\
    CREATE TABLE IF NOT EXISTS _schema_versions (\
        service TEXT NOT NULL,\
        migration_id TEXT NOT NULL,\
        applied_at INTEGER NOT NULL,\
        PRIMARY KEY (service, migration_id)\
    );\
";
30
31pub fn apply_schema_plan(conn: &Connection, plan: &ServiceSchemaPlan) -> Result<(), SqliteError> {
32    conn.execute_batch(SCHEMA_VERSION_TABLE)?;
33
34    for migration in plan.sqlite {
35        // Check if custom predicate says it's already applied
36        if let Some(check) = migration.is_already_applied {
37            if check(conn) {
38                continue;
39            }
40        }
41
42        // Check if tracked as applied
43        let already: bool = conn.query_row(
44            "SELECT COUNT(*) > 0 FROM _schema_versions WHERE service = ?1 AND migration_id = ?2",
45            rusqlite::params![plan.service, migration.id],
46            |row| row.get(0),
47        )?;
48
49        if already {
50            continue;
51        }
52
53        // Apply
54        conn.execute_batch(migration.up_sql)?;
55
56        // Record
57        conn.execute(
58            "INSERT INTO _schema_versions (service, migration_id, applied_at) VALUES (?1, ?2, ?3)",
59            rusqlite::params![
60                plan.service,
61                migration.id,
62                chrono::Utc::now().timestamp_micros(),
63            ],
64        )?;
65    }
66
67    Ok(())
68}
69
70// =============================================================================
71// Versioned migration system (ADR-022)
72// =============================================================================
73
/// A single forward-only schema migration.
///
/// Migrations are applied in order from the current DB version to the target
/// version. Each migration runs in its own transaction; a failure rolls back
/// that migration and leaves the DB at the prior version.
///
/// All fields are `'static` data, so the type is cheap to copy and can be
/// compared and printed (useful in tests and diagnostics).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct VersionedMigration {
    /// Monotonically increasing version number, starting at 1.
    pub version: u32,
    /// Short human-readable name for the migration (used in the audit table).
    pub name: &'static str,
    /// SQL to apply this migration. May contain multiple statements separated
    /// by semicolons; `execute_batch` runs them all.
    pub up: &'static str,
}
88
/// V1: the complete initial schema for the four core tables — `entities`,
/// `graph_edges`, `notes`, and `events` — plus their secondary indexes.
///
/// Every statement uses `IF NOT EXISTS`, so objects that already exist are left
/// as they are (existing tables are *not* altered to match this DDL). Note that
/// `notes` has no `name` column here; V2 adds it.
///
/// Each line ends in a `\` string continuation, which drops the newline and the
/// next line's leading whitespace, so the batch is a single line of SQL.
const V1_UP: &str = "\
    CREATE TABLE IF NOT EXISTS entities (\
        id TEXT PRIMARY KEY,\
        namespace TEXT NOT NULL,\
        kind TEXT NOT NULL,\
        name TEXT NOT NULL,\
        description TEXT,\
        properties TEXT,\
        tags TEXT NOT NULL DEFAULT '[]',\
        created_at INTEGER NOT NULL,\
        updated_at INTEGER NOT NULL,\
        deleted_at INTEGER\
    );\
    CREATE INDEX IF NOT EXISTS idx_entities_namespace ON entities(namespace);\
    CREATE INDEX IF NOT EXISTS idx_entities_kind ON entities(namespace, kind);\
    CREATE INDEX IF NOT EXISTS idx_entities_name ON entities(namespace, name);\
    CREATE INDEX IF NOT EXISTS idx_entities_created ON entities(created_at DESC);\
    CREATE TABLE IF NOT EXISTS graph_edges (\
        namespace TEXT NOT NULL,\
        id TEXT NOT NULL,\
        source_id TEXT NOT NULL,\
        target_id TEXT NOT NULL,\
        relation TEXT NOT NULL,\
        weight REAL NOT NULL DEFAULT 1.0,\
        created_at INTEGER NOT NULL,\
        metadata TEXT,\
        PRIMARY KEY (namespace, id)\
    );\
    CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_source ON graph_edges(namespace, source_id);\
    CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_target ON graph_edges(namespace, target_id);\
    CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_relation ON graph_edges(namespace, relation);\
    CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_src_rel ON graph_edges(namespace, source_id, relation);\
    CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_tgt_rel ON graph_edges(namespace, target_id, relation);\
    CREATE TABLE IF NOT EXISTS notes (\
        id TEXT PRIMARY KEY,\
        namespace TEXT NOT NULL,\
        kind TEXT NOT NULL,\
        content TEXT NOT NULL DEFAULT '',\
        salience REAL NOT NULL DEFAULT 0.5,\
        decay_factor REAL NOT NULL DEFAULT 0.0,\
        expires_at INTEGER,\
        properties TEXT,\
        created_at INTEGER NOT NULL,\
        updated_at INTEGER NOT NULL,\
        deleted_at INTEGER\
    );\
    CREATE INDEX IF NOT EXISTS idx_notes_namespace ON notes(namespace);\
    CREATE INDEX IF NOT EXISTS idx_notes_kind ON notes(namespace, kind);\
    CREATE INDEX IF NOT EXISTS idx_notes_created ON notes(created_at DESC);\
    CREATE TABLE IF NOT EXISTS events (\
        id TEXT PRIMARY KEY,\
        namespace TEXT NOT NULL,\
        verb TEXT NOT NULL,\
        substrate TEXT NOT NULL,\
        actor TEXT NOT NULL,\
        outcome TEXT NOT NULL,\
        data TEXT,\
        duration_us INTEGER NOT NULL DEFAULT 0,\
        target_id TEXT,\
        created_at INTEGER NOT NULL\
    );\
    CREATE INDEX IF NOT EXISTS idx_events_namespace ON events(namespace);\
    CREATE INDEX IF NOT EXISTS idx_events_verb ON events(verb);\
    CREATE INDEX IF NOT EXISTS idx_events_substrate ON events(substrate);\
    CREATE INDEX IF NOT EXISTS idx_events_created ON events(created_at DESC);\
";
156
157/// All known migrations, ordered by ascending version.
158///
159/// To add a new migration: append a `VersionedMigration` entry with
160/// `version = <last_version + 1>`. The version sequence must be contiguous
161/// (1, 2, 3, ...); `run_migrations` returns an error on gaps.
162///
163/// V2 note: `NOTES_DDL` in `stores/note.rs` already includes `name TEXT` so that
164/// in-process schema creation (used by tests and `StorageBackend::notes()`) has the
165/// column from the start.  When `run_migrations` is called on a DB that was
166/// bootstrapped via `NOTES_DDL`, the V2 `ALTER TABLE` would fail with "duplicate
167/// column name".  The migration runner handles this by checking column existence
168/// before applying V2 — see `run_migrations`.
169///
170/// V4 note: Deduplicates existing graph_edges rows that share the same
171/// (namespace, source_id, target_id, relation) triple, keeping the earliest
172/// rowid, then adds a unique index enforcing the constraint going forward.
173const V4_DEDUPE_GRAPH_EDGE_TRIPLES: &str = "\
174    DELETE FROM graph_edges \
175    WHERE rowid NOT IN (\
176        SELECT MIN(rowid) \
177        FROM graph_edges \
178        GROUP BY namespace, source_id, target_id, relation\
179    );\
180    CREATE UNIQUE INDEX IF NOT EXISTS idx_graph_edges_unique_triple \
181    ON graph_edges(namespace, source_id, target_id, relation);\
182";
183
184pub const MIGRATIONS: &[VersionedMigration] = &[
185    VersionedMigration {
186        version: 1,
187        name: "initial_schema",
188        up: V1_UP,
189    },
190    VersionedMigration {
191        version: 2,
192        name: "add_name_to_notes",
193        up: "ALTER TABLE notes ADD COLUMN name TEXT;",
194    },
195    VersionedMigration {
196        version: 3,
197        name: "add_events_namespace_created_index",
198        up: "CREATE INDEX IF NOT EXISTS idx_events_ns_created ON events(namespace, created_at DESC);",
199    },
200    VersionedMigration {
201        version: 4,
202        name: "dedupe_graph_edge_triples",
203        up: V4_DEDUPE_GRAPH_EDGE_TRIPLES,
204    },
205];
206
/// DDL for the `_schema_migrations` audit table used by `run_migrations`.
///
/// One row per applied version, keyed by `version`; `name` mirrors
/// `VersionedMigration::name`, and `applied_at` is written as UTC microseconds
/// (`timestamp_micros`). `IF NOT EXISTS` makes it safe to execute on every run.
const MIGRATION_TRACKING_TABLE: &str = "\
    CREATE TABLE IF NOT EXISTS _schema_migrations (\
        version   INTEGER PRIMARY KEY,\
        name      TEXT NOT NULL,\
        applied_at INTEGER NOT NULL\
    );\
";
214
215/// Apply all unapplied migrations from `MIGRATIONS` in order.
216///
217/// Returns the highest version now applied, or `0` if the DB is empty and no
218/// migrations exist.
219///
220/// # Idempotency
221///
222/// Safe to call multiple times. Already-applied migrations are skipped.
223///
224/// # Atomicity
225///
226/// Each migration runs in its own transaction. A failure rolls back that
227/// migration and leaves the DB at the prior version.
228///
229/// # Errors
230///
231/// Returns `SqliteError::InvalidData` if the `MIGRATIONS` array is not
232/// contiguous (1, 2, 3, ...).
233///
234/// Returns `SqliteError::Migration { version, error }` if any migration fails.
235pub fn run_migrations(conn: &mut Connection) -> Result<u32, SqliteError> {
236    for (i, m) in MIGRATIONS.iter().enumerate() {
237        let expected = (i + 1) as u32;
238        if m.version != expected {
239            return Err(SqliteError::InvalidData(format!(
240                "MIGRATIONS array is not contiguous: expected version {expected} at index {i}, \
241                 got version {}",
242                m.version
243            )));
244        }
245    }
246
247    conn.execute_batch(MIGRATION_TRACKING_TABLE)?;
248
249    // Determine the current version (highest applied).
250    let current_version: u32 = conn
251        .query_row(
252            "SELECT COALESCE(MAX(version), 0) FROM _schema_migrations",
253            [],
254            |row| row.get(0),
255        )
256        .unwrap_or(0);
257
258    let mut applied_version = current_version;
259
260    for migration in MIGRATIONS {
261        if migration.version <= current_version {
262            continue;
263        }
264
265        // V2 adds `name` to notes.  StorageBackend::notes() bootstraps the schema
266        // via NOTES_DDL (which already includes `name`), so the column may already
267        // exist even though the migration has never been recorded.  Treat "duplicate
268        // column name" from SQLite as idempotent for ALTER TABLE migrations.
269        if migration.version == 2 {
270            let col_exists: bool = conn
271                .query_row(
272                    "SELECT COUNT(*) > 0 FROM pragma_table_info('notes') WHERE name = 'name'",
273                    [],
274                    |row| row.get(0),
275                )
276                .unwrap_or(false);
277            if col_exists {
278                // Column already present — record the migration as applied and skip.
279                let now = chrono::Utc::now().timestamp_micros();
280                conn.execute(
281                    "INSERT OR IGNORE INTO _schema_migrations (version, name, applied_at) \
282                     VALUES (?1, ?2, ?3)",
283                    rusqlite::params![migration.version, migration.name, now],
284                )
285                .map_err(|e| SqliteError::Migration {
286                    version: migration.version,
287                    error: e.to_string(),
288                })?;
289                applied_version = migration.version;
290                continue;
291            }
292        }
293
294        let tx = conn.transaction().map_err(|e| SqliteError::Migration {
295            version: migration.version,
296            error: e.to_string(),
297        })?;
298
299        tx.execute_batch(migration.up)
300            .map_err(|e| SqliteError::Migration {
301                version: migration.version,
302                error: e.to_string(),
303            })?;
304
305        let now = chrono::Utc::now().timestamp_micros();
306        tx.execute(
307            "INSERT INTO _schema_migrations (version, name, applied_at) VALUES (?1, ?2, ?3)",
308            rusqlite::params![migration.version, migration.name, now],
309        )
310        .map_err(|e| SqliteError::Migration {
311            version: migration.version,
312            error: e.to_string(),
313        })?;
314
315        tx.commit().map_err(|e| SqliteError::Migration {
316            version: migration.version,
317            error: e.to_string(),
318        })?;
319
320        applied_version = migration.version;
321    }
322
323    Ok(applied_version)
324}
325
326// =============================================================================
327// Tests
328// =============================================================================
329
#[cfg(test)]
mod tests {
    use super::*;

    fn open_memory() -> Connection {
        Connection::open_in_memory().expect("in-memory connection")
    }

    /// Run a query that returns a single integer (e.g. `COUNT(*)`).
    fn scalar_i64(conn: &Connection, sql: &str) -> i64 {
        conn.query_row(sql, [], |row| row.get(0))
            .expect("scalar query should succeed")
    }

    #[test]
    fn fresh_db_migrates_to_latest() {
        let mut conn = open_memory();
        let version = run_migrations(&mut conn).expect("migrations should succeed");
        assert_eq!(version, 4);

        // Verify the tracking table has rows for V1, V2, V3, and V4.
        let count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM _schema_migrations WHERE version IN (1, 2, 3, 4)",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(count, 4);

        // Verify the entities table was created.
        let tbl_count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='entities'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(tbl_count, 1);

        // Verify V2 added the name column to notes.
        let col_count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM pragma_table_info('notes') WHERE name = 'name'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(col_count, 1, "V2 must add name column to notes");
    }

    #[test]
    fn run_migrations_twice_is_idempotent() {
        let mut conn = open_memory();
        let v1 = run_migrations(&mut conn).expect("first run");
        let v2 = run_migrations(&mut conn).expect("second run");
        assert_eq!(v1, 4);
        assert_eq!(v2, 4);

        // Should still have exactly four rows in the tracking table (V1 + V2 + V3 + V4).
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM _schema_migrations", [], |row| {
                row.get(0)
            })
            .unwrap();
        assert_eq!(count, 4);
    }

    #[test]
    fn failed_migration_rolls_back() {
        let bad_v5 = VersionedMigration {
            version: 5,
            name: "bad_migration",
            up: "THIS IS NOT VALID SQL;",
        };

        let mut conn = open_memory();

        // Apply all real migrations (V1 + V2 + V3 + V4) so the DB is at V4.
        run_migrations(&mut conn).expect("V1+V2+V3+V4 should apply cleanly");

        // Now manually drive the bad V5 migration to check rollback behaviour.
        let result = apply_single_migration(&mut conn, &bad_v5);
        assert!(result.is_err(), "bad migration should return error");

        // DB should still be at V4 — no V5 row in tracking.
        let v5_count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM _schema_migrations WHERE version = 5",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(v5_count, 0, "V5 must not be recorded after rollback");

        // V1, V2, V3, and V4 should still be there.
        let applied_count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM _schema_migrations WHERE version IN (1, 2, 3, 4)",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(
            applied_count, 4,
            "V1, V2, V3, and V4 must still be recorded"
        );
    }

    #[test]
    fn store_ddl_then_migrations_is_idempotent() {
        use crate::stores::note::ensure_notes_schema;

        let mut conn = open_memory();

        // Simulate the StorageBackend path: store DDL creates notes table
        // WITH the name column (NOTES_DDL includes it for test convenience).
        ensure_notes_schema(&conn).expect("store DDL should create notes");

        // Verify name column exists from DDL.
        let has_name: bool = conn
            .query_row(
                "SELECT COUNT(*) > 0 FROM pragma_table_info('notes') WHERE name = 'name'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert!(has_name, "NOTES_DDL should include name column");

        // Now run versioned migrations — V2 should detect the existing column
        // and skip the ALTER TABLE without error. V4 adds the unique triple index.
        let version = run_migrations(&mut conn).expect("migrations after store DDL");
        assert_eq!(version, 4);

        // V2 should be recorded as applied (skipped but tracked).
        let v2_count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM _schema_migrations WHERE version = 2",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(
            v2_count, 1,
            "V2 must be recorded even when column pre-exists"
        );
    }

    #[test]
    fn v4_dedupes_existing_duplicate_edges() {
        let mut conn = open_memory();

        // Bring the DB to V3 by hand so duplicate edges can be inserted before
        // the V4 unique index exists.
        conn.execute_batch(MIGRATION_TRACKING_TABLE)
            .expect("tracking table");
        for migration in MIGRATIONS.iter().filter(|m| m.version <= 3) {
            apply_single_migration(&mut conn, migration).expect("V1..V3 should apply");
        }

        conn.execute_batch(
            "INSERT INTO graph_edges (namespace, id, source_id, target_id, relation, created_at) \
             VALUES ('ns', 'e1', 'a', 'b', 'rel', 1), \
                    ('ns', 'e2', 'a', 'b', 'rel', 2), \
                    ('ns', 'e3', 'a', 'c', 'rel', 3);",
        )
        .expect("seed duplicate edges");

        let version = run_migrations(&mut conn).expect("V4 should apply on top of V3");
        assert_eq!(version, 4);

        assert_eq!(
            scalar_i64(&conn, "SELECT COUNT(*) FROM graph_edges"),
            2,
            "the duplicate (ns, a, b, rel) edge must be removed"
        );
        assert_eq!(
            scalar_i64(&conn, "SELECT COUNT(*) FROM graph_edges WHERE id = 'e1'"),
            1,
            "the row with the smallest rowid must be kept"
        );

        let duplicate = conn.execute(
            "INSERT INTO graph_edges (namespace, id, source_id, target_id, relation, created_at) \
             VALUES ('ns', 'e4', 'a', 'b', 'rel', 4)",
            [],
        );
        assert!(
            duplicate.is_err(),
            "the unique triple index must reject new duplicates"
        );
    }

    #[test]
    fn legacy_schema_plan_is_tracked_and_idempotent() {
        fn always_applied(_: &Connection) -> bool {
            true
        }

        const WIDGET_MIGRATIONS: &[Migration] = &[
            Migration {
                id: "001_create_widgets",
                // No IF NOT EXISTS: a second apply only succeeds if tracking works.
                up_sql: "CREATE TABLE widgets (id INTEGER PRIMARY KEY);",
                down_sql: None,
                is_already_applied: None,
            },
            Migration {
                id: "002_predicate_skipped",
                // Never executed: the predicate reports it as already applied.
                up_sql: "THIS IS NOT VALID SQL;",
                down_sql: None,
                is_already_applied: Some(always_applied as fn(&Connection) -> bool),
            },
        ];

        let conn = open_memory();
        let plan = ServiceSchemaPlan {
            service: "widgets",
            sqlite: WIDGET_MIGRATIONS,
            postgres: &[],
        };

        apply_schema_plan(&conn, &plan).expect("first apply");
        apply_schema_plan(&conn, &plan).expect("second apply");

        assert_eq!(
            scalar_i64(
                &conn,
                "SELECT COUNT(*) FROM _schema_versions WHERE service = 'widgets'"
            ),
            1,
            "only the executed migration is recorded; predicate-skipped ones are not"
        );
    }

    /// Helper: apply a single migration in a transaction, recording it in the
    /// tracking table. Used by the rollback test and to stage a DB at an
    /// intermediate version.
    fn apply_single_migration(
        conn: &mut Connection,
        migration: &VersionedMigration,
    ) -> Result<(), SqliteError> {
        let tx = conn.transaction().map_err(|e| SqliteError::Migration {
            version: migration.version,
            error: e.to_string(),
        })?;

        tx.execute_batch(migration.up)
            .map_err(|e| SqliteError::Migration {
                version: migration.version,
                error: e.to_string(),
            })?;

        let now = chrono::Utc::now().timestamp_micros();
        tx.execute(
            "INSERT INTO _schema_migrations (version, name, applied_at) VALUES (?1, ?2, ?3)",
            rusqlite::params![migration.version, migration.name, now],
        )
        .map_err(|e| SqliteError::Migration {
            version: migration.version,
            error: e.to_string(),
        })?;

        tx.commit().map_err(|e| SqliteError::Migration {
            version: migration.version,
            error: e.to_string(),
        })?;

        Ok(())
    }
}