Skip to main content

khive_db/
migrations.rs

1use rusqlite::Connection;
2
3use crate::error::SqliteError;
4
5// =============================================================================
6// Legacy per-service migration API (preserved for backward compatibility)
7// =============================================================================
8
/// One step of a legacy per-service schema plan, applied by `apply_schema_plan`.
///
/// Applied state is tracked in `_schema_versions` keyed by
/// `(service, migration_id)`, so two entries sharing an `id` within one
/// service are treated as the same migration.
pub struct Migration {
    /// Identifier written to `_schema_versions.migration_id`.
    pub id: &'static str,
    /// SQL run via `execute_batch` when the migration is not yet applied.
    pub up_sql: &'static str,
    /// Reverse SQL. Not read by `apply_schema_plan`.
    pub down_sql: Option<&'static str>,
    /// Optional probe. When it returns `true`, `apply_schema_plan` skips this
    /// migration without inserting a `_schema_versions` row.
    pub is_already_applied: Option<fn(&Connection) -> bool>,
}
15
/// The legacy migration lists for one service, one list per backend.
pub struct ServiceSchemaPlan {
    /// Service name; written to `_schema_versions.service`.
    pub service: &'static str,
    /// SQLite migrations, applied in slice order by `apply_schema_plan`.
    pub sqlite: &'static [Migration],
    /// Migrations for a Postgres backend (per the field name). Ignored by
    /// `apply_schema_plan`.
    pub postgres: &'static [Migration],
}
21
/// DDL for the legacy tracking table used by `apply_schema_plan`.
///
/// One row per applied `(service, migration_id)`. `applied_at` is written as
/// Unix microseconds (`chrono::Utc::now().timestamp_micros()`).
const SCHEMA_VERSION_TABLE: &str = "\
    CREATE TABLE IF NOT EXISTS _schema_versions (\
        service TEXT NOT NULL,\
        migration_id TEXT NOT NULL,\
        applied_at INTEGER NOT NULL,\
        PRIMARY KEY (service, migration_id)\
    );\
";
30
31pub fn apply_schema_plan(conn: &Connection, plan: &ServiceSchemaPlan) -> Result<(), SqliteError> {
32    conn.execute_batch(SCHEMA_VERSION_TABLE)?;
33
34    for migration in plan.sqlite {
35        // Check if custom predicate says it's already applied
36        if let Some(check) = migration.is_already_applied {
37            if check(conn) {
38                continue;
39            }
40        }
41
42        // Check if tracked as applied
43        let already: bool = conn.query_row(
44            "SELECT COUNT(*) > 0 FROM _schema_versions WHERE service = ?1 AND migration_id = ?2",
45            rusqlite::params![plan.service, migration.id],
46            |row| row.get(0),
47        )?;
48
49        if already {
50            continue;
51        }
52
53        // Apply
54        conn.execute_batch(migration.up_sql)?;
55
56        // Record
57        conn.execute(
58            "INSERT INTO _schema_versions (service, migration_id, applied_at) VALUES (?1, ?2, ?3)",
59            rusqlite::params![
60                plan.service,
61                migration.id,
62                chrono::Utc::now().timestamp_micros(),
63            ],
64        )?;
65    }
66
67    Ok(())
68}
69
70// =============================================================================
71// Versioned migration system (ADR-022)
72// =============================================================================
73
/// A single forward-only schema migration.
///
/// Migrations are applied in order from the current DB version to the target
/// version. Each migration runs in its own transaction; a failure rolls back
/// that migration and leaves the DB at the prior version.
///
/// `run_migrations` may record a migration as applied *without* executing
/// `up` when it detects the change is already present. Currently this applies
/// only to V2, which checks for the `notes.name` column.
pub struct VersionedMigration {
    /// Monotonically increasing version number, starting at 1.
    pub version: u32,
    /// Short human-readable name, stored in `_schema_migrations.name`.
    pub name: &'static str,
    /// SQL to apply this migration. May contain multiple statements separated
    /// by semicolons; `execute_batch` runs them all.
    ///
    /// It runs inside the transaction `run_migrations` opens, so it should not
    /// issue its own `BEGIN`/`COMMIT` (SQLite rejects a nested `BEGIN`).
    pub up: &'static str,
}
88
// V1: The complete initial schema for the four core tables (`entities`,
// `graph_edges`, `notes`, `events`) plus their indexes.
//
// Every statement uses `IF NOT EXISTS`, so objects that already exist are left
// as they are. SQLite does not compare column lists, so a pre-existing table
// keeps its own shape. `notes` has no `name` column here; V2 adds it.
const V1_UP: &str = "\
    CREATE TABLE IF NOT EXISTS entities (\
        id TEXT PRIMARY KEY,\
        namespace TEXT NOT NULL,\
        kind TEXT NOT NULL,\
        name TEXT NOT NULL,\
        description TEXT,\
        properties TEXT,\
        tags TEXT NOT NULL DEFAULT '[]',\
        created_at INTEGER NOT NULL,\
        updated_at INTEGER NOT NULL,\
        deleted_at INTEGER\
    );\
    CREATE INDEX IF NOT EXISTS idx_entities_namespace ON entities(namespace);\
    CREATE INDEX IF NOT EXISTS idx_entities_kind ON entities(namespace, kind);\
    CREATE INDEX IF NOT EXISTS idx_entities_name ON entities(namespace, name);\
    CREATE INDEX IF NOT EXISTS idx_entities_created ON entities(created_at DESC);\
    CREATE TABLE IF NOT EXISTS graph_edges (\
        namespace TEXT NOT NULL,\
        id TEXT NOT NULL,\
        source_id TEXT NOT NULL,\
        target_id TEXT NOT NULL,\
        relation TEXT NOT NULL,\
        weight REAL NOT NULL DEFAULT 1.0,\
        created_at INTEGER NOT NULL,\
        metadata TEXT,\
        PRIMARY KEY (namespace, id)\
    );\
    CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_source ON graph_edges(namespace, source_id);\
    CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_target ON graph_edges(namespace, target_id);\
    CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_relation ON graph_edges(namespace, relation);\
    CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_src_rel ON graph_edges(namespace, source_id, relation);\
    CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_tgt_rel ON graph_edges(namespace, target_id, relation);\
    CREATE TABLE IF NOT EXISTS notes (\
        id TEXT PRIMARY KEY,\
        namespace TEXT NOT NULL,\
        kind TEXT NOT NULL,\
        content TEXT NOT NULL DEFAULT '',\
        salience REAL NOT NULL DEFAULT 0.5,\
        decay_factor REAL NOT NULL DEFAULT 0.0,\
        expires_at INTEGER,\
        properties TEXT,\
        created_at INTEGER NOT NULL,\
        updated_at INTEGER NOT NULL,\
        deleted_at INTEGER\
    );\
    CREATE INDEX IF NOT EXISTS idx_notes_namespace ON notes(namespace);\
    CREATE INDEX IF NOT EXISTS idx_notes_kind ON notes(namespace, kind);\
    CREATE INDEX IF NOT EXISTS idx_notes_created ON notes(created_at DESC);\
    CREATE TABLE IF NOT EXISTS events (\
        id TEXT PRIMARY KEY,\
        namespace TEXT NOT NULL,\
        verb TEXT NOT NULL,\
        substrate TEXT NOT NULL,\
        actor TEXT NOT NULL,\
        outcome TEXT NOT NULL,\
        data TEXT,\
        duration_us INTEGER NOT NULL DEFAULT 0,\
        target_id TEXT,\
        created_at INTEGER NOT NULL\
    );\
    CREATE INDEX IF NOT EXISTS idx_events_namespace ON events(namespace);\
    CREATE INDEX IF NOT EXISTS idx_events_verb ON events(verb);\
    CREATE INDEX IF NOT EXISTS idx_events_substrate ON events(substrate);\
    CREATE INDEX IF NOT EXISTS idx_events_created ON events(created_at DESC);\
";
156
/// All known migrations, ordered by ascending version.
///
/// To add a new migration: append a `VersionedMigration` entry with
/// `version = <last_version + 1>`. The version sequence must be contiguous
/// (1, 2, 3, ...); `run_migrations` returns an error on gaps.
///
/// Treat shipped entries as immutable. `run_migrations` skips every version at
/// or below `MAX(version)` in `_schema_migrations`, so editing the `up` SQL of
/// an already-recorded migration never reaches existing databases.
///
/// V2 note: `NOTES_DDL` in `stores/note.rs` already includes `name TEXT`, so
/// in-process schema creation (used by tests and `StorageBackend::notes()`) has
/// the column from the start. When `run_migrations` is called on a DB that was
/// bootstrapped via `NOTES_DDL`, the V2 `ALTER TABLE` would fail with
/// "duplicate column name". The migration runner avoids this by checking
/// whether the column exists before applying V2 — see `run_migrations`.
pub const MIGRATIONS: &[VersionedMigration] = &[
    VersionedMigration {
        version: 1,
        name: "initial_schema",
        up: V1_UP,
    },
    // Not idempotent by itself (SQLite has no `ADD COLUMN IF NOT EXISTS`);
    // `run_migrations` pre-checks for the column instead.
    VersionedMigration {
        version: 2,
        name: "add_name_to_notes",
        up: "ALTER TABLE notes ADD COLUMN name TEXT;",
    },
];
181
/// DDL for the versioned-migration tracking table used by `run_migrations`.
///
/// One row per applied version; the DB's current schema version is
/// `MAX(version)`. `applied_at` is written as Unix microseconds
/// (`chrono::Utc::now().timestamp_micros()`).
const MIGRATION_TRACKING_TABLE: &str = "\
    CREATE TABLE IF NOT EXISTS _schema_migrations (\
        version   INTEGER PRIMARY KEY,\
        name      TEXT NOT NULL,\
        applied_at INTEGER NOT NULL\
    );\
";
189
190/// Apply all unapplied migrations from `MIGRATIONS` in order.
191///
192/// Returns the highest version now applied, or `0` if the DB is empty and no
193/// migrations exist.
194///
195/// # Idempotency
196///
197/// Safe to call multiple times. Already-applied migrations are skipped.
198///
199/// # Atomicity
200///
201/// Each migration runs in its own transaction. A failure rolls back that
202/// migration and leaves the DB at the prior version.
203///
204/// # Errors
205///
206/// Returns `SqliteError::InvalidData` if the `MIGRATIONS` array is not
207/// contiguous (1, 2, 3, ...).
208///
209/// Returns `SqliteError::Migration { version, error }` if any migration fails.
210pub fn run_migrations(conn: &mut Connection) -> Result<u32, SqliteError> {
211    for (i, m) in MIGRATIONS.iter().enumerate() {
212        let expected = (i + 1) as u32;
213        if m.version != expected {
214            return Err(SqliteError::InvalidData(format!(
215                "MIGRATIONS array is not contiguous: expected version {expected} at index {i}, \
216                 got version {}",
217                m.version
218            )));
219        }
220    }
221
222    conn.execute_batch(MIGRATION_TRACKING_TABLE)?;
223
224    // Determine the current version (highest applied).
225    let current_version: u32 = conn
226        .query_row(
227            "SELECT COALESCE(MAX(version), 0) FROM _schema_migrations",
228            [],
229            |row| row.get(0),
230        )
231        .unwrap_or(0);
232
233    let mut applied_version = current_version;
234
235    for migration in MIGRATIONS {
236        if migration.version <= current_version {
237            continue;
238        }
239
240        // V2 adds `name` to notes.  StorageBackend::notes() bootstraps the schema
241        // via NOTES_DDL (which already includes `name`), so the column may already
242        // exist even though the migration has never been recorded.  Treat "duplicate
243        // column name" from SQLite as idempotent for ALTER TABLE migrations.
244        if migration.version == 2 {
245            let col_exists: bool = conn
246                .query_row(
247                    "SELECT COUNT(*) > 0 FROM pragma_table_info('notes') WHERE name = 'name'",
248                    [],
249                    |row| row.get(0),
250                )
251                .unwrap_or(false);
252            if col_exists {
253                // Column already present — record the migration as applied and skip.
254                let now = chrono::Utc::now().timestamp_micros();
255                conn.execute(
256                    "INSERT OR IGNORE INTO _schema_migrations (version, name, applied_at) \
257                     VALUES (?1, ?2, ?3)",
258                    rusqlite::params![migration.version, migration.name, now],
259                )
260                .map_err(|e| SqliteError::Migration {
261                    version: migration.version,
262                    error: e.to_string(),
263                })?;
264                applied_version = migration.version;
265                continue;
266            }
267        }
268
269        let tx = conn.transaction().map_err(|e| SqliteError::Migration {
270            version: migration.version,
271            error: e.to_string(),
272        })?;
273
274        tx.execute_batch(migration.up)
275            .map_err(|e| SqliteError::Migration {
276                version: migration.version,
277                error: e.to_string(),
278            })?;
279
280        let now = chrono::Utc::now().timestamp_micros();
281        tx.execute(
282            "INSERT INTO _schema_migrations (version, name, applied_at) VALUES (?1, ?2, ?3)",
283            rusqlite::params![migration.version, migration.name, now],
284        )
285        .map_err(|e| SqliteError::Migration {
286            version: migration.version,
287            error: e.to_string(),
288        })?;
289
290        tx.commit().map_err(|e| SqliteError::Migration {
291            version: migration.version,
292            error: e.to_string(),
293        })?;
294
295        applied_version = migration.version;
296    }
297
298    Ok(applied_version)
299}
300
301// =============================================================================
302// Tests
303// =============================================================================
304
#[cfg(test)]
mod tests {
    use super::*;

