use crate::cf_store::RocksDbCFStore; use crate::config::RocksDbCFStoreConfig; use crate::error::{StoreError, StoreResult};
use crate::CFOperations;
use log::{error, info, warn};
use rocksdb::{checkpoint::Checkpoint, IteratorMode, ReadOptions, DB}; use std::path::Path;
pub fn migrate_db(
src_config: RocksDbCFStoreConfig,
dst_config: RocksDbCFStoreConfig,
validate: bool,
) -> StoreResult<()> {
info!(
"Starting CF-aware migration from '{}' to '{}'",
src_config.path, dst_config.path
);
let src_store = RocksDbCFStore::open(src_config.clone())?; let dst_store = RocksDbCFStore::open(dst_config.clone())?;
let temp_db_opts_for_list_cf = {
let mut opts = rocksdb::Options::default();
if let Some(p) = src_config.parallelism {
opts.increase_parallelism(p);
}
opts
};
let src_cf_names = DB::list_cf(&temp_db_opts_for_list_cf, &src_config.path)
.map_err(|e| StoreError::RocksDb(e))?
.into_iter()
.collect::<Vec<String>>();
info!("Source DB at '{}' contains CFs: {:?}", src_config.path, src_cf_names);
for cf_name in &src_cf_names {
if !dst_config.column_families_to_open.contains(cf_name) {
warn!(
"Source CF '{}' is not in destination config's 'column_families_to_open'. Skipping migration for this CF.",
cf_name
);
continue;
}
info!("Migrating Column Family: '{}'", cf_name);
let mut migrated_records_count_cf = 0;
let db_raw = src_store.db_raw();
let raw_kvs_iterator = {
let read_opts = ReadOptions::default();
if cf_name == rocksdb::DEFAULT_COLUMN_FAMILY_NAME {
db_raw.iterator_opt(IteratorMode::Start, read_opts)
} else {
let handle = src_store.get_cf_handle(cf_name)?;
db_raw
.iterator_cf_opt(&handle, read_opts, IteratorMode::Start)
}
};
for result in raw_kvs_iterator {
match result {
Ok((key_bytes, value_bytes)) => {
dst_store.put_raw(cf_name, key_bytes.as_ref(), value_bytes.as_ref())?;
migrated_records_count_cf += 1;
}
Err(e) => {
error!("Migration failed during iteration of CF '{}': {}", cf_name, e);
return Err(StoreError::RocksDb(e));
}
}
}
info!("Migrated {} records for CF '{}'", migrated_records_count_cf, cf_name);
}
if validate {
info!("Starting CF-aware validation...");
for cf_name in &src_cf_names {
if !dst_config.column_families_to_open.contains(cf_name) {
warn!(
"Skipping validation for source CF '{}' as it's not in destination config.",
cf_name
);
continue;
}
info!("Validating Column Family: '{}'", cf_name);
let mut validated_records_count_cf = 0;
let db_raw = src_store.db_raw();
let src_iter = {
let read_opts = ReadOptions::default();
if cf_name == rocksdb::DEFAULT_COLUMN_FAMILY_NAME {
db_raw.iterator_opt(IteratorMode::Start, read_opts)
} else {
let handle = src_store.get_cf_handle(cf_name)?;
db_raw
.iterator_cf_opt(&handle, read_opts, IteratorMode::Start)
}
};
for result in src_iter {
match result {
Ok((key_bytes, src_value_bytes)) => {
match dst_store.get_raw(cf_name, key_bytes.as_ref())? {
Some(dst_value_bytes) => {
if src_value_bytes.as_ref() != dst_value_bytes.as_slice() {
error!("Data inconsistency for key {:?} in CF '{}'", key_bytes, cf_name);
return Err(StoreError::Other(format!(
"Data inconsistency for key {:?} in CF '{}'",
key_bytes, cf_name
)));
}
}
None => {
error!("Migrated key {:?} missing in destination CF '{}'", key_bytes, cf_name);
return Err(StoreError::Other(format!(
"Migrated key {:?} missing in CF '{}'",
key_bytes, cf_name
)));
}
}
validated_records_count_cf += 1;
}
Err(e) => {
error!("Validation failed during source iteration for CF '{}': {}", cf_name, e);
return Err(StoreError::RocksDb(e));
}
}
}
info!(
"Validated {} records successfully for CF '{}'",
validated_records_count_cf, cf_name
);
}
}
info!("CF-aware migration completed successfully.");
Ok(())
}
/// Creates a RocksDB checkpoint of the database described by `cfg_to_open_db`
/// in the directory `backup_path`.
///
/// RocksDB creates the checkpoint directory itself and refuses to run if that
/// directory already exists. This function therefore prepares the path so the
/// checkpoint call can succeed:
///
/// * if `backup_path` does not exist, only its missing parent directories are
///   created;
/// * if it exists as an empty directory, that directory is removed so RocksDB
///   can recreate it;
/// * if it exists as a non-empty directory, or as something that is not a
///   directory, the call is rejected.
///
/// # Errors
///
/// Returns `StoreError::InvalidConfiguration` when `backup_path` exists but is
/// not a directory or is a non-empty directory, `StoreError::Io` when preparing
/// the path fails, `StoreError::RocksDb` when the checkpoint cannot be created,
/// and whatever error `RocksDbCFStore::open` yields when the database cannot be
/// opened.
pub fn backup_db(backup_path: &Path, cfg_to_open_db: RocksDbCFStoreConfig) -> StoreResult<()> {
    info!(
        "Starting backup of DB at '{}' to checkpoint directory '{}'",
        cfg_to_open_db.path,
        backup_path.display()
    );

    let store = RocksDbCFStore::open(cfg_to_open_db)?;
    let db_raw = store.db_raw();
    let checkpoint = Checkpoint::new(&db_raw).map_err(StoreError::RocksDb)?;

    if backup_path.exists() {
        if !backup_path.is_dir() {
            return Err(StoreError::InvalidConfiguration(format!(
                "Backup path '{}' exists but is not a directory.",
                backup_path.display()
            )));
        }

        let mut entries = std::fs::read_dir(backup_path).map_err(StoreError::Io)?;
        if entries.next().is_some() {
            return Err(StoreError::InvalidConfiguration(format!(
                "Backup path '{}' already exists and is not empty; a checkpoint requires a fresh directory.",
                backup_path.display()
            )));
        }

        // `remove_dir` only succeeds on an empty directory, so no data can be lost here.
        std::fs::remove_dir(backup_path).map_err(StoreError::Io)?;
        info!(
            "Removed empty pre-existing backup directory so the checkpoint can recreate it: {}",
            backup_path.display()
        );
    } else if let Some(parent) = backup_path
        .parent()
        .filter(|p| !p.as_os_str().is_empty())
    {
        // Create only the ancestors; the final component must be left for RocksDB to create.
        std::fs::create_dir_all(parent).map_err(StoreError::Io)?;
    }

    checkpoint
        .create_checkpoint(backup_path)
        .map_err(StoreError::RocksDb)?;
    info!("Checkpoint created successfully at '{}'", backup_path.display());
    Ok(())
}