use anyhow::Result;
use serde::{Deserialize, Serialize};
pub mod file_stamp;
pub mod redb_stamp;
#[derive(
Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Default,
)]
#[serde(transparent)]
pub struct SchemaVersion(pub u32);
impl SchemaVersion {
pub const UNVERSIONED: Self = Self(0);
#[must_use]
pub const fn next(self) -> Self {
Self(self.0.saturating_add(1))
}
}
impl std::fmt::Display for SchemaVersion {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "v{}", self.0)
}
}
pub trait Migration<S>: Send + Sync {
#[allow(clippy::wrong_self_convention)]
fn from_version(&self) -> SchemaVersion;
fn label(&self) -> &'static str;
fn apply(&self, store: &S) -> Result<()>;
}
pub struct MigrationRunner<S> {
steps: Vec<Box<dyn Migration<S>>>,
}
impl<S> MigrationRunner<S> {
pub fn new(mut steps: Vec<Box<dyn Migration<S>>>) -> Self {
steps.sort_by_key(|s| s.from_version());
Self { steps }
}
pub fn target_version(&self) -> SchemaVersion {
self.steps
.last()
.map(|s| s.from_version().next())
.unwrap_or(SchemaVersion::UNVERSIONED)
}
pub fn run(
&self,
store: &S,
current: SchemaVersion,
write_stamp: impl Fn(SchemaVersion) -> Result<()>,
) -> Result<SchemaVersion> {
let mut reached = current;
for step in &self.steps {
if step.from_version() < reached {
tracing::debug!(
"migrations: skipping '{}' ({} < {})",
step.label(),
step.from_version(),
reached
);
continue;
}
tracing::info!(
"migrations: applying '{}' ({} → {})",
step.label(),
step.from_version(),
step.from_version().next()
);
step.apply(store)?;
let next = step.from_version().next();
write_stamp(next)?;
reached = next;
}
Ok(reached)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
#[derive(Default)]
struct TestStore {
step_0_ran: AtomicBool,
step_1_ran: AtomicBool,
step_2_ran: AtomicBool,
last_stamp: AtomicU32,
}
impl TestStore {
fn stamp(&self) -> SchemaVersion {
SchemaVersion(self.last_stamp.load(Ordering::SeqCst))
}
fn write_stamp(&self) -> impl Fn(SchemaVersion) -> Result<()> + '_ {
move |v: SchemaVersion| {
self.last_stamp.store(v.0, Ordering::SeqCst);
Ok(())
}
}
}
struct Step0;
impl Migration<TestStore> for Step0 {
fn from_version(&self) -> SchemaVersion {
SchemaVersion::UNVERSIONED
}
fn label(&self) -> &'static str {
"step 0 → 1"
}
fn apply(&self, store: &TestStore) -> Result<()> {
store.step_0_ran.store(true, Ordering::SeqCst);
Ok(())
}
}
struct Step1;
impl Migration<TestStore> for Step1 {
fn from_version(&self) -> SchemaVersion {
SchemaVersion(1)
}
fn label(&self) -> &'static str {
"step 1 → 2"
}
fn apply(&self, store: &TestStore) -> Result<()> {
store.step_1_ran.store(true, Ordering::SeqCst);
Ok(())
}
}
struct FailingStep2;
impl Migration<TestStore> for FailingStep2 {
fn from_version(&self) -> SchemaVersion {
SchemaVersion(2)
}
fn label(&self) -> &'static str {
"step 2 → 3 (failing)"
}
fn apply(&self, store: &TestStore) -> Result<()> {
store.step_2_ran.store(true, Ordering::SeqCst);
Err(anyhow::anyhow!("simulated migration failure"))
}
}
#[test]
fn schema_version_ordering() {
assert!(SchemaVersion::UNVERSIONED < SchemaVersion(1));
assert!(SchemaVersion(1) < SchemaVersion(2));
assert_eq!(SchemaVersion::UNVERSIONED, SchemaVersion(0));
assert_eq!(SchemaVersion(5).next(), SchemaVersion(6));
assert_eq!(SchemaVersion(u32::MAX).next(), SchemaVersion(u32::MAX));
}
#[test]
fn runner_applies_pending_steps() {
let store = TestStore::default();
let runner: MigrationRunner<TestStore> =
MigrationRunner::new(vec![Box::new(Step0), Box::new(Step1)]);
let reached = runner
.run(&store, SchemaVersion::UNVERSIONED, store.write_stamp())
.expect("two-step migration succeeds");
assert!(store.step_0_ran.load(Ordering::SeqCst));
assert!(store.step_1_ran.load(Ordering::SeqCst));
assert_eq!(reached, SchemaVersion(2));
assert_eq!(store.stamp(), SchemaVersion(2));
assert_eq!(runner.target_version(), SchemaVersion(2));
}
#[test]
fn runner_skips_already_applied() {
let store = TestStore::default();
let runner: MigrationRunner<TestStore> =
MigrationRunner::new(vec![Box::new(Step0), Box::new(Step1)]);
let reached = runner
.run(&store, SchemaVersion(1), store.write_stamp())
.expect("resume-from-v1 succeeds");
assert!(
!store.step_0_ran.load(Ordering::SeqCst),
"step 0 should be skipped when current >= v1"
);
assert!(store.step_1_ran.load(Ordering::SeqCst));
assert_eq!(reached, SchemaVersion(2));
assert_eq!(store.stamp(), SchemaVersion(2));
}
#[test]
fn runner_is_incremental_on_crash() {
let store = TestStore::default();
let runner: MigrationRunner<TestStore> = MigrationRunner::new(vec![
Box::new(Step0),
Box::new(Step1),
Box::new(FailingStep2),
]);
let err = runner
.run(&store, SchemaVersion::UNVERSIONED, store.write_stamp())
.expect_err("FailingStep2 should fail");
assert!(
err.to_string().contains("simulated migration failure"),
"unexpected error message: {err}"
);
assert!(store.step_0_ran.load(Ordering::SeqCst));
assert!(store.step_1_ran.load(Ordering::SeqCst));
assert!(store.step_2_ran.load(Ordering::SeqCst));
assert_eq!(store.stamp(), SchemaVersion(2));
}
#[test]
fn runner_propagates_write_stamp_failure() {
let store = TestStore::default();
let runner: MigrationRunner<TestStore> = MigrationRunner::new(vec![Box::new(Step0)]);
let err = runner
.run(&store, SchemaVersion::UNVERSIONED, |_v| {
Err(anyhow::anyhow!("stamp write failed"))
})
.expect_err("write_stamp failure should propagate");
assert!(err.to_string().contains("stamp write failed"));
assert!(store.step_0_ran.load(Ordering::SeqCst));
}
#[test]
fn runner_target_version_matches_last_step() {
let runner_empty: MigrationRunner<TestStore> = MigrationRunner::new(vec![]);
assert_eq!(runner_empty.target_version(), SchemaVersion::UNVERSIONED);
let runner_two: MigrationRunner<TestStore> =
MigrationRunner::new(vec![Box::new(Step0), Box::new(Step1)]);
assert_eq!(runner_two.target_version(), SchemaVersion(2));
}
#[test]
fn runner_handles_out_of_order_step_registration() {
let store = TestStore::default();
let runner: MigrationRunner<TestStore> =
MigrationRunner::new(vec![Box::new(Step1), Box::new(Step0)]);
runner
.run(&store, SchemaVersion::UNVERSIONED, store.write_stamp())
.expect("out-of-order registration runs in version order");
assert!(store.step_0_ran.load(Ordering::SeqCst));
assert!(store.step_1_ran.load(Ordering::SeqCst));
assert_eq!(store.stamp(), SchemaVersion(2));
}
}