use anyhow::Result;
use blvm_node::storage::database::{create_database, default_backend, Database, DatabaseBackend};
use std::path::Path;
use std::sync::Arc;
use tracing::warn;
/// Translates a backend name into a [`DatabaseBackend`].
///
/// The name is lowercased before comparison, so matching is
/// case-insensitive. The recognised names are `redb`, `rocksdb`, `sled` and
/// `tidesdb`; anything else resolves to whatever [`default_backend`] returns.
fn parse_backend(s: &str) -> DatabaseBackend {
    let name = s.to_lowercase();
    if name == "redb" {
        DatabaseBackend::Redb
    } else if name == "rocksdb" {
        DatabaseBackend::RocksDB
    } else if name == "sled" {
        DatabaseBackend::Sled
    } else if name == "tidesdb" {
        DatabaseBackend::TidesDB
    } else {
        // Unrecognised names are not an error here; the caller gets the
        // crate-wide default instead.
        default_backend()
    }
}
/// Reports whether `backend` is one of the backends this file treats as able
/// to open trees on demand (Sled and TidesDB).
fn supports_dynamic_trees(backend: DatabaseBackend) -> bool {
    match backend {
        DatabaseBackend::Sled | DatabaseBackend::TidesDB => true,
        _ => false,
    }
}
/// Backend used for module databases when no backend is requested through the
/// environment. Currently always Sled.
fn best_module_backend() -> DatabaseBackend {
    DatabaseBackend::Sled
}
/// Opens the module database stored under `<data_dir>/db`, creating the
/// directory if it does not exist.
///
/// Backend selection:
/// 1. `MODULE_CONFIG_DATABASE_BACKEND` if set, otherwise
///    `MODULE_DATABASE_BACKEND`, interpreted by `parse_backend`.
/// 2. If neither variable is readable, `best_module_backend()`.
///
/// When the selected backend supports dynamic trees it is opened directly and
/// any failure is returned unchanged. Otherwise a warning is logged and Sled,
/// then TidesDB, are tried in that order.
///
/// # Errors
///
/// Returns an error if the directory cannot be created, if the selected
/// dynamic-tree backend fails to open, or if both fallback backends fail. In
/// the last case the message includes the failure reported by each fallback,
/// so that problems such as permission or locking errors are not mistaken
/// for a missing compile-time feature.
pub fn open_module_db<P: AsRef<Path>>(data_dir: P) -> Result<Arc<dyn Database>> {
    let db_path = data_dir.as_ref().join("db");
    std::fs::create_dir_all(&db_path)?;

    // The CONFIG-prefixed variable takes precedence over the plain one.
    let explicit = std::env::var("MODULE_CONFIG_DATABASE_BACKEND")
        .or_else(|_| std::env::var("MODULE_DATABASE_BACKEND"))
        .ok();
    let backend = explicit
        .as_deref()
        .map(parse_backend)
        .unwrap_or_else(best_module_backend);

    if supports_dynamic_trees(backend) {
        return create_database(&db_path, backend, None).map(Arc::from);
    }

    if explicit.is_some() {
        warn!(
            "MODULE_DATABASE_BACKEND={:?} does not support dynamic open_tree(). \
             Module databases require Sled or TidesDB. \
             Set MODULE_CONFIG_DATABASE_BACKEND=sled (or tidesdb) to remove this warning. \
             Note: Redb only supports pre-declared tables (schema, items); \
             RocksDB requires all column families at open time.",
            backend
        );
    } else {
        // Only reachable if `best_module_backend()` ever returns a backend
        // without dynamic-tree support.
        warn!(
            "Default backend {:?} does not support dynamic open_tree(); \
             auto-selecting best available module backend (sled or tidesdb). \
             Set MODULE_CONFIG_DATABASE_BACKEND=sled to suppress this warning.",
            backend
        );
    }

    // Try the fallbacks in order, keeping each failure so the final error can
    // report why neither backend could be opened.
    let sled_err = match create_database(&db_path, DatabaseBackend::Sled, None) {
        Ok(db) => return Ok(Arc::from(db)),
        Err(err) => err,
    };
    let tidesdb_err = match create_database(&db_path, DatabaseBackend::TidesDB, None) {
        Ok(db) => return Ok(Arc::from(db)),
        Err(err) => err,
    };

    Err(anyhow::anyhow!(
        "No module-compatible database backend available. \
         Backend {:?} requires pre-declared trees/column families. \
         Recompile with the 'sled' or 'tidesdb' feature on blvm-node, \
         or set MODULE_CONFIG_DATABASE_BACKEND=sled. \
         Underlying errors: sled: {:#}; tidesdb: {:#}",
        backend,
        sled_err,
        tidesdb_err
    ))
}
/// Key in the `"schema"` tree under which the migration runners store the
/// current schema version, written as decimal text.
const SCHEMA_VERSION_KEY: &[u8] = b"schema_version";
/// Handle passed to migration functions, bundling a tree for key/value
/// access with the database it belongs to.
///
/// Cloning is cheap: both fields are `Arc`s.
#[derive(Clone)]
pub struct MigrationContext {
    /// Tree targeted by `put`, `get` and `delete`.
    tree: Arc<dyn blvm_node::storage::database::Tree>,
    /// Database handle used by `open_tree` to reach other trees.
    db: Arc<dyn Database>,
}
impl MigrationContext {
    /// Builds a context from the tree that `put`/`get`/`delete` operate on
    /// and the database used for `open_tree`.
    pub fn new(tree: Arc<dyn blvm_node::storage::database::Tree>, db: Arc<dyn Database>) -> Self {
        Self { tree, db }
    }
    /// Inserts `value` under `key` in the context's tree.
    ///
    /// Note: the migration runners in this file construct the context over
    /// the `"schema"` tree, which also holds the schema version entry.
    pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
        self.tree.insert(key, value)
    }
    /// Looks up `key` in the context's tree, forwarding the tree's result.
    pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        self.tree.get(key)
    }
    /// Removes `key` from the context's tree.
    pub fn delete(&self, key: &[u8]) -> Result<()> {
        self.tree.remove(key)
    }
    /// Opens the tree called `name` on the underlying database.
    pub fn open_tree(&self, name: &str) -> Result<Box<dyn blvm_node::storage::database::Tree>> {
        self.db.open_tree(name)
    }
}
/// Function that applies a migration.
pub type MigrationUp = fn(&MigrationContext) -> Result<()>;
/// Function that reverts a migration.
pub type MigrationDown = fn(&MigrationContext) -> Result<()>;
/// A migration: `(version, up, optional down)`.
pub type Migration = (u32, MigrationUp, Option<MigrationDown>);
/// Applies up-only migrations.
///
/// Each `(version, up)` pair is paired with an absent down function and the
/// resulting list is handed to [`run_migrations_with_down`], which does the
/// actual work and whose result is returned unchanged.
pub fn run_migrations(db: &Arc<dyn Database>, migrations: &[(u32, MigrationUp)]) -> Result<()> {
    let mut without_down = Vec::with_capacity(migrations.len());
    for &(version, up) in migrations {
        without_down.push((version, up, None));
    }
    run_migrations_with_down(db, &without_down)
}
/// Applies every migration whose version is greater than the stored schema
/// version, in ascending version order.
///
/// The stored version lives under `SCHEMA_VERSION_KEY` in the `"schema"`
/// tree. A missing, non-UTF-8 or non-numeric entry is treated as version 0.
/// After each successful `up` the stored version is rewritten, so a failure
/// part-way through leaves the versions already applied on record.
///
/// Down functions in `migrations` are ignored here.
///
/// # Errors
///
/// Returns the first error from opening the tree, reading or writing the
/// version entry, or running an `up` function.
pub fn run_migrations_with_down(db: &Arc<dyn Database>, migrations: &[Migration]) -> Result<()> {
    let ctx = MigrationContext::new(Arc::from(db.open_tree("schema")?), Arc::clone(db));

    let stored = ctx.get(SCHEMA_VERSION_KEY)?;
    let current: u32 = match stored {
        Some(bytes) => String::from_utf8(bytes)
            .ok()
            .and_then(|text| text.parse().ok())
            .unwrap_or(0),
        None => 0,
    };

    let mut pending: Vec<_> = migrations
        .iter()
        .copied()
        .filter(|&(version, _, _)| version > current)
        .collect();
    pending.sort_by_key(|&(version, _, _)| version);

    for (version, apply, _) in pending {
        apply(&ctx)?;
        let recorded = version.to_string();
        ctx.put(SCHEMA_VERSION_KEY, recorded.as_bytes())?;
    }
    Ok(())
}
/// Rolls the schema back from the stored version to `target_version`.
///
/// Migrations with a version in `(target_version, current]` are reverted in
/// descending version order by calling their down functions. The stored
/// version (under `SCHEMA_VERSION_KEY` in the `"schema"` tree) is updated
/// after every successful step and finally set to `target_version`. A
/// missing or unparseable stored version is treated as 0. If the stored
/// version is already at or below `target_version`, nothing happens.
///
/// # Errors
///
/// Fails before running any rollback if a migration in the range has no down
/// function; silently skipping it would lower the recorded version while
/// leaving that migration's changes in place. Also returns the first error
/// from opening the tree, reading or writing the version entry, or running a
/// down function. In that case the stored version reflects the steps that
/// already completed.
pub fn run_migrations_down(
    db: &Arc<dyn Database>,
    migrations: &[Migration],
    target_version: u32,
) -> Result<()> {
    let tree = db.open_tree("schema")?;
    let tree = Arc::from(tree);
    let ctx = MigrationContext::new(tree, Arc::clone(db));
    let current: u32 = ctx
        .get(SCHEMA_VERSION_KEY)?
        .and_then(|v| String::from_utf8(v).ok())
        .and_then(|s| s.parse().ok())
        .unwrap_or(0);
    if current <= target_version {
        return Ok(());
    }

    // Collect the down functions up front; a missing one aborts the whole
    // rollback before anything has been modified.
    let mut to_rollback = migrations
        .iter()
        .filter(|(v, _, _)| *v > target_version && *v <= current)
        .map(|(v, _, down)| match down {
            Some(down_fn) => Ok((*v, *down_fn)),
            None => Err(anyhow::anyhow!(
                "Migration version {} has no down function",
                v
            )),
        })
        .collect::<Result<Vec<_>>>()?;
    to_rollback.sort_by_key(|(v, _)| std::cmp::Reverse(*v));

    for (version, down_fn) in to_rollback {
        down_fn(&ctx)?;
        // Record progress so a later failure does not leave the stored
        // version claiming this migration is still applied. `version` is
        // strictly greater than `target_version`, so it is at least 1.
        ctx.put(SCHEMA_VERSION_KEY, (version - 1).to_string().as_bytes())?;
    }
    ctx.put(SCHEMA_VERSION_KEY, target_version.to_string().as_bytes())?;
    Ok(())
}