use seerdb::DBOptions;
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread;
use tempfile::TempDir;
#[test]
fn test_compaction_no_data_loss() {
    // Write a batch of keys, flush, give background compaction time to run,
    // then verify every key/value pair survived intact.
    let tmp = TempDir::new().unwrap();
    let dir = PathBuf::from(tmp.path());
    let db = DBOptions::default()
        .memtable_capacity(512 * 1024)
        .background_flush(true)
        .background_compaction(true)
        .open(&dir)
        .unwrap();
    let total = 10000;
    for idx in 0..total {
        db.put(
            format!("key_{:05}", idx).as_bytes(),
            format!("value_{:05}", idx).as_bytes(),
        )
        .unwrap();
    }
    db.flush().unwrap();
    // Allow the background compaction thread a chance to process the flush.
    thread::sleep(std::time::Duration::from_millis(1000));
    for idx in 0..total {
        let key = format!("key_{:05}", idx);
        let want = format!("value_{:05}", idx);
        let got = db.get(key.as_bytes()).unwrap();
        assert!(
            got.is_some(),
            "Key {} should be present after compaction",
            key
        );
        assert_eq!(
            got.unwrap().as_ref(),
            want.as_bytes(),
            "Value for key {} should be correct after compaction",
            key
        );
    }
}
#[test]
fn test_compaction_no_duplicates() {
    // After several overwrite rounds plus compaction, a full range scan must
    // yield each key exactly once, carrying the value from the final round.
    let temp_dir = TempDir::new().unwrap();
    let data_dir = PathBuf::from(temp_dir.path());
    let db = DBOptions::default()
        .memtable_capacity(512 * 1024)
        .background_flush(true)
        .background_compaction(true)
        .open(&data_dir)
        .unwrap();
    // Overwrite every key five times so compaction has duplicate versions
    // to collapse.
    for round in 0..5 {
        for i in 0..1000 {
            let key = format!("key_{:04}", i);
            let value = format!("value_round{}_key{}", round, i);
            db.put(key.as_bytes(), value.as_bytes()).unwrap();
        }
    }
    db.flush().unwrap();
    thread::sleep(std::time::Duration::from_millis(1000));
    // BUG FIX: the original inserted locally generated (always unique) key
    // strings into the HashSet, so the duplicate check could never fail and
    // never consulted the DB. Scan the DB itself so duplicate entries
    // surviving compaction are actually detected.
    let mut seen_keys = HashSet::new();
    let mut iter = db.range(b"", Some(b"~")).unwrap();
    while let Some(Ok((key, _value))) = iter.next() {
        let key = key.to_vec();
        assert!(
            seen_keys.insert(key.clone()),
            "Key {} should only appear once",
            String::from_utf8_lossy(&key)
        );
    }
    assert_eq!(
        seen_keys.len(),
        1000,
        "Scan should yield exactly the 1000 distinct keys"
    );
    for i in 0..1000 {
        let key = format!("key_{:04}", i);
        let expected_value = format!("value_round4_key{}", i);
        let value = db.get(key.as_bytes()).unwrap();
        // Assert presence first so a missing key fails with a clear message
        // instead of an opaque unwrap panic.
        assert!(value.is_some(), "Key {} should be present", key);
        assert_eq!(
            value.unwrap().as_ref(),
            expected_value.as_bytes(),
            "Key {} should have latest value after compaction",
            key
        );
    }
}
#[test]
fn test_compaction_preserves_key_ordering() {
    // Insert keys in reverse order, then verify a full range scan after
    // compaction returns them strictly sorted — and returns ALL of them.
    let temp_dir = TempDir::new().unwrap();
    let data_dir = PathBuf::from(temp_dir.path());
    let db = DBOptions::default()
        .memtable_capacity(512 * 1024)
        .background_flush(true)
        .background_compaction(true)
        .open(&data_dir)
        .unwrap();
    let keys: Vec<_> = (0..5000).map(|i| format!("key_{:05}", i)).collect();
    // Reverse insertion order so sortedness of the scan is a real property
    // of the storage layer, not an artifact of write order.
    for key in keys.iter().rev() {
        db.put(key.as_bytes(), b"value").unwrap();
    }
    db.flush().unwrap();
    thread::sleep(std::time::Duration::from_millis(1000));
    let mut iter = db.range(b"", Some(b"~")).unwrap();
    let mut prev_key: Option<Vec<u8>> = None;
    let mut scanned = 0usize;
    while let Some(Ok((key, _value))) = iter.next() {
        let key = key.to_vec();
        if let Some(prev) = &prev_key {
            assert!(
                *prev < key,
                "Keys should be in sorted order after compaction"
            );
        }
        prev_key = Some(key);
        scanned += 1;
    }
    // BUG FIX: the original test passed vacuously when the iterator yielded
    // nothing (or stopped early on an Err item). Require the full key count.
    assert_eq!(scanned, 5000, "Range scan should return all 5000 keys");
}
#[test]
fn test_compaction_handles_tombstones() {
    // Write 1000 keys, delete the even-indexed ones, and confirm compaction
    // keeps tombstones effective: deleted keys stay gone, the rest readable.
    let tmp = TempDir::new().unwrap();
    let dir = PathBuf::from(tmp.path());
    let db = DBOptions::default()
        .memtable_capacity(512 * 1024)
        .background_flush(true)
        .background_compaction(true)
        .open(&dir)
        .unwrap();
    for i in 0..1000 {
        db.put(format!("key_{:04}", i).as_bytes(), b"value").unwrap();
    }
    for i in (0..1000).step_by(2) {
        db.delete(format!("key_{:04}", i).as_bytes()).unwrap();
    }
    db.flush().unwrap();
    thread::sleep(std::time::Duration::from_millis(1000));
    for i in 0..1000 {
        let key = format!("key_{:04}", i);
        let lookup = db.get(key.as_bytes()).unwrap();
        if i % 2 == 0 {
            assert!(
                lookup.is_none(),
                "Deleted key {} should not be present after compaction",
                key
            );
        } else {
            assert!(
                lookup.is_some(),
                "Non-deleted key {} should be present after compaction",
                key
            );
        }
    }
}
#[test]
fn test_compaction_updates_supersede_old_values() {
    // Write three successive versions of every key, flushing between
    // versions, then check only the newest value is visible after compaction.
    let tmp = TempDir::new().unwrap();
    let dir = PathBuf::from(tmp.path());
    let db = DBOptions::default()
        .memtable_capacity(256 * 1024)
        .background_flush(true)
        .background_compaction(true)
        .open(&dir)
        .unwrap();
    // Same put/flush sequence as three copy-pasted loops, expressed once.
    for version in &["value_v1", "value_v2", "value_v3"] {
        for i in 0..500 {
            db.put(format!("key_{:04}", i).as_bytes(), version.as_bytes())
                .unwrap();
        }
        db.flush().unwrap();
    }
    thread::sleep(std::time::Duration::from_millis(1000));
    for i in 0..500 {
        let key = format!("key_{:04}", i);
        let value = db.get(key.as_bytes()).unwrap();
        assert_eq!(
            value.unwrap().as_ref(),
            b"value_v3",
            "Key {} should have latest value (v3) after compaction",
            key
        );
    }
}
#[test]
fn test_compaction_across_multiple_levels() {
    // Ten flushed batches produce enough tables for multi-level compaction;
    // every batch's data must remain readable with its original value.
    let tmp = TempDir::new().unwrap();
    let dir = PathBuf::from(tmp.path());
    let db = DBOptions::default()
        .memtable_capacity(256 * 1024)
        .background_flush(true)
        .background_compaction(true)
        .open(&dir)
        .unwrap();
    for batch in 0..10 {
        for i in 0..500 {
            db.put(
                format!("batch{}_key{:04}", batch, i).as_bytes(),
                format!("batch{}_value{}", batch, i).as_bytes(),
            )
            .unwrap();
        }
        db.flush().unwrap();
    }
    // Longer wait than the other tests: more batches mean more compaction work.
    thread::sleep(std::time::Duration::from_millis(2000));
    for batch in 0..10 {
        for i in 0..500 {
            let key = format!("batch{}_key{:04}", batch, i);
            let want = format!("batch{}_value{}", batch, i);
            let got = db.get(key.as_bytes()).unwrap();
            assert!(
                got.is_some(),
                "Key {} should be present after multi-level compaction",
                key
            );
            assert_eq!(got.unwrap().as_ref(), want.as_bytes());
        }
    }
}
#[test]
fn test_compaction_with_overlapping_key_ranges() {
    // Three flushed batches with overlapping key ranges; for overlapped keys
    // the later batch must win once compaction merges the tables.
    let tmp = TempDir::new().unwrap();
    let dir = PathBuf::from(tmp.path());
    let db = DBOptions::default()
        .memtable_capacity(256 * 1024)
        .background_flush(true)
        .background_compaction(true)
        .open(&dir)
        .unwrap();
    // Identical put/flush sequence as three separate loops, driven by data.
    for (range, tag) in vec![
        (0..1000, "range1"),
        (500..1500, "range2"),
        (1000..2000, "range3"),
    ] {
        for i in range {
            db.put(format!("key_{:04}", i).as_bytes(), tag.as_bytes())
                .unwrap();
        }
        db.flush().unwrap();
    }
    thread::sleep(std::time::Duration::from_millis(1000));
    for i in 0..2000 {
        let key = format!("key_{:04}", i);
        let value = db.get(key.as_bytes()).unwrap();
        assert!(value.is_some(), "Key {} should be present", key);
        // The newest writer of each sub-range determines the expected value.
        let expected: &[u8] = if i < 500 {
            b"range1"
        } else if i < 1000 {
            b"range2"
        } else {
            b"range3"
        };
        assert_eq!(
            value.unwrap().as_ref(),
            expected,
            "Key {} should have correct value after overlapping compaction",
            key
        );
    }
}
#[test]
fn test_compaction_merges_adjacent_sstables() {
    // Five flushes of 200 disjoint, contiguous keys each create adjacent
    // SSTables; after compaction merges them, all 1000 keys must be present.
    let tmp = TempDir::new().unwrap();
    let dir = PathBuf::from(tmp.path());
    let db = DBOptions::default()
        .memtable_capacity(256 * 1024)
        .background_flush(true)
        .background_compaction(true)
        .open(&dir)
        .unwrap();
    for batch in 0..5 {
        for i in batch * 200..(batch + 1) * 200 {
            db.put(format!("key_{:04}", i).as_bytes(), b"value").unwrap();
        }
        db.flush().unwrap();
    }
    thread::sleep(std::time::Duration::from_millis(1000));
    for i in 0..1000 {
        let key = format!("key_{:04}", i);
        assert!(
            db.get(key.as_bytes()).unwrap().is_some(),
            "Key {} should be present after SSTable merge",
            key
        );
    }
}
#[test]
fn test_compaction_handles_empty_levels() {
    // A single small flush leaves most levels with no tables; compaction must
    // handle that without losing the one populated level's data.
    let tmp = TempDir::new().unwrap();
    let dir = PathBuf::from(tmp.path());
    let db = DBOptions::default()
        .memtable_capacity(512 * 1024)
        .background_flush(true)
        .background_compaction(true)
        .open(&dir)
        .unwrap();
    for i in 0..1000 {
        db.put(format!("key_{:04}", i).as_bytes(), b"value").unwrap();
    }
    db.flush().unwrap();
    thread::sleep(std::time::Duration::from_millis(500));
    for i in 0..1000 {
        let key = format!("key_{:04}", i);
        assert!(db.get(key.as_bytes()).unwrap().is_some());
    }
}
#[test]
fn test_compaction_with_single_key_per_level() {
    // Flushing after every single put produces many tiny SSTables; all ten
    // keys must still be readable once compaction has merged them.
    let tmp = TempDir::new().unwrap();
    let dir = PathBuf::from(tmp.path());
    let db = DBOptions::default()
        .memtable_capacity(256 * 1024)
        .background_flush(true)
        .background_compaction(true)
        .open(&dir)
        .unwrap();
    for i in 0..10 {
        db.put(format!("key_{}", i).as_bytes(), b"value").unwrap();
        db.flush().unwrap();
    }
    thread::sleep(std::time::Duration::from_millis(1000));
    for i in 0..10 {
        let key = format!("key_{}", i);
        let result = db.get(key.as_bytes()).unwrap();
        if result.is_none() {
            // Before failing, dump the presence of every key to aid debugging.
            eprintln!("MISSING KEY: key_{} - checking all keys...", i);
            for j in 0..10 {
                let k = format!("key_{}", j);
                let r = db.get(k.as_bytes()).unwrap();
                eprintln!(
                    "  key_{}: {}",
                    j,
                    if r.is_some() { "PRESENT" } else { "MISSING" }
                );
            }
        }
        assert!(result.is_some(), "key_{} should be present", i);
    }
}
#[test]
#[ignore]
fn test_compaction_concurrent_reads() {
    // Reader threads probe keys while the main thread flushes (with
    // background compaction enabled); every probed key must stay visible.
    let tmp = TempDir::new().unwrap();
    let dir = PathBuf::from(tmp.path());
    let db = Arc::new(
        DBOptions::default()
            .memtable_capacity(256 * 1024)
            .background_flush(true)
            .background_compaction(true)
            .open(&dir)
            .unwrap(),
    );
    for i in 0..5000 {
        db.put(format!("key_{:05}", i).as_bytes(), b"value").unwrap();
    }
    let handles: Vec<_> = (0..4)
        .map(|thread_id| {
            let reader = Arc::clone(&db);
            thread::spawn(move || {
                for _ in 0..100 {
                    for i in (0..5000).step_by(50) {
                        let key = format!("key_{:05}", i);
                        assert!(
                            reader.get(key.as_bytes()).unwrap().is_some(),
                            "Thread {} should read key {} during compaction",
                            thread_id,
                            key
                        );
                    }
                }
            })
        })
        .collect();
    // Trigger flush/compaction while the readers are running.
    db.flush().unwrap();
    thread::sleep(std::time::Duration::from_millis(500));
    for handle in handles {
        handle.join().unwrap();
    }
}
#[test]
fn test_compaction_concurrent_writes() {
    // Seed and flush data so background compaction has work to do, then let
    // four writer threads insert concurrently; all their keys must persist.
    let tmp = TempDir::new().unwrap();
    let dir = PathBuf::from(tmp.path());
    let db = Arc::new(
        DBOptions::default()
            .memtable_capacity(256 * 1024)
            .background_flush(true)
            .background_compaction(true)
            .open(&dir)
            .unwrap(),
    );
    for i in 0..5000 {
        db.put(format!("pre_key_{:05}", i).as_bytes(), b"value").unwrap();
    }
    db.flush().unwrap();
    let handles: Vec<_> = (0..4)
        .map(|thread_id| {
            let writer = Arc::clone(&db);
            thread::spawn(move || {
                for i in 0..500 {
                    let key = format!("thread{}_key{:04}", thread_id, i);
                    writer.put(key.as_bytes(), b"value").unwrap();
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    thread::sleep(std::time::Duration::from_millis(1000));
    for thread_id in 0..4 {
        for i in 0..500 {
            let key = format!("thread{}_key{:04}", thread_id, i);
            assert!(db.get(key.as_bytes()).unwrap().is_some());
        }
    }
}
#[test]
fn test_compaction_concurrent_deletes() {
    // Four threads each delete a disjoint quarter of the keyspace while
    // background compaction runs; afterwards no key may remain visible.
    let tmp = TempDir::new().unwrap();
    let dir = PathBuf::from(tmp.path());
    let db = Arc::new(
        DBOptions::default()
            .memtable_capacity(256 * 1024)
            .background_flush(true)
            .background_compaction(true)
            .open(&dir)
            .unwrap(),
    );
    for i in 0..5000 {
        db.put(format!("key_{:05}", i).as_bytes(), b"value").unwrap();
    }
    db.flush().unwrap();
    let handles: Vec<_> = (0..4)
        .map(|thread_id| {
            let deleter = Arc::clone(&db);
            thread::spawn(move || {
                // Each thread owns the contiguous slice [id*1250, (id+1)*1250).
                for i in thread_id * 1250..(thread_id + 1) * 1250 {
                    deleter.delete(format!("key_{:05}", i).as_bytes()).unwrap();
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    thread::sleep(std::time::Duration::from_millis(1000));
    for i in 0..5000 {
        let key = format!("key_{:05}", i);
        assert!(db.get(key.as_bytes()).unwrap().is_none());
    }
}
#[test]
fn test_compaction_concurrent_flushes() {
    // Issue repeated explicit flushes while background compaction is active;
    // every key from the initial load and every batch must stay readable.
    let tmp = TempDir::new().unwrap();
    let dir = PathBuf::from(tmp.path());
    let db = Arc::new(
        DBOptions::default()
            .memtable_capacity(256 * 1024)
            .background_flush(true)
            .background_compaction(true)
            .open(&dir)
            .unwrap(),
    );
    for i in 0..5000 {
        db.put(format!("initial_{:05}", i).as_bytes(), b"value").unwrap();
    }
    db.flush().unwrap();
    for batch in 0..5 {
        for i in 0..1000 {
            db.put(format!("batch{}_key{:04}", batch, i).as_bytes(), b"value")
                .unwrap();
        }
        db.flush().unwrap();
    }
    thread::sleep(std::time::Duration::from_millis(2000));
    for i in 0..5000 {
        let key = format!("initial_{:05}", i);
        assert!(db.get(key.as_bytes()).unwrap().is_some());
    }
    for batch in 0..5 {
        for i in 0..1000 {
            let key = format!("batch{}_key{:04}", batch, i);
            assert!(db.get(key.as_bytes()).unwrap().is_some());
        }
    }
}
#[test]
#[ignore]
fn test_compaction_concurrent_scans() {
    // Scanner threads repeatedly range-scan key_00000..key_00999 while the
    // main thread flushes; each scan must still see the bulk of those keys.
    let tmp = TempDir::new().unwrap();
    let dir = PathBuf::from(tmp.path());
    let db = Arc::new(
        DBOptions::default()
            .memtable_capacity(256 * 1024)
            .background_flush(true)
            .background_compaction(true)
            .open(&dir)
            .unwrap(),
    );
    for i in 0..5000 {
        db.put(format!("key_{:05}", i).as_bytes(), b"value").unwrap();
    }
    let handles: Vec<_> = (0..3)
        .map(|_| {
            let scanner = Arc::clone(&db);
            thread::spawn(move || {
                for _ in 0..10 {
                    // The ["key_00", "key_01") range covers key_00000..key_00999.
                    let mut iter = scanner.range(b"key_00", Some(b"key_01")).unwrap();
                    let mut count = 0;
                    while let Some(Ok((_key, _value))) = iter.next() {
                        count += 1;
                    }
                    assert!(
                        count >= 900,
                        "Scan should find most keys during compaction, got {}",
                        count
                    );
                }
            })
        })
        .collect();
    // Kick off flush/compaction while scans are in flight.
    db.flush().unwrap();
    for handle in handles {
        handle.join().unwrap();
    }
}