1use rusqlite::Connection;
2
3use crate::error::SqliteError;
4
5pub struct Migration {
10 pub id: &'static str,
11 pub up_sql: &'static str,
12 pub down_sql: Option<&'static str>,
13 pub is_already_applied: Option<fn(&Connection) -> bool>,
14}
15
16pub struct ServiceSchemaPlan {
17 pub service: &'static str,
18 pub sqlite: &'static [Migration],
19 pub postgres: &'static [Migration],
20}
21
22const SCHEMA_VERSION_TABLE: &str = "\
23 CREATE TABLE IF NOT EXISTS _schema_versions (\
24 service TEXT NOT NULL,\
25 migration_id TEXT NOT NULL,\
26 applied_at INTEGER NOT NULL,\
27 PRIMARY KEY (service, migration_id)\
28 );\
29";
30
31pub fn apply_schema_plan(conn: &Connection, plan: &ServiceSchemaPlan) -> Result<(), SqliteError> {
32 conn.execute_batch(SCHEMA_VERSION_TABLE)?;
33
34 for migration in plan.sqlite {
35 if let Some(check) = migration.is_already_applied {
37 if check(conn) {
38 continue;
39 }
40 }
41
42 let already: bool = conn.query_row(
44 "SELECT COUNT(*) > 0 FROM _schema_versions WHERE service = ?1 AND migration_id = ?2",
45 rusqlite::params![plan.service, migration.id],
46 |row| row.get(0),
47 )?;
48
49 if already {
50 continue;
51 }
52
53 conn.execute_batch(migration.up_sql)?;
55
56 conn.execute(
58 "INSERT INTO _schema_versions (service, migration_id, applied_at) VALUES (?1, ?2, ?3)",
59 rusqlite::params![
60 plan.service,
61 migration.id,
62 chrono::Utc::now().timestamp_micros(),
63 ],
64 )?;
65 }
66
67 Ok(())
68}
69
70pub struct VersionedMigration {
80 pub version: u32,
82 pub name: &'static str,
84 pub up: &'static str,
87}
88
89const V1_UP: &str = "\
91 CREATE TABLE IF NOT EXISTS entities (\
92 id TEXT PRIMARY KEY,\
93 namespace TEXT NOT NULL,\
94 kind TEXT NOT NULL,\
95 name TEXT NOT NULL,\
96 description TEXT,\
97 properties TEXT,\
98 tags TEXT NOT NULL DEFAULT '[]',\
99 created_at INTEGER NOT NULL,\
100 updated_at INTEGER NOT NULL,\
101 deleted_at INTEGER\
102 );\
103 CREATE INDEX IF NOT EXISTS idx_entities_namespace ON entities(namespace);\
104 CREATE INDEX IF NOT EXISTS idx_entities_kind ON entities(namespace, kind);\
105 CREATE INDEX IF NOT EXISTS idx_entities_name ON entities(namespace, name);\
106 CREATE INDEX IF NOT EXISTS idx_entities_created ON entities(created_at DESC);\
107 CREATE TABLE IF NOT EXISTS graph_edges (\
108 namespace TEXT NOT NULL,\
109 id TEXT NOT NULL,\
110 source_id TEXT NOT NULL,\
111 target_id TEXT NOT NULL,\
112 relation TEXT NOT NULL,\
113 weight REAL NOT NULL DEFAULT 1.0,\
114 created_at INTEGER NOT NULL,\
115 metadata TEXT,\
116 PRIMARY KEY (namespace, id)\
117 );\
118 CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_source ON graph_edges(namespace, source_id);\
119 CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_target ON graph_edges(namespace, target_id);\
120 CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_relation ON graph_edges(namespace, relation);\
121 CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_src_rel ON graph_edges(namespace, source_id, relation);\
122 CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_tgt_rel ON graph_edges(namespace, target_id, relation);\
123 CREATE TABLE IF NOT EXISTS notes (\
124 id TEXT PRIMARY KEY,\
125 namespace TEXT NOT NULL,\
126 kind TEXT NOT NULL,\
127 content TEXT NOT NULL DEFAULT '',\
128 salience REAL NOT NULL DEFAULT 0.5,\
129 decay_factor REAL NOT NULL DEFAULT 0.0,\
130 expires_at INTEGER,\
131 properties TEXT,\
132 created_at INTEGER NOT NULL,\
133 updated_at INTEGER NOT NULL,\
134 deleted_at INTEGER\
135 );\
136 CREATE INDEX IF NOT EXISTS idx_notes_namespace ON notes(namespace);\
137 CREATE INDEX IF NOT EXISTS idx_notes_kind ON notes(namespace, kind);\
138 CREATE INDEX IF NOT EXISTS idx_notes_created ON notes(created_at DESC);\
139 CREATE TABLE IF NOT EXISTS events (\
140 id TEXT PRIMARY KEY,\
141 namespace TEXT NOT NULL,\
142 verb TEXT NOT NULL,\
143 substrate TEXT NOT NULL,\
144 actor TEXT NOT NULL,\
145 outcome TEXT NOT NULL,\
146 data TEXT,\
147 duration_us INTEGER NOT NULL DEFAULT 0,\
148 target_id TEXT,\
149 created_at INTEGER NOT NULL\
150 );\
151 CREATE INDEX IF NOT EXISTS idx_events_namespace ON events(namespace);\
152 CREATE INDEX IF NOT EXISTS idx_events_verb ON events(verb);\
153 CREATE INDEX IF NOT EXISTS idx_events_substrate ON events(substrate);\
154 CREATE INDEX IF NOT EXISTS idx_events_created ON events(created_at DESC);\
155";
156
157pub const MIGRATIONS: &[VersionedMigration] = &[
170 VersionedMigration {
171 version: 1,
172 name: "initial_schema",
173 up: V1_UP,
174 },
175 VersionedMigration {
176 version: 2,
177 name: "add_name_to_notes",
178 up: "ALTER TABLE notes ADD COLUMN name TEXT;",
179 },
180 VersionedMigration {
181 version: 3,
182 name: "add_events_namespace_created_index",
183 up: "CREATE INDEX IF NOT EXISTS idx_events_ns_created ON events(namespace, created_at DESC);",
184 },
185];
186
187const MIGRATION_TRACKING_TABLE: &str = "\
188 CREATE TABLE IF NOT EXISTS _schema_migrations (\
189 version INTEGER PRIMARY KEY,\
190 name TEXT NOT NULL,\
191 applied_at INTEGER NOT NULL\
192 );\
193";
194
195pub fn run_migrations(conn: &mut Connection) -> Result<u32, SqliteError> {
216 for (i, m) in MIGRATIONS.iter().enumerate() {
217 let expected = (i + 1) as u32;
218 if m.version != expected {
219 return Err(SqliteError::InvalidData(format!(
220 "MIGRATIONS array is not contiguous: expected version {expected} at index {i}, \
221 got version {}",
222 m.version
223 )));
224 }
225 }
226
227 conn.execute_batch(MIGRATION_TRACKING_TABLE)?;
228
229 let current_version: u32 = conn
231 .query_row(
232 "SELECT COALESCE(MAX(version), 0) FROM _schema_migrations",
233 [],
234 |row| row.get(0),
235 )
236 .unwrap_or(0);
237
238 let mut applied_version = current_version;
239
240 for migration in MIGRATIONS {
241 if migration.version <= current_version {
242 continue;
243 }
244
245 if migration.version == 2 {
250 let col_exists: bool = conn
251 .query_row(
252 "SELECT COUNT(*) > 0 FROM pragma_table_info('notes') WHERE name = 'name'",
253 [],
254 |row| row.get(0),
255 )
256 .unwrap_or(false);
257 if col_exists {
258 let now = chrono::Utc::now().timestamp_micros();
260 conn.execute(
261 "INSERT OR IGNORE INTO _schema_migrations (version, name, applied_at) \
262 VALUES (?1, ?2, ?3)",
263 rusqlite::params![migration.version, migration.name, now],
264 )
265 .map_err(|e| SqliteError::Migration {
266 version: migration.version,
267 error: e.to_string(),
268 })?;
269 applied_version = migration.version;
270 continue;
271 }
272 }
273
274 let tx = conn.transaction().map_err(|e| SqliteError::Migration {
275 version: migration.version,
276 error: e.to_string(),
277 })?;
278
279 tx.execute_batch(migration.up)
280 .map_err(|e| SqliteError::Migration {
281 version: migration.version,
282 error: e.to_string(),
283 })?;
284
285 let now = chrono::Utc::now().timestamp_micros();
286 tx.execute(
287 "INSERT INTO _schema_migrations (version, name, applied_at) VALUES (?1, ?2, ?3)",
288 rusqlite::params![migration.version, migration.name, now],
289 )
290 .map_err(|e| SqliteError::Migration {
291 version: migration.version,
292 error: e.to_string(),
293 })?;
294
295 tx.commit().map_err(|e| SqliteError::Migration {
296 version: migration.version,
297 error: e.to_string(),
298 })?;
299
300 applied_version = migration.version;
301 }
302
303 Ok(applied_version)
304}
305
306#[cfg(test)]
311mod tests {
312 use super::*;
313
314 fn open_memory() -> Connection {
315 Connection::open_in_memory().expect("in-memory connection")
316 }
317
318 #[test]
319 fn fresh_db_migrates_to_latest() {
320 let mut conn = open_memory();
321 let version = run_migrations(&mut conn).expect("migrations should succeed");
322 assert_eq!(version, 3);
323
324 let count: i64 = conn
326 .query_row(
327 "SELECT COUNT(*) FROM _schema_migrations WHERE version IN (1, 2, 3)",
328 [],
329 |row| row.get(0),
330 )
331 .unwrap();
332 assert_eq!(count, 3);
333
334 let tbl_count: i64 = conn
336 .query_row(
337 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='entities'",
338 [],
339 |row| row.get(0),
340 )
341 .unwrap();
342 assert_eq!(tbl_count, 1);
343
344 let col_count: i64 = conn
346 .query_row(
347 "SELECT COUNT(*) FROM pragma_table_info('notes') WHERE name = 'name'",
348 [],
349 |row| row.get(0),
350 )
351 .unwrap();
352 assert_eq!(col_count, 1, "V2 must add name column to notes");
353 }
354
355 #[test]
356 fn run_migrations_twice_is_idempotent() {
357 let mut conn = open_memory();
358 let v1 = run_migrations(&mut conn).expect("first run");
359 let v2 = run_migrations(&mut conn).expect("second run");
360 assert_eq!(v1, 3);
361 assert_eq!(v2, 3);
362
363 let count: i64 = conn
365 .query_row("SELECT COUNT(*) FROM _schema_migrations", [], |row| {
366 row.get(0)
367 })
368 .unwrap();
369 assert_eq!(count, 3);
370 }
371
372 #[test]
373 fn failed_migration_rolls_back() {
374 let bad_v4 = VersionedMigration {
375 version: 4,
376 name: "bad_migration",
377 up: "THIS IS NOT VALID SQL;",
378 };
379
380 let mut conn = open_memory();
381
382 run_migrations(&mut conn).expect("V1+V2+V3 should apply cleanly");
384
385 let result = apply_single_migration(&mut conn, &bad_v4);
387 assert!(result.is_err(), "bad migration should return error");
388
389 let v4_count: i64 = conn
391 .query_row(
392 "SELECT COUNT(*) FROM _schema_migrations WHERE version = 4",
393 [],
394 |row| row.get(0),
395 )
396 .unwrap();
397 assert_eq!(v4_count, 0, "V4 must not be recorded after rollback");
398
399 let applied_count: i64 = conn
401 .query_row(
402 "SELECT COUNT(*) FROM _schema_migrations WHERE version IN (1, 2, 3)",
403 [],
404 |row| row.get(0),
405 )
406 .unwrap();
407 assert_eq!(applied_count, 3, "V1, V2, and V3 must still be recorded");
408 }
409
410 #[test]
411 fn store_ddl_then_migrations_is_idempotent() {
412 use crate::stores::note::ensure_notes_schema;
413
414 let mut conn = open_memory();
415
416 ensure_notes_schema(&conn).expect("store DDL should create notes");
419
420 let has_name: bool = conn
422 .query_row(
423 "SELECT COUNT(*) > 0 FROM pragma_table_info('notes') WHERE name = 'name'",
424 [],
425 |row| row.get(0),
426 )
427 .unwrap();
428 assert!(has_name, "NOTES_DDL should include name column");
429
430 let version = run_migrations(&mut conn).expect("migrations after store DDL");
433 assert_eq!(version, 3);
434
435 let v2_count: i64 = conn
437 .query_row(
438 "SELECT COUNT(*) FROM _schema_migrations WHERE version = 2",
439 [],
440 |row| row.get(0),
441 )
442 .unwrap();
443 assert_eq!(
444 v2_count, 1,
445 "V2 must be recorded even when column pre-exists"
446 );
447 }
448
449 fn apply_single_migration(
452 conn: &mut Connection,
453 migration: &VersionedMigration,
454 ) -> Result<(), SqliteError> {
455 let tx = conn.transaction().map_err(|e| SqliteError::Migration {
456 version: migration.version,
457 error: e.to_string(),
458 })?;
459
460 tx.execute_batch(migration.up)
461 .map_err(|e| SqliteError::Migration {
462 version: migration.version,
463 error: e.to_string(),
464 })?;
465
466 let now = chrono::Utc::now().timestamp_micros();
467 tx.execute(
468 "INSERT INTO _schema_migrations (version, name, applied_at) VALUES (?1, ?2, ?3)",
469 rusqlite::params![migration.version, migration.name, now],
470 )
471 .map_err(|e| SqliteError::Migration {
472 version: migration.version,
473 error: e.to_string(),
474 })?;
475
476 tx.commit().map_err(|e| SqliteError::Migration {
477 version: migration.version,
478 error: e.to_string(),
479 })?;
480
481 Ok(())
482 }
483}