1use rusqlite::Connection;
2
3use crate::error::SqliteError;
4
5pub struct Migration {
10 pub id: &'static str,
11 pub up_sql: &'static str,
12 pub down_sql: Option<&'static str>,
13 pub is_already_applied: Option<fn(&Connection) -> bool>,
14}
15
16pub struct ServiceSchemaPlan {
17 pub service: &'static str,
18 pub sqlite: &'static [Migration],
19 pub postgres: &'static [Migration],
20}
21
22const SCHEMA_VERSION_TABLE: &str = "\
23 CREATE TABLE IF NOT EXISTS _schema_versions (\
24 service TEXT NOT NULL,\
25 migration_id TEXT NOT NULL,\
26 applied_at INTEGER NOT NULL,\
27 PRIMARY KEY (service, migration_id)\
28 );\
29";
30
31pub fn apply_schema_plan(conn: &Connection, plan: &ServiceSchemaPlan) -> Result<(), SqliteError> {
32 conn.execute_batch(SCHEMA_VERSION_TABLE)?;
33
34 for migration in plan.sqlite {
35 if let Some(check) = migration.is_already_applied {
37 if check(conn) {
38 continue;
39 }
40 }
41
42 let already: bool = conn.query_row(
44 "SELECT COUNT(*) > 0 FROM _schema_versions WHERE service = ?1 AND migration_id = ?2",
45 rusqlite::params![plan.service, migration.id],
46 |row| row.get(0),
47 )?;
48
49 if already {
50 continue;
51 }
52
53 conn.execute_batch(migration.up_sql)?;
55
56 conn.execute(
58 "INSERT INTO _schema_versions (service, migration_id, applied_at) VALUES (?1, ?2, ?3)",
59 rusqlite::params![
60 plan.service,
61 migration.id,
62 chrono::Utc::now().timestamp_micros(),
63 ],
64 )?;
65 }
66
67 Ok(())
68}
69
70pub struct VersionedMigration {
80 pub version: u32,
82 pub name: &'static str,
84 pub up: &'static str,
87}
88
89const V1_UP: &str = "\
91 CREATE TABLE IF NOT EXISTS entities (\
92 id TEXT PRIMARY KEY,\
93 namespace TEXT NOT NULL,\
94 kind TEXT NOT NULL,\
95 name TEXT NOT NULL,\
96 description TEXT,\
97 properties TEXT,\
98 tags TEXT NOT NULL DEFAULT '[]',\
99 created_at INTEGER NOT NULL,\
100 updated_at INTEGER NOT NULL,\
101 deleted_at INTEGER\
102 );\
103 CREATE INDEX IF NOT EXISTS idx_entities_namespace ON entities(namespace);\
104 CREATE INDEX IF NOT EXISTS idx_entities_kind ON entities(namespace, kind);\
105 CREATE INDEX IF NOT EXISTS idx_entities_name ON entities(namespace, name);\
106 CREATE INDEX IF NOT EXISTS idx_entities_created ON entities(created_at DESC);\
107 CREATE TABLE IF NOT EXISTS graph_edges (\
108 namespace TEXT NOT NULL,\
109 id TEXT NOT NULL,\
110 source_id TEXT NOT NULL,\
111 target_id TEXT NOT NULL,\
112 relation TEXT NOT NULL,\
113 weight REAL NOT NULL DEFAULT 1.0,\
114 created_at INTEGER NOT NULL,\
115 metadata TEXT,\
116 PRIMARY KEY (namespace, id)\
117 );\
118 CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_source ON graph_edges(namespace, source_id);\
119 CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_target ON graph_edges(namespace, target_id);\
120 CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_relation ON graph_edges(namespace, relation);\
121 CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_src_rel ON graph_edges(namespace, source_id, relation);\
122 CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_tgt_rel ON graph_edges(namespace, target_id, relation);\
123 CREATE TABLE IF NOT EXISTS notes (\
124 id TEXT PRIMARY KEY,\
125 namespace TEXT NOT NULL,\
126 kind TEXT NOT NULL,\
127 content TEXT NOT NULL DEFAULT '',\
128 salience REAL NOT NULL DEFAULT 0.5,\
129 decay_factor REAL NOT NULL DEFAULT 0.0,\
130 expires_at INTEGER,\
131 properties TEXT,\
132 created_at INTEGER NOT NULL,\
133 updated_at INTEGER NOT NULL,\
134 deleted_at INTEGER\
135 );\
136 CREATE INDEX IF NOT EXISTS idx_notes_namespace ON notes(namespace);\
137 CREATE INDEX IF NOT EXISTS idx_notes_kind ON notes(namespace, kind);\
138 CREATE INDEX IF NOT EXISTS idx_notes_created ON notes(created_at DESC);\
139 CREATE TABLE IF NOT EXISTS events (\
140 id TEXT PRIMARY KEY,\
141 namespace TEXT NOT NULL,\
142 verb TEXT NOT NULL,\
143 substrate TEXT NOT NULL,\
144 actor TEXT NOT NULL,\
145 outcome TEXT NOT NULL,\
146 data TEXT,\
147 duration_us INTEGER NOT NULL DEFAULT 0,\
148 target_id TEXT,\
149 created_at INTEGER NOT NULL\
150 );\
151 CREATE INDEX IF NOT EXISTS idx_events_namespace ON events(namespace);\
152 CREATE INDEX IF NOT EXISTS idx_events_verb ON events(verb);\
153 CREATE INDEX IF NOT EXISTS idx_events_substrate ON events(substrate);\
154 CREATE INDEX IF NOT EXISTS idx_events_created ON events(created_at DESC);\
155";
156
157const V4_DEDUPE_GRAPH_EDGE_TRIPLES: &str = "\
174 DELETE FROM graph_edges \
175 WHERE rowid NOT IN (\
176 SELECT MIN(rowid) \
177 FROM graph_edges \
178 GROUP BY namespace, source_id, target_id, relation\
179 );\
180 CREATE UNIQUE INDEX IF NOT EXISTS idx_graph_edges_unique_triple \
181 ON graph_edges(namespace, source_id, target_id, relation);\
182";
183
184pub const MIGRATIONS: &[VersionedMigration] = &[
185 VersionedMigration {
186 version: 1,
187 name: "initial_schema",
188 up: V1_UP,
189 },
190 VersionedMigration {
191 version: 2,
192 name: "add_name_to_notes",
193 up: "ALTER TABLE notes ADD COLUMN name TEXT;",
194 },
195 VersionedMigration {
196 version: 3,
197 name: "add_events_namespace_created_index",
198 up: "CREATE INDEX IF NOT EXISTS idx_events_ns_created ON events(namespace, created_at DESC);",
199 },
200 VersionedMigration {
201 version: 4,
202 name: "dedupe_graph_edge_triples",
203 up: V4_DEDUPE_GRAPH_EDGE_TRIPLES,
204 },
205];
206
207const MIGRATION_TRACKING_TABLE: &str = "\
208 CREATE TABLE IF NOT EXISTS _schema_migrations (\
209 version INTEGER PRIMARY KEY,\
210 name TEXT NOT NULL,\
211 applied_at INTEGER NOT NULL\
212 );\
213";
214
215pub fn run_migrations(conn: &mut Connection) -> Result<u32, SqliteError> {
236 for (i, m) in MIGRATIONS.iter().enumerate() {
237 let expected = (i + 1) as u32;
238 if m.version != expected {
239 return Err(SqliteError::InvalidData(format!(
240 "MIGRATIONS array is not contiguous: expected version {expected} at index {i}, \
241 got version {}",
242 m.version
243 )));
244 }
245 }
246
247 conn.execute_batch(MIGRATION_TRACKING_TABLE)?;
248
249 let current_version: u32 = conn
251 .query_row(
252 "SELECT COALESCE(MAX(version), 0) FROM _schema_migrations",
253 [],
254 |row| row.get(0),
255 )
256 .unwrap_or(0);
257
258 let mut applied_version = current_version;
259
260 for migration in MIGRATIONS {
261 if migration.version <= current_version {
262 continue;
263 }
264
265 if migration.version == 2 {
270 let col_exists: bool = conn
271 .query_row(
272 "SELECT COUNT(*) > 0 FROM pragma_table_info('notes') WHERE name = 'name'",
273 [],
274 |row| row.get(0),
275 )
276 .unwrap_or(false);
277 if col_exists {
278 let now = chrono::Utc::now().timestamp_micros();
280 conn.execute(
281 "INSERT OR IGNORE INTO _schema_migrations (version, name, applied_at) \
282 VALUES (?1, ?2, ?3)",
283 rusqlite::params![migration.version, migration.name, now],
284 )
285 .map_err(|e| SqliteError::Migration {
286 version: migration.version,
287 error: e.to_string(),
288 })?;
289 applied_version = migration.version;
290 continue;
291 }
292 }
293
294 let tx = conn.transaction().map_err(|e| SqliteError::Migration {
295 version: migration.version,
296 error: e.to_string(),
297 })?;
298
299 tx.execute_batch(migration.up)
300 .map_err(|e| SqliteError::Migration {
301 version: migration.version,
302 error: e.to_string(),
303 })?;
304
305 let now = chrono::Utc::now().timestamp_micros();
306 tx.execute(
307 "INSERT INTO _schema_migrations (version, name, applied_at) VALUES (?1, ?2, ?3)",
308 rusqlite::params![migration.version, migration.name, now],
309 )
310 .map_err(|e| SqliteError::Migration {
311 version: migration.version,
312 error: e.to_string(),
313 })?;
314
315 tx.commit().map_err(|e| SqliteError::Migration {
316 version: migration.version,
317 error: e.to_string(),
318 })?;
319
320 applied_version = migration.version;
321 }
322
323 Ok(applied_version)
324}
325
326#[cfg(test)]
331mod tests {
332 use super::*;
333
334 fn open_memory() -> Connection {
335 Connection::open_in_memory().expect("in-memory connection")
336 }
337
338 #[test]
339 fn fresh_db_migrates_to_latest() {
340 let mut conn = open_memory();
341 let version = run_migrations(&mut conn).expect("migrations should succeed");
342 assert_eq!(version, 4);
343
344 let count: i64 = conn
346 .query_row(
347 "SELECT COUNT(*) FROM _schema_migrations WHERE version IN (1, 2, 3, 4)",
348 [],
349 |row| row.get(0),
350 )
351 .unwrap();
352 assert_eq!(count, 4);
353
354 let tbl_count: i64 = conn
356 .query_row(
357 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='entities'",
358 [],
359 |row| row.get(0),
360 )
361 .unwrap();
362 assert_eq!(tbl_count, 1);
363
364 let col_count: i64 = conn
366 .query_row(
367 "SELECT COUNT(*) FROM pragma_table_info('notes') WHERE name = 'name'",
368 [],
369 |row| row.get(0),
370 )
371 .unwrap();
372 assert_eq!(col_count, 1, "V2 must add name column to notes");
373 }
374
375 #[test]
376 fn run_migrations_twice_is_idempotent() {
377 let mut conn = open_memory();
378 let v1 = run_migrations(&mut conn).expect("first run");
379 let v2 = run_migrations(&mut conn).expect("second run");
380 assert_eq!(v1, 4);
381 assert_eq!(v2, 4);
382
383 let count: i64 = conn
385 .query_row("SELECT COUNT(*) FROM _schema_migrations", [], |row| {
386 row.get(0)
387 })
388 .unwrap();
389 assert_eq!(count, 4);
390 }
391
392 #[test]
393 fn failed_migration_rolls_back() {
394 let bad_v5 = VersionedMigration {
395 version: 5,
396 name: "bad_migration",
397 up: "THIS IS NOT VALID SQL;",
398 };
399
400 let mut conn = open_memory();
401
402 run_migrations(&mut conn).expect("V1+V2+V3+V4 should apply cleanly");
404
405 let result = apply_single_migration(&mut conn, &bad_v5);
407 assert!(result.is_err(), "bad migration should return error");
408
409 let v5_count: i64 = conn
411 .query_row(
412 "SELECT COUNT(*) FROM _schema_migrations WHERE version = 5",
413 [],
414 |row| row.get(0),
415 )
416 .unwrap();
417 assert_eq!(v5_count, 0, "V5 must not be recorded after rollback");
418
419 let applied_count: i64 = conn
421 .query_row(
422 "SELECT COUNT(*) FROM _schema_migrations WHERE version IN (1, 2, 3, 4)",
423 [],
424 |row| row.get(0),
425 )
426 .unwrap();
427 assert_eq!(
428 applied_count, 4,
429 "V1, V2, V3, and V4 must still be recorded"
430 );
431 }
432
433 #[test]
434 fn store_ddl_then_migrations_is_idempotent() {
435 use crate::stores::note::ensure_notes_schema;
436
437 let mut conn = open_memory();
438
439 ensure_notes_schema(&conn).expect("store DDL should create notes");
442
443 let has_name: bool = conn
445 .query_row(
446 "SELECT COUNT(*) > 0 FROM pragma_table_info('notes') WHERE name = 'name'",
447 [],
448 |row| row.get(0),
449 )
450 .unwrap();
451 assert!(has_name, "NOTES_DDL should include name column");
452
453 let version = run_migrations(&mut conn).expect("migrations after store DDL");
456 assert_eq!(version, 4);
457
458 let v2_count: i64 = conn
460 .query_row(
461 "SELECT COUNT(*) FROM _schema_migrations WHERE version = 2",
462 [],
463 |row| row.get(0),
464 )
465 .unwrap();
466 assert_eq!(
467 v2_count, 1,
468 "V2 must be recorded even when column pre-exists"
469 );
470 }
471
472 fn apply_single_migration(
475 conn: &mut Connection,
476 migration: &VersionedMigration,
477 ) -> Result<(), SqliteError> {
478 let tx = conn.transaction().map_err(|e| SqliteError::Migration {
479 version: migration.version,
480 error: e.to_string(),
481 })?;
482
483 tx.execute_batch(migration.up)
484 .map_err(|e| SqliteError::Migration {
485 version: migration.version,
486 error: e.to_string(),
487 })?;
488
489 let now = chrono::Utc::now().timestamp_micros();
490 tx.execute(
491 "INSERT INTO _schema_migrations (version, name, applied_at) VALUES (?1, ?2, ?3)",
492 rusqlite::params![migration.version, migration.name, now],
493 )
494 .map_err(|e| SqliteError::Migration {
495 version: migration.version,
496 error: e.to_string(),
497 })?;
498
499 tx.commit().map_err(|e| SqliteError::Migration {
500 version: migration.version,
501 error: e.to_string(),
502 })?;
503
504 Ok(())
505 }
506}