1use rusqlite::Connection;
2
3use crate::error::SqliteError;
4
5pub struct Migration {
10 pub id: &'static str,
11 pub up_sql: &'static str,
12 pub down_sql: Option<&'static str>,
13 pub is_already_applied: Option<fn(&Connection) -> bool>,
14}
15
16pub struct ServiceSchemaPlan {
17 pub service: &'static str,
18 pub sqlite: &'static [Migration],
19 pub postgres: &'static [Migration],
20}
21
22const SCHEMA_VERSION_TABLE: &str = "\
23 CREATE TABLE IF NOT EXISTS _schema_versions (\
24 service TEXT NOT NULL,\
25 migration_id TEXT NOT NULL,\
26 applied_at INTEGER NOT NULL,\
27 PRIMARY KEY (service, migration_id)\
28 );\
29";
30
31pub fn apply_schema_plan(conn: &Connection, plan: &ServiceSchemaPlan) -> Result<(), SqliteError> {
32 conn.execute_batch(SCHEMA_VERSION_TABLE)?;
33
34 for migration in plan.sqlite {
35 if let Some(check) = migration.is_already_applied {
37 if check(conn) {
38 continue;
39 }
40 }
41
42 let already: bool = conn.query_row(
44 "SELECT COUNT(*) > 0 FROM _schema_versions WHERE service = ?1 AND migration_id = ?2",
45 rusqlite::params![plan.service, migration.id],
46 |row| row.get(0),
47 )?;
48
49 if already {
50 continue;
51 }
52
53 conn.execute_batch(migration.up_sql)?;
55
56 conn.execute(
58 "INSERT INTO _schema_versions (service, migration_id, applied_at) VALUES (?1, ?2, ?3)",
59 rusqlite::params![
60 plan.service,
61 migration.id,
62 chrono::Utc::now().timestamp_micros(),
63 ],
64 )?;
65 }
66
67 Ok(())
68}
69
70pub struct VersionedMigration {
80 pub version: u32,
82 pub name: &'static str,
84 pub up: &'static str,
87}
88
89const V1_UP: &str = "\
91 CREATE TABLE IF NOT EXISTS entities (\
92 id TEXT PRIMARY KEY,\
93 namespace TEXT NOT NULL,\
94 kind TEXT NOT NULL,\
95 name TEXT NOT NULL,\
96 description TEXT,\
97 properties TEXT,\
98 tags TEXT NOT NULL DEFAULT '[]',\
99 created_at INTEGER NOT NULL,\
100 updated_at INTEGER NOT NULL,\
101 deleted_at INTEGER\
102 );\
103 CREATE INDEX IF NOT EXISTS idx_entities_namespace ON entities(namespace);\
104 CREATE INDEX IF NOT EXISTS idx_entities_kind ON entities(namespace, kind);\
105 CREATE INDEX IF NOT EXISTS idx_entities_name ON entities(namespace, name);\
106 CREATE INDEX IF NOT EXISTS idx_entities_created ON entities(created_at DESC);\
107 CREATE TABLE IF NOT EXISTS graph_edges (\
108 namespace TEXT NOT NULL,\
109 id TEXT NOT NULL,\
110 source_id TEXT NOT NULL,\
111 target_id TEXT NOT NULL,\
112 relation TEXT NOT NULL,\
113 weight REAL NOT NULL DEFAULT 1.0,\
114 created_at INTEGER NOT NULL,\
115 metadata TEXT,\
116 PRIMARY KEY (namespace, id)\
117 );\
118 CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_source ON graph_edges(namespace, source_id);\
119 CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_target ON graph_edges(namespace, target_id);\
120 CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_relation ON graph_edges(namespace, relation);\
121 CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_src_rel ON graph_edges(namespace, source_id, relation);\
122 CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_tgt_rel ON graph_edges(namespace, target_id, relation);\
123 CREATE TABLE IF NOT EXISTS notes (\
124 id TEXT PRIMARY KEY,\
125 namespace TEXT NOT NULL,\
126 kind TEXT NOT NULL,\
127 content TEXT NOT NULL DEFAULT '',\
128 salience REAL NOT NULL DEFAULT 0.5,\
129 decay_factor REAL NOT NULL DEFAULT 0.0,\
130 expires_at INTEGER,\
131 properties TEXT,\
132 created_at INTEGER NOT NULL,\
133 updated_at INTEGER NOT NULL,\
134 deleted_at INTEGER\
135 );\
136 CREATE INDEX IF NOT EXISTS idx_notes_namespace ON notes(namespace);\
137 CREATE INDEX IF NOT EXISTS idx_notes_kind ON notes(namespace, kind);\
138 CREATE INDEX IF NOT EXISTS idx_notes_created ON notes(created_at DESC);\
139 CREATE TABLE IF NOT EXISTS events (\
140 id TEXT PRIMARY KEY,\
141 namespace TEXT NOT NULL,\
142 verb TEXT NOT NULL,\
143 substrate TEXT NOT NULL,\
144 actor TEXT NOT NULL,\
145 outcome TEXT NOT NULL,\
146 data TEXT,\
147 duration_us INTEGER NOT NULL DEFAULT 0,\
148 target_id TEXT,\
149 created_at INTEGER NOT NULL\
150 );\
151 CREATE INDEX IF NOT EXISTS idx_events_namespace ON events(namespace);\
152 CREATE INDEX IF NOT EXISTS idx_events_verb ON events(verb);\
153 CREATE INDEX IF NOT EXISTS idx_events_substrate ON events(substrate);\
154 CREATE INDEX IF NOT EXISTS idx_events_created ON events(created_at DESC);\
155";
156
157pub const MIGRATIONS: &[VersionedMigration] = &[
170 VersionedMigration {
171 version: 1,
172 name: "initial_schema",
173 up: V1_UP,
174 },
175 VersionedMigration {
176 version: 2,
177 name: "add_name_to_notes",
178 up: "ALTER TABLE notes ADD COLUMN name TEXT;",
179 },
180];
181
182const MIGRATION_TRACKING_TABLE: &str = "\
183 CREATE TABLE IF NOT EXISTS _schema_migrations (\
184 version INTEGER PRIMARY KEY,\
185 name TEXT NOT NULL,\
186 applied_at INTEGER NOT NULL\
187 );\
188";
189
190pub fn run_migrations(conn: &mut Connection) -> Result<u32, SqliteError> {
211 for (i, m) in MIGRATIONS.iter().enumerate() {
212 let expected = (i + 1) as u32;
213 if m.version != expected {
214 return Err(SqliteError::InvalidData(format!(
215 "MIGRATIONS array is not contiguous: expected version {expected} at index {i}, \
216 got version {}",
217 m.version
218 )));
219 }
220 }
221
222 conn.execute_batch(MIGRATION_TRACKING_TABLE)?;
223
224 let current_version: u32 = conn
226 .query_row(
227 "SELECT COALESCE(MAX(version), 0) FROM _schema_migrations",
228 [],
229 |row| row.get(0),
230 )
231 .unwrap_or(0);
232
233 let mut applied_version = current_version;
234
235 for migration in MIGRATIONS {
236 if migration.version <= current_version {
237 continue;
238 }
239
240 if migration.version == 2 {
245 let col_exists: bool = conn
246 .query_row(
247 "SELECT COUNT(*) > 0 FROM pragma_table_info('notes') WHERE name = 'name'",
248 [],
249 |row| row.get(0),
250 )
251 .unwrap_or(false);
252 if col_exists {
253 let now = chrono::Utc::now().timestamp_micros();
255 conn.execute(
256 "INSERT OR IGNORE INTO _schema_migrations (version, name, applied_at) \
257 VALUES (?1, ?2, ?3)",
258 rusqlite::params![migration.version, migration.name, now],
259 )
260 .map_err(|e| SqliteError::Migration {
261 version: migration.version,
262 error: e.to_string(),
263 })?;
264 applied_version = migration.version;
265 continue;
266 }
267 }
268
269 let tx = conn.transaction().map_err(|e| SqliteError::Migration {
270 version: migration.version,
271 error: e.to_string(),
272 })?;
273
274 tx.execute_batch(migration.up)
275 .map_err(|e| SqliteError::Migration {
276 version: migration.version,
277 error: e.to_string(),
278 })?;
279
280 let now = chrono::Utc::now().timestamp_micros();
281 tx.execute(
282 "INSERT INTO _schema_migrations (version, name, applied_at) VALUES (?1, ?2, ?3)",
283 rusqlite::params![migration.version, migration.name, now],
284 )
285 .map_err(|e| SqliteError::Migration {
286 version: migration.version,
287 error: e.to_string(),
288 })?;
289
290 tx.commit().map_err(|e| SqliteError::Migration {
291 version: migration.version,
292 error: e.to_string(),
293 })?;
294
295 applied_version = migration.version;
296 }
297
298 Ok(applied_version)
299}
300
301#[cfg(test)]
306mod tests {
307 use super::*;
308
309 fn open_memory() -> Connection {
310 Connection::open_in_memory().expect("in-memory connection")
311 }
312
313 #[test]
314 fn fresh_db_migrates_to_latest() {
315 let mut conn = open_memory();
316 let version = run_migrations(&mut conn).expect("migrations should succeed");
317 assert_eq!(version, 2);
318
319 let count: i64 = conn
321 .query_row(
322 "SELECT COUNT(*) FROM _schema_migrations WHERE version IN (1, 2)",
323 [],
324 |row| row.get(0),
325 )
326 .unwrap();
327 assert_eq!(count, 2);
328
329 let tbl_count: i64 = conn
331 .query_row(
332 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='entities'",
333 [],
334 |row| row.get(0),
335 )
336 .unwrap();
337 assert_eq!(tbl_count, 1);
338
339 let col_count: i64 = conn
341 .query_row(
342 "SELECT COUNT(*) FROM pragma_table_info('notes') WHERE name = 'name'",
343 [],
344 |row| row.get(0),
345 )
346 .unwrap();
347 assert_eq!(col_count, 1, "V2 must add name column to notes");
348 }
349
350 #[test]
351 fn run_migrations_twice_is_idempotent() {
352 let mut conn = open_memory();
353 let v1 = run_migrations(&mut conn).expect("first run");
354 let v2 = run_migrations(&mut conn).expect("second run");
355 assert_eq!(v1, 2);
356 assert_eq!(v2, 2);
357
358 let count: i64 = conn
360 .query_row("SELECT COUNT(*) FROM _schema_migrations", [], |row| {
361 row.get(0)
362 })
363 .unwrap();
364 assert_eq!(count, 2);
365 }
366
367 #[test]
368 fn failed_migration_rolls_back() {
369 let bad_v3 = VersionedMigration {
370 version: 3,
371 name: "bad_migration",
372 up: "THIS IS NOT VALID SQL;",
373 };
374
375 let mut conn = open_memory();
376
377 run_migrations(&mut conn).expect("V1+V2 should apply cleanly");
379
380 let result = apply_single_migration(&mut conn, &bad_v3);
382 assert!(result.is_err(), "bad migration should return error");
383
384 let v3_count: i64 = conn
386 .query_row(
387 "SELECT COUNT(*) FROM _schema_migrations WHERE version = 3",
388 [],
389 |row| row.get(0),
390 )
391 .unwrap();
392 assert_eq!(v3_count, 0, "V3 must not be recorded after rollback");
393
394 let applied_count: i64 = conn
396 .query_row(
397 "SELECT COUNT(*) FROM _schema_migrations WHERE version IN (1, 2)",
398 [],
399 |row| row.get(0),
400 )
401 .unwrap();
402 assert_eq!(applied_count, 2, "V1 and V2 must still be recorded");
403 }
404
405 #[test]
406 fn store_ddl_then_migrations_is_idempotent() {
407 use crate::stores::note::ensure_notes_schema;
408
409 let mut conn = open_memory();
410
411 ensure_notes_schema(&conn).expect("store DDL should create notes");
414
415 let has_name: bool = conn
417 .query_row(
418 "SELECT COUNT(*) > 0 FROM pragma_table_info('notes') WHERE name = 'name'",
419 [],
420 |row| row.get(0),
421 )
422 .unwrap();
423 assert!(has_name, "NOTES_DDL should include name column");
424
425 let version = run_migrations(&mut conn).expect("migrations after store DDL");
428 assert_eq!(version, 2);
429
430 let v2_count: i64 = conn
432 .query_row(
433 "SELECT COUNT(*) FROM _schema_migrations WHERE version = 2",
434 [],
435 |row| row.get(0),
436 )
437 .unwrap();
438 assert_eq!(
439 v2_count, 1,
440 "V2 must be recorded even when column pre-exists"
441 );
442 }
443
444 fn apply_single_migration(
447 conn: &mut Connection,
448 migration: &VersionedMigration,
449 ) -> Result<(), SqliteError> {
450 let tx = conn.transaction().map_err(|e| SqliteError::Migration {
451 version: migration.version,
452 error: e.to_string(),
453 })?;
454
455 tx.execute_batch(migration.up)
456 .map_err(|e| SqliteError::Migration {
457 version: migration.version,
458 error: e.to_string(),
459 })?;
460
461 let now = chrono::Utc::now().timestamp_micros();
462 tx.execute(
463 "INSERT INTO _schema_migrations (version, name, applied_at) VALUES (?1, ?2, ?3)",
464 rusqlite::params![migration.version, migration.name, now],
465 )
466 .map_err(|e| SqliteError::Migration {
467 version: migration.version,
468 error: e.to_string(),
469 })?;
470
471 tx.commit().map_err(|e| SqliteError::Migration {
472 version: migration.version,
473 error: e.to_string(),
474 })?;
475
476 Ok(())
477 }
478}