    fn open_memory() -> Connection {
        Connection::open_in_memory().expect("in-memory connection")
    }

    /// Highest version declared in `MIGRATIONS` (0 if the list is empty).
    ///
    /// Tests compare against this rather than a literal, so appending a new
    /// migration does not require touching unrelated assertions.
    fn latest_version() -> u32 {
        MIGRATIONS.last().map_or(0, |m| m.version)
    }

    /// Runs a single-value `SELECT` and returns the result as `i64`.
    fn query_i64(conn: &Connection, sql: &str) -> i64 {
        conn.query_row(sql, [], |row| row.get(0))
            .unwrap_or_else(|e| panic!("query failed: {sql}: {e}"))
    }

    #[test]
    fn migrations_array_is_contiguous() {
        for (i, m) in MIGRATIONS.iter().enumerate() {
            assert_eq!(
                m.version as usize,
                i + 1,
                "migration `{}` is out of sequence",
                m.name
            );
        }
    }

    #[test]
    fn fresh_db_migrates_to_latest() {
        let mut conn = open_memory();
        let version = run_migrations(&mut conn).expect("migrations should succeed");
        assert_eq!(version, latest_version());

        // Every declared migration is tracked exactly once.
        assert_eq!(
            query_i64(&conn, "SELECT COUNT(*) FROM _schema_migrations"),
            MIGRATIONS.len() as i64
        );

        // Verify the entities table was created.
        assert_eq!(
            query_i64(
                &conn,
                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='entities'"
            ),
            1
        );

        // Verify V2 added the name column to notes.
        assert_eq!(
            query_i64(
                &conn,
                "SELECT COUNT(*) FROM pragma_table_info('notes') WHERE name = 'name'"
            ),
            1,
            "V2 must add name column to notes"
        );
    }

