use std::sync::Arc;
use cesiumdb::{
Db,
DbOptions,
};
use tempfile::TempDir;
#[test]
fn test_no_segment_id_collision_between_flush_and_compaction() {
let temp_dir = TempDir::new().unwrap();
let mut opts = DbOptions::new();
opts.data_dir(temp_dir.path().to_path_buf())
.memtable_size(512 * 1024) .max_memtables(2);
let db = Db::open(opts);
for batch in 0..3 {
for i in 0..1000 {
let key = format!("key_batch{:02}_item{:05}", batch, i);
db.put(key.as_bytes(), b"value_data_here").unwrap();
}
db.sync().unwrap();
}
db.compact().unwrap();
std::thread::sleep(std::time::Duration::from_secs(3));
for batch in 0..3 {
for i in 0..1000 {
let key = format!("key_batch{:02}_item{:05}", batch, i);
let result = db.get(key.as_bytes()).unwrap();
assert!(
result.is_some(),
"Key {} missing - possible segment ID collision corruption",
key
);
assert_eq!(
&result.unwrap()[..],
b"value_data_here",
"Value mismatch for key {} - possible corruption",
key
);
}
}
db.close().unwrap();
}
#[test]
fn test_segment_id_persists_across_restart() {
let temp_dir = TempDir::new().unwrap();
{
let mut opts = DbOptions::new();
opts.data_dir(temp_dir.path().to_path_buf());
let db = Db::open(opts);
db.put(b"key1", b"value1").unwrap();
db.put(b"key2", b"value2").unwrap();
db.put(b"key3", b"value3").unwrap();
db.sync().unwrap();
db.close().unwrap();
}
{
let mut opts = DbOptions::new();
opts.data_dir(temp_dir.path().to_path_buf());
let db = Db::open(opts);
db.put(b"key4", b"value4").unwrap();
db.put(b"key5", b"value5").unwrap();
db.put(b"key6", b"value6").unwrap();
db.sync().unwrap();
assert_eq!(&db.get(b"key1").unwrap().unwrap()[..], b"value1");
assert_eq!(&db.get(b"key2").unwrap().unwrap()[..], b"value2");
assert_eq!(&db.get(b"key3").unwrap().unwrap()[..], b"value3");
assert_eq!(&db.get(b"key4").unwrap().unwrap()[..], b"value4");
assert_eq!(&db.get(b"key5").unwrap().unwrap()[..], b"value5");
assert_eq!(&db.get(b"key6").unwrap().unwrap()[..], b"value6");
db.close().unwrap();
}
}
#[test]
fn test_concurrent_flush_and_compaction_no_collisions() {
let temp_dir = TempDir::new().unwrap();
let mut opts = DbOptions::new();
opts.data_dir(temp_dir.path().to_path_buf())
.memtable_size(256 * 1024) .max_memtables(3);
let db = Db::open(opts);
let mut handles = vec![];
for thread_id in 0..3 {
let db_clone = Arc::clone(&db);
let handle = std::thread::spawn(move || {
for i in 0..500 {
let key = format!("thread{:02}_key{:05}", thread_id, i);
db_clone.put(key.as_bytes(), b"concurrent_value").unwrap();
}
db_clone.sync().unwrap();
});
handles.push(handle);
}
for _ in 0..3 {
std::thread::sleep(std::time::Duration::from_millis(300));
let _ = db.compact();
}
for handle in handles {
handle.join().unwrap();
}
std::thread::sleep(std::time::Duration::from_secs(3));
for thread_id in 0..3 {
for i in 0..500 {
let key = format!("thread{:02}_key{:05}", thread_id, i);
let result = db.get(key.as_bytes()).unwrap();
assert!(
result.is_some(),
"Key {} missing in concurrent scenario",
key
);
assert_eq!(
&result.unwrap()[..],
b"concurrent_value",
"Value corruption for key {}",
key
);
}
}
db.close().unwrap();
}