use tracing::info;
use crate::error::{ClusterError, Result};
use super::core::{read_format_version, write_format_version};
use super::schema::CATALOG_FORMAT_VERSION;
pub(super) fn migrate_if_needed(db: &redb::Database) -> Result<()> {
let current = read_format_version(db)?;
match current {
None => {
info!(
format_version = CATALOG_FORMAT_VERSION,
"stamping catalog with current format version"
);
write_format_version(db, CATALOG_FORMAT_VERSION)?;
Ok(())
}
Some(v) if v == CATALOG_FORMAT_VERSION => {
Ok(())
}
Some(v) if v > CATALOG_FORMAT_VERSION => {
Err(ClusterError::Transport {
detail: format!(
"catalog format version {v} is newer than supported ({CATALOG_FORMAT_VERSION}); refusing to open to avoid data corruption"
),
})
}
Some(v) => {
Err(ClusterError::Codec {
detail: format!(
"catalog format version {v} is older than supported ({CATALOG_FORMAT_VERSION}) and no migration arm is defined"
),
})
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn temp_db() -> (tempfile::TempDir, redb::Database) {
use super::super::schema::{GHOST_TABLE, METADATA_TABLE, ROUTING_TABLE, TOPOLOGY_TABLE};
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("cluster.redb");
let db = redb::Database::create(&path).unwrap();
let txn = db.begin_write().unwrap();
{
let _ = txn.open_table(TOPOLOGY_TABLE).unwrap();
let _ = txn.open_table(ROUTING_TABLE).unwrap();
let _ = txn.open_table(METADATA_TABLE).unwrap();
let _ = txn.open_table(GHOST_TABLE).unwrap();
}
txn.commit().unwrap();
(dir, db)
}
#[test]
fn fresh_catalog_gets_stamped() {
let (_dir, db) = temp_db();
assert!(read_format_version(&db).unwrap().is_none());
migrate_if_needed(&db).unwrap();
assert_eq!(
read_format_version(&db).unwrap(),
Some(CATALOG_FORMAT_VERSION)
);
}
#[test]
fn reopening_stamped_catalog_is_a_noop() {
let (_dir, db) = temp_db();
migrate_if_needed(&db).unwrap();
migrate_if_needed(&db).unwrap();
migrate_if_needed(&db).unwrap();
assert_eq!(
read_format_version(&db).unwrap(),
Some(CATALOG_FORMAT_VERSION)
);
}
#[test]
fn future_version_is_refused() {
let (_dir, db) = temp_db();
write_format_version(&db, CATALOG_FORMAT_VERSION + 1).unwrap();
let err = migrate_if_needed(&db).unwrap_err().to_string();
assert!(
err.contains("newer than supported"),
"expected future-version rejection, got: {err}"
);
}
#[test]
fn older_version_without_migration_is_refused() {
let (_dir, db) = temp_db();
write_format_version(&db, CATALOG_FORMAT_VERSION - 1).unwrap();
let err = migrate_if_needed(&db).unwrap_err().to_string();
assert!(
err.contains("older than supported"),
"expected older-without-arm rejection, got: {err}"
);
}
}