    #[test]
    fn run_migrations_twice_is_idempotent() {
        let mut conn = open_memory();
        let first = run_migrations(&mut conn).expect("first run");
        let second = run_migrations(&mut conn).expect("second run");
        assert_eq!(first, latest_version());
        assert_eq!(second, latest_version());

        // Still exactly one tracking row per declared migration.
        assert_eq!(
            query_i64(&conn, "SELECT COUNT(*) FROM _schema_migrations"),
            MIGRATIONS.len() as i64
        );
    }

    #[test]
    fn failed_migration_rolls_back() {
        let mut conn = open_memory();

        // Apply all real migrations so the DB is at the latest version.
        run_migrations(&mut conn).expect("real migrations should apply cleanly");

        // The first statement is valid DDL and the second is not; the
        // transaction must undo both, not just skip the tracking row.
        let bad = VersionedMigration {
            version: latest_version() + 1,
            name: "bad_migration",
            up: "CREATE TABLE rollback_probe (x INTEGER); THIS IS NOT VALID SQL;",
        };

        let result = apply_single_migration(&mut conn, &bad);
        assert!(result.is_err(), "bad migration should return error");

        let bad_rows: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM _schema_migrations WHERE version = ?1",
                rusqlite::params![bad.version],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(bad_rows, 0, "failed migration must not be recorded after rollback");

        assert_eq!(
            query_i64(&conn, "SELECT COUNT(*) FROM _schema_migrations"),
            MIGRATIONS.len() as i64,
            "previously applied migrations must still be recorded"
        );

        assert_eq!(
            query_i64(
                &conn,
                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='rollback_probe'"
            ),
            0,
            "DDL executed before the failing statement must be rolled back"
        );
    }

