mod util;
use pretty_assertions::assert_eq;
use std::path::Path;
use rust_rocksdb::checkpoint::Checkpoint;
use rust_rocksdb::{
DB, DBWithThreadMode, ExportImportFilesMetaData, ImportColumnFamilyOptions, IteratorMode,
MultiThreaded, OptimisticTransactionDB, Options,
};
use std::fs;
use util::DBPath;
/// Writes four key/value pairs to a fresh database, takes one checkpoint of it,
/// reopens the checkpoint directory as a database of its own, and checks that
/// every pair can be read back from the checkpoint.
#[test]
pub fn test_single_checkpoint() {
    const PATH_PREFIX: &str = "_rust_rocksdb_cp_single_";
    // Rows written to the source database and expected in the checkpoint.
    const ROWS: &[(&[u8], &[u8])] = &[
        (b"k1", b"v1"),
        (b"k2", b"v2"),
        (b"k3", b"v3"),
        (b"k4", b"v4"),
    ];

    let source_path = DBPath::new(&format!("{PATH_PREFIX}db1"));
    let mut options = Options::default();
    options.create_if_missing(true);
    let source = DB::open(&options, &source_path).unwrap();
    for &(key, value) in ROWS {
        source.put(key, value).unwrap();
    }

    let checkpoint = Checkpoint::new(&source).unwrap();
    let checkpoint_path = DBPath::new(&format!("{PATH_PREFIX}cp1"));
    checkpoint.create_checkpoint(&checkpoint_path).unwrap();

    // The checkpoint directory is opened as an independent database.
    let restored = DB::open_default(&checkpoint_path).unwrap();
    for &(key, value) in ROWS {
        assert_eq!(restored.get(key).unwrap().unwrap(), value);
    }
}
/// Takes two checkpoints of the same database at different points in time.
///
/// The first checkpoint is taken after the initial four writes and must expose
/// exactly those values. The source database is then modified (two overwrites,
/// two new keys) and a second checkpoint is taken, which must expose the
/// overwritten and newly added values.
#[test]
pub fn test_multi_checkpoints() {
    const PATH_PREFIX: &str = "_rust_rocksdb_cp_multi_";
    // State written before the first checkpoint.
    const INITIAL_ROWS: &[(&[u8], &[u8])] = &[
        (b"k1", b"v1"),
        (b"k2", b"v2"),
        (b"k3", b"v3"),
        (b"k4", b"v4"),
    ];
    // Overwrites and additions applied between the first and second checkpoint.
    const UPDATED_ROWS: &[(&[u8], &[u8])] = &[
        (b"k1", b"modified"),
        (b"k2", b"changed"),
        (b"k5", b"v5"),
        (b"k6", b"v6"),
    ];

    let source_path = DBPath::new(&format!("{PATH_PREFIX}db1"));
    let mut options = Options::default();
    options.create_if_missing(true);
    let source = DB::open(&options, &source_path).unwrap();
    for &(key, value) in INITIAL_ROWS {
        source.put(key, value).unwrap();
    }

    let first_checkpoint = Checkpoint::new(&source).unwrap();
    let first_path = DBPath::new(&format!("{PATH_PREFIX}cp1"));
    first_checkpoint.create_checkpoint(&first_path).unwrap();

    // This handle stays open until the end of the test, like the source database.
    let first_restored = DB::open_default(&first_path).unwrap();
    for &(key, value) in INITIAL_ROWS {
        assert_eq!(first_restored.get(key).unwrap().unwrap(), value);
    }

    for &(key, value) in UPDATED_ROWS {
        source.put(key, value).unwrap();
    }

    let second_checkpoint = Checkpoint::new(&source).unwrap();
    let second_path = DBPath::new(&format!("{PATH_PREFIX}cp2"));
    second_checkpoint.create_checkpoint(&second_path).unwrap();

    let second_restored = DB::open_default(&second_path).unwrap();
    for &(key, value) in UPDATED_ROWS {
        assert_eq!(second_restored.get(key).unwrap().unwrap(), value);
    }
}
/// Creates a checkpoint with a log-size argument of `0` while one key has been
/// flushed and another has only been written after the flush.
///
/// The test expects the checkpoint directory to contain exactly one `.log`
/// file, expects that file to have length zero, and expects both keys to be
/// readable once the checkpoint is opened as a database.
#[test]
pub fn test_checkpoint_with_log_size_zero_forces_flush() {
    const PATH_PREFIX: &str = "_rust_rocksdb_cp_log_size_zero_";
    // Pairs that must be readable from the reopened checkpoint.
    const EXPECTED: &[(&[u8], &[u8])] = &[
        (b"flushed_key", b"flushed_value"),
        (b"memtable_key", b"memtable_value"),
    ];

    let source_path = DBPath::new(&format!("{PATH_PREFIX}db"));
    let mut options = Options::default();
    options.create_if_missing(true);
    let source = DB::open(&options, &source_path).unwrap();

    // The first key is flushed explicitly; the second is written after that flush.
    source.put(b"flushed_key", b"flushed_value").unwrap();
    source.flush().unwrap();
    source.put(b"memtable_key", b"memtable_value").unwrap();

    let checkpoint = Checkpoint::new(&source).unwrap();
    let checkpoint_path = DBPath::new(&format!("{PATH_PREFIX}cp"));
    checkpoint
        .create_checkpoint_with_log_size(&checkpoint_path, 0)
        .unwrap();

    // Collect the `.log` entries of the checkpoint directory; unreadable entries are skipped.
    let mut wal_entries = Vec::new();
    for entry in fs::read_dir((&checkpoint_path).as_ref())
        .unwrap()
        .flatten()
    {
        if entry.path().extension().is_some_and(|ext| ext == "log") {
            wal_entries.push(entry);
        }
    }
    assert_eq!(
        wal_entries.len(),
        1,
        "Checkpoint should contain exactly one WAL file"
    );

    let wal_len = wal_entries[0].metadata().unwrap().len();
    assert_eq!(wal_len, 0, "WAL file should be empty when flush is forced");

    let restored = DB::open_default(&checkpoint_path).unwrap();
    for &(key, value) in EXPECTED {
        assert_eq!(restored.get(key).unwrap().unwrap(), value);
    }
}
/// Creates a checkpoint with a log-size argument of `u64::MAX` while one key
/// has been flushed and another has only been written after the flush.
///
/// The test expects the checkpoint directory to contain exactly one `.log`
/// file, expects that file to be non-empty, and expects both keys to be
/// readable once the checkpoint is opened as a database.
#[test]
pub fn test_checkpoint_with_large_log_size_skips_flush() {
    const PATH_PREFIX: &str = "_rust_rocksdb_cp_log_size_large_";

    let source_path = DBPath::new(&format!("{PATH_PREFIX}db"));
    let mut options = Options::default();
    options.create_if_missing(true);
    let source = DB::open(&options, &source_path).unwrap();

    // The first key is flushed explicitly; the second is written after that flush.
    source.put(b"flushed_key", b"flushed_value").unwrap();
    source.flush().unwrap();
    source.put(b"memtable_key", b"memtable_value").unwrap();

    let checkpoint = Checkpoint::new(&source).unwrap();
    let checkpoint_path = DBPath::new(&format!("{PATH_PREFIX}cp"));
    checkpoint
        .create_checkpoint_with_log_size(&checkpoint_path, u64::MAX)
        .unwrap();

    // Keep only directory entries whose extension is `log`.
    let is_wal = |entry: &fs::DirEntry| entry.path().extension().is_some_and(|ext| ext == "log");
    let wal_entries: Vec<fs::DirEntry> = fs::read_dir((&checkpoint_path).as_ref())
        .unwrap()
        .filter_map(Result::ok)
        .filter(is_wal)
        .collect();
    assert_eq!(
        wal_entries.len(),
        1,
        "Checkpoint should contain exactly one WAL file"
    );

    let wal_len = wal_entries[0].metadata().unwrap().len();
    assert!(wal_len > 0, "WAL file should not be empty");

    let restored = DB::open_default(&checkpoint_path).unwrap();
    let flushed = restored.get(b"flushed_key").unwrap().unwrap();
    assert_eq!(flushed, b"flushed_value");
    let unflushed = restored.get(b"memtable_key").unwrap().unwrap();
    assert_eq!(unflushed, b"memtable_value");
}
/// `OptimisticTransactionDB` variant of the zero-log-size checkpoint test.
///
/// One key is flushed, a second key is written afterwards, and a checkpoint is
/// created with a log-size argument of `0`. The test expects exactly one
/// `.log` file of length zero in the checkpoint directory and expects both
/// keys to be readable from the reopened checkpoint.
#[test]
pub fn test_optimistic_transaction_db_checkpoint_with_log_size_zero_forces_flush() {
    const PATH_PREFIX: &str = "_rust_rocksdb_otxn_cp_log_size_zero_";
    // Pairs that must be readable from the reopened checkpoint.
    const EXPECTED: &[(&[u8], &[u8])] = &[
        (b"flushed_key", b"flushed_value"),
        (b"memtable_key", b"memtable_value"),
    ];

    let origin_dir = DBPath::new(&format!("{PATH_PREFIX}db"));
    let mut options = Options::default();
    options.create_if_missing(true);
    let origin: OptimisticTransactionDB =
        OptimisticTransactionDB::open(&options, &origin_dir).unwrap();

    // The first key is flushed explicitly; the second is written after that flush.
    origin.put(b"flushed_key", b"flushed_value").unwrap();
    origin.flush().unwrap();
    origin.put(b"memtable_key", b"memtable_value").unwrap();

    let snapshot = Checkpoint::new(&origin).unwrap();
    let snapshot_dir = DBPath::new(&format!("{PATH_PREFIX}cp"));
    snapshot
        .create_checkpoint_with_log_size(&snapshot_dir, 0)
        .unwrap();

    // Collect the `.log` entries of the checkpoint directory; unreadable entries are skipped.
    let mut log_entries = Vec::new();
    for dir_entry in fs::read_dir((&snapshot_dir).as_ref()).unwrap().flatten() {
        let has_log_extension = dir_entry.path().extension().is_some_and(|ext| ext == "log");
        if has_log_extension {
            log_entries.push(dir_entry);
        }
    }
    assert_eq!(
        log_entries.len(),
        1,
        "Checkpoint should contain exactly one WAL file"
    );

    let log_len = log_entries[0].metadata().unwrap().len();
    assert_eq!(log_len, 0, "WAL file should be empty when flush is forced");

    let reopened: OptimisticTransactionDB =
        OptimisticTransactionDB::open_default(&snapshot_dir).unwrap();
    for &(key, value) in EXPECTED {
        assert_eq!(reopened.get(key).unwrap().unwrap(), value);
    }
}
/// `OptimisticTransactionDB` variant of the large-log-size checkpoint test.
///
/// One key is flushed, a second key is written afterwards, and a checkpoint is
/// created with a log-size argument of `u64::MAX`. The test expects exactly
/// one non-empty `.log` file in the checkpoint directory and expects both keys
/// to be readable from the reopened checkpoint.
#[test]
pub fn test_optimistic_transaction_db_checkpoint_with_large_log_size_skips_flush() {
    const PATH_PREFIX: &str = "_rust_rocksdb_otxn_cp_log_size_large_";

    let origin_dir = DBPath::new(&format!("{PATH_PREFIX}db"));
    let mut options = Options::default();
    options.create_if_missing(true);
    let origin: OptimisticTransactionDB =
        OptimisticTransactionDB::open(&options, &origin_dir).unwrap();

    // The first key is flushed explicitly; the second is written after that flush.
    origin.put(b"flushed_key", b"flushed_value").unwrap();
    origin.flush().unwrap();
    origin.put(b"memtable_key", b"memtable_value").unwrap();

    let snapshot = Checkpoint::new(&origin).unwrap();
    let snapshot_dir = DBPath::new(&format!("{PATH_PREFIX}cp"));
    snapshot
        .create_checkpoint_with_log_size(&snapshot_dir, u64::MAX)
        .unwrap();

    // Keep only directory entries whose extension is `log`.
    let log_entries: Vec<fs::DirEntry> = fs::read_dir((&snapshot_dir).as_ref())
        .unwrap()
        .filter_map(Result::ok)
        .filter(|dir_entry| {
            let path = dir_entry.path();
            path.extension().is_some_and(|ext| ext == "log")
        })
        .collect();
    assert_eq!(
        log_entries.len(),
        1,
        "Checkpoint should contain exactly one WAL file"
    );

    let log_len = log_entries[0].metadata().unwrap().len();
    assert!(log_len > 0, "WAL file should not be empty");

    let reopened: OptimisticTransactionDB =
        OptimisticTransactionDB::open_default(&snapshot_dir).unwrap();
    let flushed = reopened.get(b"flushed_key").unwrap().unwrap();
    assert_eq!(flushed, b"flushed_value");
    let unflushed = reopened.get(b"memtable_key").unwrap().unwrap();
    assert_eq!(unflushed, b"memtable_value");
}
/// Compares two checkpoints taken with a log-size argument of `u64::MAX`: one
/// left untouched and one whose `.log` files are overwritten with empty
/// content before it is opened.
///
/// The untouched checkpoint must expose both the flushed key and the key
/// written after the flush. The checkpoint with emptied `.log` files must
/// still expose the flushed key, but the key written after the flush must be
/// absent.
#[test]
pub fn test_checkpoint_wal_truncation_loses_memtable_data() {
    const PATH_PREFIX: &str = "_rust_rocksdb_cp_wal_truncate_";

    let source_path = DBPath::new(&format!("{PATH_PREFIX}db"));
    let mut options = Options::default();
    options.create_if_missing(true);
    let source = DB::open(&options, &source_path).unwrap();

    // The first key is flushed explicitly; the second is written after that flush.
    source.put(b"flushed_key", b"flushed_value").unwrap();
    source.flush().unwrap();
    source.put(b"memtable_key", b"memtable_value").unwrap();

    let checkpoint = Checkpoint::new(&source).unwrap();
    let log_size_limit = u64::MAX;

    // Both checkpoints are produced from the same `Checkpoint` object with the same limit.
    let intact_path = DBPath::new(&format!("{PATH_PREFIX}cp_intact"));
    checkpoint
        .create_checkpoint_with_log_size(&intact_path, log_size_limit)
        .unwrap();
    let truncated_path = DBPath::new(&format!("{PATH_PREFIX}cp_truncated"));
    checkpoint
        .create_checkpoint_with_log_size(&truncated_path, log_size_limit)
        .unwrap();

    // Gather the `.log` paths first, then replace each file's content with nothing.
    let mut wal_paths = Vec::new();
    for entry in fs::read_dir((&truncated_path).as_ref()).unwrap().flatten() {
        let path = entry.path();
        if path.extension().is_some_and(|ext| ext == "log") {
            wal_paths.push(path);
        }
    }
    wal_paths
        .iter()
        .for_each(|wal_path| fs::write(wal_path, b"").unwrap());

    let intact_db = DB::open_default(&intact_path).unwrap();
    let intact_flushed = intact_db.get(b"flushed_key").unwrap().unwrap();
    assert_eq!(intact_flushed, b"flushed_value");
    let intact_unflushed = intact_db.get(b"memtable_key").unwrap().unwrap();
    assert_eq!(
        intact_unflushed, b"memtable_value",
        "memtable_key should be present when WAL is intact"
    );

    let truncated_db = DB::open_default(&truncated_path).unwrap();
    let truncated_flushed = truncated_db.get(b"flushed_key").unwrap().unwrap();
    assert_eq!(truncated_flushed, b"flushed_value");
    let truncated_unflushed = truncated_db.get(b"memtable_key").unwrap();
    assert!(
        truncated_unflushed.is_none(),
        "memtable_key should be absent when WAL is truncated"
    );
}
/// Ignored by default. Writes 1 KiB values until the combined size of the
/// source database's `.log` files exceeds 50 MiB, then creates a checkpoint
/// using that same size as the log-size argument.
///
/// The test expects the checkpoint's `.log` files to total zero bytes, expects
/// at least one `.sst` file in the checkpoint directory, and expects the first
/// written key to be readable from the reopened checkpoint.
#[test]
#[ignore]
fn test_checkpoint_wal_over_threshold_is_flushed() {
    /// Sums the sizes of all entries in `dir` whose extension is `log`.
    fn log_bytes(dir: &Path) -> u64 {
        let mut total = 0_u64;
        for entry in fs::read_dir(dir).unwrap().flatten() {
            if entry.path().extension().is_some_and(|ext| ext == "log") {
                total += entry.metadata().unwrap().len();
            }
        }
        total
    }

    const PATH_PREFIX: &str = "_rust_rocksdb_cp_wal_threshold_";

    let source_path = DBPath::new(&format!("{PATH_PREFIX}db"));
    let mut options = Options::default();
    options.create_if_missing(true);
    let source = DB::open(&options, &source_path).unwrap();

    let threshold = 50 * 1024 * 1024_u64;
    let payload = vec![b'x'; 1024];

    // Write in batches of 1000 keys; the log size is only measured between batches.
    for batch_start in (0..).step_by(1000) {
        for index in batch_start..batch_start + 1000 {
            let key = format!("key_{:08}", index);
            source.put(key.as_bytes(), &payload).unwrap();
        }
        if log_bytes((&source_path).as_ref()) > threshold {
            break;
        }
    }

    let checkpoint = Checkpoint::new(&source).unwrap();
    let checkpoint_path = DBPath::new(&format!("{PATH_PREFIX}cp"));
    checkpoint
        .create_checkpoint_with_log_size(&checkpoint_path, threshold)
        .unwrap();

    let checkpoint_log_bytes = log_bytes((&checkpoint_path).as_ref());
    assert_eq!(
        checkpoint_log_bytes, 0,
        "Checkpoint WAL should be empty when WAL size exceeds log_size_for_flush threshold"
    );

    // Count the `.sst` entries in the checkpoint directory.
    let mut sst_files = 0_usize;
    for entry in fs::read_dir((&checkpoint_path).as_ref()).unwrap().flatten() {
        if entry.path().extension().is_some_and(|ext| ext == "sst") {
            sst_files += 1;
        }
    }
    assert!(
        sst_files > 0,
        "Checkpoint should contain SST files when flush is triggered"
    );

    let restored = DB::open_default(&checkpoint_path).unwrap();
    let first_key = restored.get(b"key_00000000").unwrap();
    assert!(first_key.is_some());
}
/// Exports column family `cf1` of a source database through a checkpoint and
/// imports the exported files into a second database as `cf1-new`.
///
/// The test checks that:
/// - the export metadata lists two files, each with an empty column family
///   name and a non-empty file name and directory;
/// - after importing with `set_move_files(true)`, none of the exported files
///   still exists at its `directory`/`name` location;
/// - `cf1-new` contains exactly `k1 -> v1` and `k3 -> v3`, so the writes made
///   to the source after the export are not part of the import;
/// - the destination's pre-existing `cf0` still contains exactly `k1 -> v0`
///   and `k5 -> v5`.
#[test]
pub fn test_export_checkpoint_column_family() {
    const PATH_PREFIX: &str = "_rust_rocksdb_cf_export_";

    // Source database with two column families that share key names.
    let src_path = DBPath::new(&format!("{PATH_PREFIX}db-src"));
    let mut src_opts = Options::default();
    src_opts.create_if_missing(true);
    let src_db = DBWithThreadMode::<MultiThreaded>::open(&src_opts, &src_path).unwrap();

    let cf_create_opts = Options::default();
    for cf_name in ["cf1", "cf2"] {
        src_db.create_cf(cf_name, &cf_create_opts).unwrap();
    }

    let cf1 = src_db.cf_handle("cf1").unwrap();
    src_db.put_cf(&cf1, b"k1", b"v1").unwrap();
    src_db.put_cf(&cf1, b"k2", b"v2").unwrap();

    let cf2 = src_db.cf_handle("cf2").unwrap();
    src_db.put_cf(&cf2, b"k1", b"v1_cf2").unwrap();
    src_db.put_cf(&cf2, b"k2", b"v2_cf2").unwrap();

    let checkpoint = Checkpoint::new(&src_db).unwrap();

    // Changes made to `cf1` after the checkpoint object exists but before the export.
    src_db.flush_cf(&cf1).expect("flush succeeds");
    src_db.delete_cf(&cf1, b"k2").unwrap();
    src_db.put_cf(&cf1, b"k3", b"v3").unwrap();

    let export_dir = DBPath::new(&format!("{PATH_PREFIX}cf1-export"));
    let export_metadata = checkpoint.export_column_family(&cf1, &export_dir).unwrap();

    // Changes made after the export; the import assertions below expect not to see them.
    src_db.put_cf(&cf1, b"k4", b"v4").unwrap();
    src_db.delete_cf(&cf1, b"k1").unwrap();

    // Destination database, pre-populated with its own column family `cf0`.
    let dest_path = DBPath::new(&format!("{PATH_PREFIX}db-dest"));
    let mut dest_opts = Options::default();
    dest_opts.create_if_missing(true);
    let dest_db = DBWithThreadMode::<MultiThreaded>::open(&dest_opts, &dest_path).unwrap();
    {
        dest_db.create_cf("cf0", &dest_opts).unwrap();
        let cf0 = dest_db.cf_handle("cf0").unwrap();
        dest_db.put_cf(&cf0, b"k1", b"v0").unwrap();
        dest_db.put_cf(&cf0, b"k5", b"v5").unwrap();
    }

    let export_files = export_metadata.get_files();
    assert_eq!(export_files.len(), 2);
    for export_file in export_files.iter() {
        assert!(export_file.column_family_name.is_empty());
        assert!(!export_file.name.is_empty());
        assert!(!export_file.directory.is_empty());
    }

    // Build fresh import metadata from the exported comparator name and file list.
    let mut import_metadata = ExportImportFilesMetaData::default();
    import_metadata.set_db_comparator_name(&export_metadata.get_db_comparator_name());
    import_metadata.set_files(&export_files.to_vec()).unwrap();

    let import_cf_opts = Options::default();
    let mut import_opts = ImportColumnFamilyOptions::default();
    import_opts.set_move_files(true);
    dest_db
        .create_column_family_with_import(
            &import_cf_opts,
            "cf1-new",
            &import_opts,
            &import_metadata,
        )
        .unwrap();

    // With `move_files` enabled, no exported file may remain at its original location.
    let any_file_left_behind = export_files.iter().any(|export_file| {
        Path::new(&export_file.directory)
            .join(&export_file.name)
            .exists()
    });
    assert!(!any_file_left_behind);

    let cf1_new = dest_db.cf_handle("cf1-new").unwrap();
    let mut imported_data = Vec::new();
    for item in dest_db.iterator_cf(&cf1_new, IteratorMode::Start) {
        let (key, value) = item.unwrap();
        imported_data.push((
            String::from_utf8_lossy(&key).into_owned(),
            String::from_utf8_lossy(&value).into_owned(),
        ));
    }
    let expected_imported: Vec<(String, String)> = [("k1", "v1"), ("k3", "v3")]
        .iter()
        .map(|&(key, value)| (key.to_string(), value.to_string()))
        .collect();
    assert_eq!(expected_imported, imported_data);

    let cf0 = dest_db.cf_handle("cf0").unwrap();
    let mut original_data = Vec::new();
    for item in dest_db.iterator_cf(&cf0, IteratorMode::Start) {
        let (key, value) = item.unwrap();
        original_data.push((
            String::from_utf8_lossy(&key).into_owned(),
            String::from_utf8_lossy(&value).into_owned(),
        ));
    }
    let expected_original: Vec<(String, String)> = [("k1", "v0"), ("k5", "v5")]
        .iter()
        .map(|&(key, value)| (key.to_string(), value.to_string()))
        .collect();
    assert_eq!(expected_original, original_data);
}