use rusqlite::Connection;
use crate::error::SqliteError;
/// A single schema migration belonging to a [`ServiceSchemaPlan`].
pub struct Migration {
    /// Identifier stored in `_schema_versions.migration_id` once the migration has run.
    pub id: &'static str,
    /// SQL batch that [`apply_schema_plan`] executes to apply the migration.
    pub up_sql: &'static str,
    /// Presumably the SQL that reverses `up_sql`; [`apply_schema_plan`] never executes it
    /// (NOTE(review): confirm how/where this is consumed).
    pub down_sql: Option<&'static str>,
    /// Optional probe run before the bookkeeping lookup. When it returns `true`,
    /// [`apply_schema_plan`] skips the migration entirely: `up_sql` is not executed and
    /// no row is written to `_schema_versions`.
    pub is_already_applied: Option<fn(&Connection) -> bool>,
}
/// The set of migrations owned by one service, grouped per database backend.
pub struct ServiceSchemaPlan {
    /// Service name; stored in `_schema_versions.service` so that migration ids are
    /// tracked per service.
    pub service: &'static str,
    /// Migrations that [`apply_schema_plan`] walks, in slice order, against SQLite.
    pub sqlite: &'static [Migration],
    /// Not read by [`apply_schema_plan`]; presumably the equivalent migrations for a
    /// PostgreSQL backend (NOTE(review): confirm against the consumer).
    pub postgres: &'static [Migration],
}
/// DDL for the `_schema_versions` bookkeeping table used by [`apply_schema_plan`].
///
/// One row per `(service, migration_id)` pair; `IF NOT EXISTS` makes the statement
/// safe to execute on every call. The trailing backslashes are string line
/// continuations, so the literal contains no newlines.
const SCHEMA_VERSION_TABLE: &str = "\
CREATE TABLE IF NOT EXISTS _schema_versions (\
service TEXT NOT NULL,\
migration_id TEXT NOT NULL,\
applied_at INTEGER NOT NULL,\
PRIMARY KEY (service, migration_id)\
);\
";
pub fn apply_schema_plan(conn: &Connection, plan: &ServiceSchemaPlan) -> Result<(), SqliteError> {
conn.execute_batch(SCHEMA_VERSION_TABLE)?;
for migration in plan.sqlite {
if let Some(check) = migration.is_already_applied {
if check(conn) {
continue;
}
}
let already: bool = conn.query_row(
"SELECT COUNT(*) > 0 FROM _schema_versions WHERE service = ?1 AND migration_id = ?2",
rusqlite::params![plan.service, migration.id],
|row| row.get(0),
)?;
if already {
continue;
}
conn.execute_batch(migration.up_sql)?;
conn.execute(
"INSERT INTO _schema_versions (service, migration_id, applied_at) VALUES (?1, ?2, ?3)",
rusqlite::params![
plan.service,
migration.id,
chrono::Utc::now().timestamp_micros(),
],
)?;
}
Ok(())
}
/// One entry of the linear, integer-versioned migration list ([`MIGRATIONS`]).
///
/// The type is plain `'static` data, so it derives the usual value-type traits; this
/// lets entries be printed in diagnostics and compared in tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct VersionedMigration {
    /// Version number; stored as the primary key of `_schema_migrations`.
    pub version: u32,
    /// Human-readable label stored alongside the version in `_schema_migrations`.
    pub name: &'static str,
    /// SQL batch executed to move the schema up to `version`.
    pub up: &'static str,
}
/// SQL batch for schema version 1 (`initial_schema`).
///
/// Creates the `entities`, `graph_edges`, `notes`, and `events` tables together with
/// their indexes. Every statement uses `IF NOT EXISTS`, so the batch can be executed
/// against a database in which some of these objects already exist. The trailing
/// backslashes are string line continuations, so the literal contains no newlines.
const V1_UP: &str = "\
CREATE TABLE IF NOT EXISTS entities (\
id TEXT PRIMARY KEY,\
namespace TEXT NOT NULL,\
kind TEXT NOT NULL,\
name TEXT NOT NULL,\
description TEXT,\
properties TEXT,\
tags TEXT NOT NULL DEFAULT '[]',\
created_at INTEGER NOT NULL,\
updated_at INTEGER NOT NULL,\
deleted_at INTEGER\
);\
CREATE INDEX IF NOT EXISTS idx_entities_namespace ON entities(namespace);\
CREATE INDEX IF NOT EXISTS idx_entities_kind ON entities(namespace, kind);\
CREATE INDEX IF NOT EXISTS idx_entities_name ON entities(namespace, name);\
CREATE INDEX IF NOT EXISTS idx_entities_created ON entities(created_at DESC);\
CREATE TABLE IF NOT EXISTS graph_edges (\
namespace TEXT NOT NULL,\
id TEXT NOT NULL,\
source_id TEXT NOT NULL,\
target_id TEXT NOT NULL,\
relation TEXT NOT NULL,\
weight REAL NOT NULL DEFAULT 1.0,\
created_at INTEGER NOT NULL,\
metadata TEXT,\
PRIMARY KEY (namespace, id)\
);\
CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_source ON graph_edges(namespace, source_id);\
CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_target ON graph_edges(namespace, target_id);\
CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_relation ON graph_edges(namespace, relation);\
CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_src_rel ON graph_edges(namespace, source_id, relation);\
CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_tgt_rel ON graph_edges(namespace, target_id, relation);\
CREATE TABLE IF NOT EXISTS notes (\
id TEXT PRIMARY KEY,\
namespace TEXT NOT NULL,\
kind TEXT NOT NULL,\
content TEXT NOT NULL DEFAULT '',\
salience REAL NOT NULL DEFAULT 0.5,\
decay_factor REAL NOT NULL DEFAULT 0.0,\
expires_at INTEGER,\
properties TEXT,\
created_at INTEGER NOT NULL,\
updated_at INTEGER NOT NULL,\
deleted_at INTEGER\
);\
CREATE INDEX IF NOT EXISTS idx_notes_namespace ON notes(namespace);\
CREATE INDEX IF NOT EXISTS idx_notes_kind ON notes(namespace, kind);\
CREATE INDEX IF NOT EXISTS idx_notes_created ON notes(created_at DESC);\
CREATE TABLE IF NOT EXISTS events (\
id TEXT PRIMARY KEY,\
namespace TEXT NOT NULL,\
verb TEXT NOT NULL,\
substrate TEXT NOT NULL,\
actor TEXT NOT NULL,\
outcome TEXT NOT NULL,\
data TEXT,\
duration_us INTEGER NOT NULL DEFAULT 0,\
target_id TEXT,\
created_at INTEGER NOT NULL\
);\
CREATE INDEX IF NOT EXISTS idx_events_namespace ON events(namespace);\
CREATE INDEX IF NOT EXISTS idx_events_verb ON events(verb);\
CREATE INDEX IF NOT EXISTS idx_events_substrate ON events(substrate);\
CREATE INDEX IF NOT EXISTS idx_events_created ON events(created_at DESC);\
";
/// Ordered list of versioned migrations consumed by [`run_migrations`].
///
/// Versions must be exactly `1, 2, 3, …` in array order: [`run_migrations`] checks that
/// the entry at index `i` has version `i + 1` and returns an error otherwise.
pub const MIGRATIONS: &[VersionedMigration] = &[
    // Version 1: base tables and indexes (see `V1_UP`).
    VersionedMigration {
        version: 1,
        name: "initial_schema",
        up: V1_UP,
    },
    // Version 2: adds a nullable `name` column to `notes`. `run_migrations` special-cases
    // this version and only records it when the column is already present.
    VersionedMigration {
        version: 2,
        name: "add_name_to_notes",
        up: "ALTER TABLE notes ADD COLUMN name TEXT;",
    },
];
/// DDL for the `_schema_migrations` bookkeeping table used by [`run_migrations`].
///
/// One row per applied version; `IF NOT EXISTS` makes the statement safe to execute on
/// every call. The trailing backslashes are string line continuations, so the literal
/// contains no newlines.
const MIGRATION_TRACKING_TABLE: &str = "\
CREATE TABLE IF NOT EXISTS _schema_migrations (\
version INTEGER PRIMARY KEY,\
name TEXT NOT NULL,\
applied_at INTEGER NOT NULL\
);\
";
pub fn run_migrations(conn: &mut Connection) -> Result<u32, SqliteError> {
for (i, m) in MIGRATIONS.iter().enumerate() {
let expected = (i + 1) as u32;
if m.version != expected {
return Err(SqliteError::InvalidData(format!(
"MIGRATIONS array is not contiguous: expected version {expected} at index {i}, \
got version {}",
m.version
)));
}
}
conn.execute_batch(MIGRATION_TRACKING_TABLE)?;
let current_version: u32 = conn
.query_row(
"SELECT COALESCE(MAX(version), 0) FROM _schema_migrations",
[],
|row| row.get(0),
)
.unwrap_or(0);
let mut applied_version = current_version;
for migration in MIGRATIONS {
if migration.version <= current_version {
continue;
}
if migration.version == 2 {
let col_exists: bool = conn
.query_row(
"SELECT COUNT(*) > 0 FROM pragma_table_info('notes') WHERE name = 'name'",
[],
|row| row.get(0),
)
.unwrap_or(false);
if col_exists {
let now = chrono::Utc::now().timestamp_micros();
conn.execute(
"INSERT OR IGNORE INTO _schema_migrations (version, name, applied_at) \
VALUES (?1, ?2, ?3)",
rusqlite::params![migration.version, migration.name, now],
)
.map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?;
applied_version = migration.version;
continue;
}
}
let tx = conn.transaction().map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?;
tx.execute_batch(migration.up)
.map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?;
let now = chrono::Utc::now().timestamp_micros();
tx.execute(
"INSERT INTO _schema_migrations (version, name, applied_at) VALUES (?1, ?2, ?3)",
rusqlite::params![migration.version, migration.name, now],
)
.map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?;
tx.commit().map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?;
applied_version = migration.version;
}
Ok(applied_version)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a fresh in-memory database for a single test.
    fn open_memory() -> Connection {
        Connection::open_in_memory().expect("in-memory connection")
    }

    /// Runs a parameterless query and returns its first column as an integer.
    fn scalar_i64(conn: &Connection, sql: &str) -> i64 {
        conn.query_row(sql, [], |row| row.get(0)).unwrap()
    }

    /// Applies one migration and its tracking row inside a single transaction.
    ///
    /// Lets a test feed an arbitrary migration to the database without touching the
    /// global `MIGRATIONS` list.
    fn apply_single_migration(
        conn: &mut Connection,
        migration: &VersionedMigration,
    ) -> Result<(), SqliteError> {
        let version = migration.version;
        let wrap = move |e: rusqlite::Error| SqliteError::Migration {
            version,
            error: e.to_string(),
        };

        let tx = conn.transaction().map_err(wrap)?;
        tx.execute_batch(migration.up).map_err(wrap)?;
        let now = chrono::Utc::now().timestamp_micros();
        tx.execute(
            "INSERT INTO _schema_migrations (version, name, applied_at) VALUES (?1, ?2, ?3)",
            rusqlite::params![version, migration.name, now],
        )
        .map_err(wrap)?;
        tx.commit().map_err(wrap)
    }

    #[test]
    fn fresh_db_migrates_to_latest() {
        let mut conn = open_memory();

        let version = run_migrations(&mut conn).expect("migrations should succeed");
        assert_eq!(version, 2);

        // Both versions are tracked.
        assert_eq!(
            scalar_i64(
                &conn,
                "SELECT COUNT(*) FROM _schema_migrations WHERE version IN (1, 2)",
            ),
            2
        );

        // V1 created the `entities` table.
        assert_eq!(
            scalar_i64(
                &conn,
                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='entities'",
            ),
            1
        );

        // V2 added `notes.name`.
        let col_count = scalar_i64(
            &conn,
            "SELECT COUNT(*) FROM pragma_table_info('notes') WHERE name = 'name'",
        );
        assert_eq!(col_count, 1, "V2 must add name column to notes");
    }

    #[test]
    fn run_migrations_twice_is_idempotent() {
        let mut conn = open_memory();

        let v1 = run_migrations(&mut conn).expect("first run");
        let v2 = run_migrations(&mut conn).expect("second run");
        assert_eq!(v1, 2);
        assert_eq!(v2, 2);

        // The second run must not add tracking rows.
        assert_eq!(scalar_i64(&conn, "SELECT COUNT(*) FROM _schema_migrations"), 2);
    }

    #[test]
    fn failed_migration_rolls_back() {
        let bad_v3 = VersionedMigration {
            version: 3,
            name: "bad_migration",
            up: "THIS IS NOT VALID SQL;",
        };

        let mut conn = open_memory();
        run_migrations(&mut conn).expect("V1+V2 should apply cleanly");

        let result = apply_single_migration(&mut conn, &bad_v3);
        assert!(result.is_err(), "bad migration should return error");

        let v3_count = scalar_i64(
            &conn,
            "SELECT COUNT(*) FROM _schema_migrations WHERE version = 3",
        );
        assert_eq!(v3_count, 0, "V3 must not be recorded after rollback");

        let applied_count = scalar_i64(
            &conn,
            "SELECT COUNT(*) FROM _schema_migrations WHERE version IN (1, 2)",
        );
        assert_eq!(applied_count, 2, "V1 and V2 must still be recorded");
    }

    #[test]
    fn store_ddl_then_migrations_is_idempotent() {
        use crate::stores::note::ensure_notes_schema;

        let mut conn = open_memory();
        ensure_notes_schema(&conn).expect("store DDL should create notes");

        // The store's own DDL already provides the column that V2 would add.
        let has_name = conn
            .query_row(
                "SELECT COUNT(*) > 0 FROM pragma_table_info('notes') WHERE name = 'name'",
                [],
                |row| row.get::<_, bool>(0),
            )
            .unwrap();
        assert!(has_name, "NOTES_DDL should include name column");

        let version = run_migrations(&mut conn).expect("migrations after store DDL");
        assert_eq!(version, 2);

        let v2_count = scalar_i64(
            &conn,
            "SELECT COUNT(*) FROM _schema_migrations WHERE version = 2",
        );
        assert_eq!(
            v2_count, 1,
            "V2 must be recorded even when column pre-exists"
        );
    }
}