    #[test]
    fn store_ddl_then_migrations_is_idempotent() {
        use crate::stores::note::ensure_notes_schema;

        let mut conn = open_memory();

        // Simulate the StorageBackend path: store DDL creates the notes table
        // WITH the name column (NOTES_DDL includes it for test convenience).
        ensure_notes_schema(&conn).expect("store DDL should create notes");

        assert_eq!(
            query_i64(
                &conn,
                "SELECT COUNT(*) FROM pragma_table_info('notes') WHERE name = 'name'"
            ),
            1,
            "NOTES_DDL should include name column"
        );

        // V2 must detect the existing column and skip its ALTER TABLE.
        let version = run_migrations(&mut conn).expect("migrations after store DDL");
        assert_eq!(version, latest_version());

        // V2 is recorded as applied (skipped but tracked).
        assert_eq!(
            query_i64(&conn, "SELECT COUNT(*) FROM _schema_migrations WHERE version = 2"),
            1,
            "V2 must be recorded even when column pre-exists"
        );
    }

    /// Applies a single migration in a transaction and records it in the
    /// tracking table. This mirrors `run_migrations`' per-migration step and
    /// lets the rollback test drive a migration that is not in `MIGRATIONS`.
    fn apply_single_migration(
        conn: &mut Connection,
        migration: &VersionedMigration,
    ) -> Result<(), SqliteError> {
        let tx = conn.transaction().map_err(|e| SqliteError::Migration {
            version: migration.version,
            error: e.to_string(),
        })?;

        tx.execute_batch(migration.up)
            .map_err(|e| SqliteError::Migration {
                version: migration.version,
                error: e.to_string(),
            })?;

        let now = chrono::Utc::now().timestamp_micros();
        tx.execute(
            "INSERT INTO _schema_migrations (version, name, applied_at) VALUES (?1, ?2, ?3)",
            rusqlite::params![migration.version, migration.name, now],
        )
        .map_err(|e| SqliteError::Migration {
            version: migration.version,
            error: e.to_string(),
        })?;

        tx.commit().map_err(|e| SqliteError::Migration {
            version: migration.version,
            error: e.to_string(),
        })?;

        Ok(())
    }
}