pub mod m001_memory_commit_log;
pub mod m002_durable_jobs;
pub mod m003_hnsw_manifests;
pub mod m004_raft_log;
use rusqlite::{Connection, Transaction};
use thiserror::Error;
#[derive(Debug, Error)]
pub enum MigrationError {
#[error("SQLite error during migration {migration_id} ({migration_name}): {source}")]
Sqlite {
migration_id: u32,
migration_name: &'static str,
source: rusqlite::Error,
},
#[error("migration {id} ({name}) failed: {message}")]
Failure {
id: u32,
name: &'static str,
message: String,
},
#[error("migrations applied out of order: tried {tried} after {previous}")]
OutOfOrder { previous: u32, tried: u32 },
}
pub trait Migration: Send + Sync {
fn id(&self) -> u32;
fn name(&self) -> &'static str;
fn up(&self, tx: &Transaction<'_>) -> Result<(), rusqlite::Error>;
}
fn all_migrations() -> Vec<Box<dyn Migration>> {
vec![
Box::new(m001_memory_commit_log::M001),
Box::new(m002_durable_jobs::M002),
Box::new(m003_hnsw_manifests::M003),
Box::new(m004_raft_log::M004),
]
}
pub struct MigrationRunner;
impl MigrationRunner {
pub fn run_pending(conn: &mut Connection) -> Result<usize, MigrationError> {
Self::ensure_meta_table(conn).map_err(|e| MigrationError::Sqlite {
migration_id: 0,
migration_name: "_meta_table_setup",
source: e,
})?;
let applied = Self::applied_ids(conn).map_err(|e| MigrationError::Sqlite {
migration_id: 0,
migration_name: "_applied_ids_query",
source: e,
})?;
let migrations = all_migrations();
for migration in &migrations {
let _ = migration.id();
}
let mut applied_count = 0;
let mut last_applied_id = applied.iter().copied().max().unwrap_or(0);
for migration in migrations {
let id = migration.id();
if applied.contains(&id) {
continue;
}
if id <= last_applied_id {
return Err(MigrationError::OutOfOrder {
previous: last_applied_id,
tried: id,
});
}
let name = migration.name();
tracing::info!(
migration_id = id,
migration_name = name,
"applying migration"
);
let tx = conn.transaction().map_err(|e| MigrationError::Sqlite {
migration_id: id,
migration_name: name,
source: e,
})?;
migration.up(&tx).map_err(|e| MigrationError::Sqlite {
migration_id: id,
migration_name: name,
source: e,
})?;
tx.execute(
"INSERT INTO _yantrikdb_meta_migrations (id, name, applied_at_unix_micros) VALUES (?1, ?2, ?3)",
rusqlite::params![
id,
name,
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_micros() as i64)
.unwrap_or(0),
],
)
.map_err(|e| MigrationError::Sqlite {
migration_id: id,
migration_name: name,
source: e,
})?;
tx.commit().map_err(|e| MigrationError::Sqlite {
migration_id: id,
migration_name: name,
source: e,
})?;
applied_count += 1;
last_applied_id = id;
tracing::info!(
migration_id = id,
migration_name = name,
"migration applied"
);
}
Ok(applied_count)
}
fn ensure_meta_table(conn: &Connection) -> Result<(), rusqlite::Error> {
conn.execute_batch(
"CREATE TABLE IF NOT EXISTS _yantrikdb_meta_migrations (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
applied_at_unix_micros INTEGER NOT NULL
) STRICT;",
)
}
fn applied_ids(conn: &Connection) -> Result<std::collections::HashSet<u32>, rusqlite::Error> {
let mut stmt = conn.prepare("SELECT id FROM _yantrikdb_meta_migrations")?;
let ids: rusqlite::Result<Vec<u32>> =
stmt.query_map([], |row| row.get::<_, u32>(0))?.collect();
Ok(ids?.into_iter().collect())
}
pub fn applied_summary(conn: &Connection) -> Result<Vec<(u32, String)>, rusqlite::Error> {
Self::ensure_meta_table(conn)?;
let mut stmt =
conn.prepare("SELECT id, name FROM _yantrikdb_meta_migrations ORDER BY id ASC")?;
let rows: rusqlite::Result<Vec<(u32, String)>> = stmt
.query_map([], |row| {
Ok((row.get::<_, u32>(0)?, row.get::<_, String>(1)?))
})?
.collect();
rows
}
}
#[cfg(test)]
mod tests {
use super::*;
fn open_in_memory() -> Connection {
Connection::open_in_memory().expect("in-memory sqlite")
}
#[test]
fn runs_pending_migrations_on_fresh_db() {
let mut conn = open_in_memory();
let n = MigrationRunner::run_pending(&mut conn).unwrap();
assert!(n >= 1, "should have applied at least m001");
let count: u32 = conn
.query_row(
"SELECT COUNT(*) FROM _yantrikdb_meta_migrations",
[],
|row| row.get(0),
)
.unwrap();
assert!(count >= 1);
}
#[test]
fn run_pending_is_idempotent() {
let mut conn = open_in_memory();
let first_run = MigrationRunner::run_pending(&mut conn).unwrap();
let second_run = MigrationRunner::run_pending(&mut conn).unwrap();
assert!(first_run >= 1, "first run should apply migrations");
assert_eq!(second_run, 0, "second run should be a no-op");
}
#[test]
fn applied_summary_returns_ordered_list() {
let mut conn = open_in_memory();
MigrationRunner::run_pending(&mut conn).unwrap();
let summary = MigrationRunner::applied_summary(&conn).unwrap();
assert!(!summary.is_empty());
for window in summary.windows(2) {
assert!(
window[0].0 < window[1].0,
"summary not ordered: {summary:?}"
);
}
assert_eq!(summary[0].0, 1);
assert_eq!(summary[0].1, "memory_commit_log");
}
}