use seerdb::{DBOptions, SyncPolicy, DB};
use std::fs::{self, OpenOptions};
use std::io::{Seek, SeekFrom, Write};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use tempfile::TempDir;
/// WAL with the tail 10% chopped off must still recover cleanly:
/// either the open fails outright, or most records survive and none
/// are silently corrupted.
#[test]
fn test_wal_truncated_record_recovery() {
    let tmp = TempDir::new().unwrap();
    let path = tmp.path().to_path_buf();

    // Write 100 records with full sync so every record reaches the WAL.
    {
        let db = DBOptions::default()
            .sync_policy(SyncPolicy::SyncAll)
            .open(&path)
            .unwrap();
        for idx in 0..100 {
            let key = format!("key_{:03}", idx);
            db.put(key.as_bytes(), b"value").unwrap();
        }
    }

    // Truncate the WAL to 90% of its length, simulating a torn tail write.
    let wal = path.join("wal.log");
    let full_len = fs::metadata(&wal).unwrap().len();
    let keep_len = (full_len * 9) / 10;
    {
        let f = OpenOptions::new().write(true).open(&wal).unwrap();
        f.set_len(keep_len).unwrap();
        f.sync_all().unwrap();
    }

    match DB::open(&path) {
        Ok(db) => {
            // Count how many of the 100 records survived truncation.
            let mut recovered = 0;
            for idx in 0..100 {
                if db
                    .get(format!("key_{:03}", idx).as_bytes())
                    .unwrap()
                    .is_some()
                {
                    recovered += 1;
                }
            }
            assert!(
                recovered >= 80,
                "Should recover most records after truncation, got {} / 100",
                recovered
            );
            // Whatever survived must be byte-identical to what was written.
            for idx in 0..100 {
                if let Some(v) = db.get(format!("key_{:03}", idx).as_bytes()).unwrap() {
                    assert_eq!(v.as_ref(), b"value", "Value should be intact");
                }
            }
        }
        Err(e) => {
            println!("Recovery failed (acceptable): {}", e);
        }
    }
}
/// Flipping four bytes in the middle of the WAL must never surface a
/// corrupted value: recovery either rejects the file or drops the
/// damaged records.
#[test]
fn test_wal_record_body_corruption() {
    let tmp = TempDir::new().unwrap();
    let path = tmp.path().to_path_buf();

    // Write 50 synced records so the WAL has real content to damage.
    {
        let db = DBOptions::default()
            .sync_policy(SyncPolicy::SyncAll)
            .open(&path)
            .unwrap();
        for idx in 0..50 {
            let key = format!("key_{:03}", idx);
            db.put(key.as_bytes(), b"value").unwrap();
        }
    }

    // Stomp four bytes at the midpoint of the log.
    let wal = path.join("wal.log");
    let len = fs::metadata(&wal).unwrap().len();
    {
        let mut f = OpenOptions::new().write(true).open(&wal).unwrap();
        f.seek(SeekFrom::Start(len / 2)).unwrap();
        f.write_all(&[0xFF, 0xFF, 0xFF, 0xFF]).unwrap();
        f.sync_all().unwrap();
    }

    // Refusing to open is acceptable; returning garbage is not.
    if let Ok(db) = DB::open(&path) {
        for idx in 0..50 {
            if let Some(v) = db.get(format!("key_{:03}", idx).as_bytes()).unwrap() {
                assert_eq!(
                    v.as_ref(),
                    b"value",
                    "Should not return corrupted value"
                );
            }
        }
    }
}
/// A committed batch must be all-or-nothing across a restart.
#[test]
fn test_wal_batch_atomicity() {
    let tmp = TempDir::new().unwrap();
    let path = tmp.path().to_path_buf();

    // Commit ten keys in a single batch, fully synced, then close.
    {
        let db = DBOptions::default()
            .sync_policy(SyncPolicy::SyncAll)
            .open(&path)
            .unwrap();
        let mut batch = db.batch();
        for idx in 0..10 {
            batch.put(format!("batch_key_{}", idx).as_bytes(), b"batch_value");
        }
        batch.commit().unwrap();
    }

    let db = DB::open(&path).unwrap();
    // Record per-key presence so a partial batch is visible in the message.
    let mut present = Vec::with_capacity(10);
    for idx in 0..10 {
        let hit = db
            .get(format!("batch_key_{}", idx).as_bytes())
            .unwrap()
            .is_some();
        present.push(hit);
    }
    let all_present = present.iter().all(|&p| p);
    let all_absent = !present.iter().any(|&p| p);
    assert!(
        all_present || all_absent,
        "Batch must be atomic: either all keys present or all absent. Got: {:?}",
        present
    );
    // The shutdown above was clean, so the stronger condition must hold too.
    assert!(
        all_present,
        "All batch keys should be present after clean recovery"
    );
}
/// After an explicit flush, data must live in an on-disk SSTable and
/// every key must survive a reopen.
#[test]
fn test_flush_sstable_before_wal_clear() {
    let tmp = TempDir::new().unwrap();
    let path = tmp.path().to_path_buf();

    // Write and flush 1000 synced keys, then close the DB.
    {
        let db = DBOptions::default()
            .sync_policy(SyncPolicy::SyncAll)
            .open(&path)
            .unwrap();
        for idx in 0..1000 {
            db.put(format!("key_{:04}", idx).as_bytes(), b"value")
                .unwrap();
        }
        db.flush().unwrap();
    }

    // The flush must have produced at least one .sst file on disk.
    let mut sstable_exists = false;
    for entry in fs::read_dir(&path).unwrap().flatten() {
        if entry.path().extension().and_then(|s| s.to_str()) == Some("sst") {
            sstable_exists = true;
            break;
        }
    }
    assert!(sstable_exists, "SSTable should exist after flush");

    let db = DB::open(&path).unwrap();
    for idx in 0..1000 {
        assert!(
            db.get(format!("key_{:04}", idx).as_bytes())
                .unwrap()
                .is_some(),
            "Key {} should exist after flush + reopen",
            idx
        );
    }
}
/// Hammers the DB from four writer threads with a tiny memtable so
/// flushes happen continuously, then verifies that almost every
/// acknowledged write survives a flush + reopen.
///
/// NOTE(review): sequence numbers are not directly observable through the
/// public API used here; the "monotonic" property is exercised indirectly
/// by requiring that concurrent flushes lose (almost) no acknowledged
/// writes — confirm against the crate's internals if a direct check exists.
#[test]
fn test_concurrent_flush_sequence_monotonic() {
    let temp_dir = TempDir::new().unwrap();
    let db_path = temp_dir.path().to_path_buf();
    let db = Arc::new(
        DBOptions::default()
            .memtable_capacity(1024) // tiny capacity: force frequent memtable swaps
            .background_compaction(false)
            .open(&db_path)
            .unwrap(),
    );
    let write_count = Arc::new(AtomicUsize::new(0));
    let stop = Arc::new(AtomicBool::new(false));
    let mut handles = vec![];
    for t in 0..4 {
        let db = Arc::clone(&db);
        let write_count = Arc::clone(&write_count);
        let stop = Arc::clone(&stop);
        handles.push(thread::spawn(move || {
            let mut i = 0;
            while !stop.load(Ordering::Relaxed) {
                let key = format!("t{}_{:06}", t, i);
                // Only acknowledged puts count toward the recovery target.
                if db.put(key.as_bytes(), b"value").is_ok() {
                    write_count.fetch_add(1, Ordering::Relaxed);
                }
                i += 1;
                if i > 10000 {
                    break;
                }
            }
        }));
    }
    thread::sleep(Duration::from_secs(2));
    stop.store(true, Ordering::Relaxed);
    for h in handles {
        h.join().unwrap();
    }
    let total_writes = write_count.load(Ordering::Relaxed);
    // Guard: with zero successful writes the recovery-rate division below
    // would produce NaN and fail the final assert with a useless "NaN%"
    // message; fail fast with a clear diagnosis instead.
    assert!(total_writes > 0, "Expected at least one successful write");
    db.flush().unwrap();
    drop(db);
    let db = DB::open(&db_path).unwrap();
    let mut recovered = 0;
    for t in 0..4 {
        // Writers increment before the break check, so the last written
        // index is 10000 inclusive — hence the 0..10001 bound.
        for i in 0..10001 {
            if db
                .get(format!("t{}_{:06}", t, i).as_bytes())
                .unwrap()
                .is_some()
            {
                recovered += 1;
            }
        }
    }
    let recovery_rate = (recovered as f64) / (total_writes as f64);
    assert!(
        recovery_rate > 0.99,
        "Should recover >99% of writes, got {:.1}% ({} / {})",
        recovery_rate * 100.0,
        recovered,
        total_writes
    );
}
/// A delete tombstone must keep shadowing an older value through
/// compaction and across a reopen.
#[test]
fn test_tombstone_shadows_across_levels() {
    let tmp = TempDir::new().unwrap();
    let path = tmp.path().to_path_buf();
    let db = DBOptions::default()
        .memtable_capacity(1024)
        .background_compaction(true)
        .open(&path)
        .unwrap();

    // Put the value in one SSTable and the tombstone in a newer one.
    db.put(b"shadowed_key", b"original_value").unwrap();
    db.flush().unwrap();
    db.delete(b"shadowed_key").unwrap();
    db.flush().unwrap();

    // Pile on filler tables so background compaction has work to do.
    for batch in 0..5 {
        for idx in 0..50 {
            db.put(format!("filler_{}_{:03}", batch, idx).as_bytes(), b"filler")
                .unwrap();
        }
        db.flush().unwrap();
    }

    // Give the background compactor a moment to run.
    thread::sleep(Duration::from_millis(500));
    assert!(
        db.get(b"shadowed_key").unwrap().is_none(),
        "Tombstone should shadow original value after compaction"
    );

    drop(db);
    let db = DBOptions::default()
        .background_compaction(true)
        .open(&path)
        .unwrap();
    assert!(
        db.get(b"shadowed_key").unwrap().is_none(),
        "Tombstone should persist after reopen"
    );
}
/// Every key/value pair written across multiple flushes must survive
/// background compaction and a reopen, byte-for-byte.
#[test]
fn test_compaction_preserves_all_data() {
    let tmp = TempDir::new().unwrap();
    let path = tmp.path().to_path_buf();
    let db = DBOptions::default()
        .memtable_capacity(2048)
        .background_compaction(true)
        .open(&path)
        .unwrap();

    // Five flushed generations of 100 distinct keys each.
    for batch in 0..5 {
        for idx in 0..100 {
            db.put(
                format!("key_{:02}_{:03}", batch, idx).as_bytes(),
                format!("value_{:02}_{:03}", batch, idx).as_bytes(),
            )
            .unwrap();
        }
        db.flush().unwrap();
    }

    // Let background compaction churn through the tables.
    thread::sleep(Duration::from_millis(500));
    for batch in 0..5 {
        for idx in 0..100 {
            let key = format!("key_{:02}_{:03}", batch, idx);
            let want = format!("value_{:02}_{:03}", batch, idx);
            let got = db
                .get(key.as_bytes())
                .unwrap()
                .unwrap_or_else(|| panic!("Key {} should exist after compaction", key));
            assert_eq!(
                got.as_ref(),
                want.as_bytes(),
                "Value for {} should be preserved",
                key
            );
        }
    }

    // Same data must still be there after a full restart.
    drop(db);
    let db = DB::open(&path).unwrap();
    for batch in 0..5 {
        for idx in 0..100 {
            let key = format!("key_{:02}_{:03}", batch, idx);
            assert!(
                db.get(key.as_bytes()).unwrap().is_some(),
                "Key {} should exist after reopen",
                key
            );
        }
    }
}
/// Reads racing against writes (and the memtable swaps they trigger)
/// must never error or observe a wrong value.
#[test]
fn test_read_during_memtable_swap() {
    let tmp = TempDir::new().unwrap();
    let path = tmp.path().to_path_buf();
    let db = Arc::new(
        DBOptions::default()
            .memtable_capacity(4096)
            .open(&path)
            .unwrap(),
    );

    // Seed a stable key set the reader will poll continuously.
    for idx in 0..100 {
        db.put(format!("pre_{:03}", idx).as_bytes(), b"pre_value")
            .unwrap();
    }

    let stop = Arc::new(AtomicBool::new(false));
    let error_count = Arc::new(AtomicUsize::new(0));

    let reader = {
        let db = Arc::clone(&db);
        let stop = Arc::clone(&stop);
        let errors = Arc::clone(&error_count);
        thread::spawn(move || {
            while !stop.load(Ordering::Relaxed) {
                for idx in 0..100 {
                    match db.get(format!("pre_{:03}", idx).as_bytes()) {
                        // A present value must be exactly what was seeded.
                        Ok(Some(v)) if v.as_ref() != b"pre_value" => {
                            errors.fetch_add(1, Ordering::Relaxed);
                        }
                        Ok(_) => {}
                        Err(_) => {
                            errors.fetch_add(1, Ordering::Relaxed);
                        }
                    }
                }
            }
        })
    };

    let writer = {
        let db = Arc::clone(&db);
        let stop = Arc::clone(&stop);
        thread::spawn(move || {
            // Keep forcing memtable swaps with a stream of fresh keys.
            let mut idx = 0;
            while !stop.load(Ordering::Relaxed) && idx < 10000 {
                let _ = db.put(format!("write_{:06}", idx).as_bytes(), b"write_value");
                idx += 1;
            }
        })
    };

    thread::sleep(Duration::from_secs(2));
    stop.store(true, Ordering::Relaxed);
    reader.join().unwrap();
    writer.join().unwrap();
    assert_eq!(
        error_count.load(Ordering::Relaxed),
        0,
        "Should have no read errors during memtable swap"
    );
}
/// Racing put/delete on one key: whichever wins, a read must see either
/// nothing or the exact put value — never anything else.
#[test]
fn test_concurrent_put_delete_same_key() {
    let tmp = TempDir::new().unwrap();
    let path = tmp.path().to_path_buf();
    let db = Arc::new(DB::open(&path).unwrap());
    let iterations = 1000;

    let putter = {
        let db = Arc::clone(&db);
        thread::spawn(move || {
            for _ in 0..iterations {
                let _ = db.put(b"contested_key", b"put_value");
            }
        })
    };
    let deleter = {
        let db = Arc::clone(&db);
        thread::spawn(move || {
            for _ in 0..iterations {
                let _ = db.delete(b"contested_key");
            }
        })
    };
    putter.join().unwrap();
    deleter.join().unwrap();

    // Either outcome is legal, but a surviving value must be untorn.
    if let Some(v) = db.get(b"contested_key").unwrap() {
        assert_eq!(
            v.as_ref(),
            b"put_value",
            "If present, value should be 'put_value'"
        );
    }

    // Same invariant must hold after flush + reopen.
    db.flush().unwrap();
    drop(db);
    let db = DB::open(&path).unwrap();
    if let Some(v) = db.get(b"contested_key").unwrap() {
        assert_eq!(v.as_ref(), b"put_value");
    }
}
/// An empty value is legal data and must round-trip through the
/// memtable, an explicit flush, compaction, and a reopen.
#[test]
fn test_empty_value_full_lifecycle() {
    let tmp = TempDir::new().unwrap();
    let path = tmp.path().to_path_buf();
    let db = DBOptions::default()
        .memtable_capacity(2048)
        .background_compaction(true)
        .open(&path)
        .unwrap();
    db.put(b"empty_value_key", b"").unwrap();

    // Stage 1: straight out of the memtable.
    let got = db.get(b"empty_value_key").unwrap();
    assert!(got.is_some(), "Empty value should be retrievable");
    assert_eq!(got.unwrap().as_ref(), b"", "Value should be empty");

    // Stage 2: after an explicit flush.
    db.flush().unwrap();
    let got = db.get(b"empty_value_key").unwrap();
    assert!(got.is_some(), "Empty value should exist after flush");
    assert_eq!(got.unwrap().as_ref(), b"", "Value should still be empty");

    // Stage 3: after compaction, driven by filler tables.
    for batch in 0..5 {
        for idx in 0..50 {
            db.put(format!("filler_{}_{:03}", batch, idx).as_bytes(), b"filler")
                .unwrap();
        }
        db.flush().unwrap();
    }
    thread::sleep(Duration::from_millis(500));
    let got = db.get(b"empty_value_key").unwrap();
    assert!(got.is_some(), "Empty value should exist after compaction");
    assert_eq!(
        got.unwrap().as_ref(),
        b"",
        "Value should be empty after compaction"
    );

    // Stage 4: after a full reopen.
    drop(db);
    let db = DB::open(&path).unwrap();
    let got = db.get(b"empty_value_key").unwrap();
    assert!(got.is_some(), "Empty value should exist after reopen");
    assert_eq!(
        got.unwrap().as_ref(),
        b"",
        "Value should be empty after reopen"
    );
}
/// Keys are arbitrary bytes: embedded NULs must round-trip through
/// memtable reads, a flush, and a reopen.
#[test]
fn test_key_with_null_bytes() {
    let tmp = TempDir::new().unwrap();
    let path = tmp.path().to_path_buf();
    let db = DB::open(&path).unwrap();
    let key = b"key\x00with\x00nulls";
    let value = b"value_for_null_key";

    db.put(key, value).unwrap();
    // Readable from the memtable...
    assert_eq!(db.get(key).unwrap().unwrap().as_ref(), value);
    // ...after a flush to SSTable...
    db.flush().unwrap();
    assert_eq!(db.get(key).unwrap().unwrap().as_ref(), value);
    // ...and after a full reopen.
    drop(db);
    let db = DB::open(&path).unwrap();
    assert_eq!(db.get(key).unwrap().unwrap().as_ref(), value);
}
/// Values exactly at, just over, and just under the vlog threshold must
/// all round-trip, both live and after a reopen.
#[test]
fn test_value_at_vlog_threshold() {
    let tmp = TempDir::new().unwrap();
    let path = tmp.path().to_path_buf();
    let threshold = 1024;
    let db = DBOptions::default()
        .vlog_threshold(Some(threshold))
        .open(&path)
        .unwrap();

    // One value on each side of the boundary, plus the boundary itself.
    let cases: Vec<(&[u8], Vec<u8>)> = vec![
        (b"key_at", vec![b'a'; threshold]),
        (b"key_over", vec![b'b'; threshold + 1]),
        (b"key_under", vec![b'c'; threshold - 1]),
    ];
    for (key, value) in &cases {
        db.put(*key, &value[..]).unwrap();
    }
    db.flush().unwrap();

    for (key, value) in &cases {
        assert_eq!(db.get(*key).unwrap().unwrap().as_ref(), &value[..]);
    }

    // Same check after a reopen with the same threshold configuration.
    drop(db);
    let db = DBOptions::default()
        .vlog_threshold(Some(threshold))
        .open(&path)
        .unwrap();
    for (key, value) in &cases {
        assert_eq!(db.get(*key).unwrap().unwrap().as_ref(), &value[..]);
    }
}
/// 100 overwrites of one key, with periodic flushes: only the newest
/// version may ever be visible — through compaction and after a reopen.
#[test]
fn test_many_versions_same_key() {
    let tmp = TempDir::new().unwrap();
    let path = tmp.path().to_path_buf();
    let db = DBOptions::default()
        .memtable_capacity(4096)
        .background_compaction(true)
        .open(&path)
        .unwrap();

    for version in 0..100 {
        db.put(b"versioned_key", format!("version_{:03}", version).as_bytes())
            .unwrap();
        // Flush every 25 versions so stale versions land in SSTables.
        if version % 25 == 24 {
            db.flush().unwrap();
        }
    }

    let got = db.get(b"versioned_key").unwrap().unwrap();
    assert_eq!(got.as_ref(), b"version_099", "Should see latest version");

    // Let background compaction merge the version history.
    thread::sleep(Duration::from_millis(500));
    let got = db.get(b"versioned_key").unwrap().unwrap();
    assert_eq!(
        got.as_ref(),
        b"version_099",
        "Latest version should survive compaction"
    );

    drop(db);
    let db = DB::open(&path).unwrap();
    let got = db.get(b"versioned_key").unwrap().unwrap();
    assert_eq!(
        got.as_ref(),
        b"version_099",
        "Latest version should survive reopen"
    );
}
/// A snapshot taken between two write generations must keep serving the
/// old values while the live DB serves the new ones.
#[test]
fn test_snapshot_consistency_during_writes() {
    let tmp = TempDir::new().unwrap();
    let path = tmp.path().to_path_buf();
    let db = Arc::new(DB::open(&path).unwrap());

    // Generation 1, then snapshot, then generation 2 over the same keys.
    for idx in 0..100 {
        db.put(format!("key_{:03}", idx).as_bytes(), b"v1").unwrap();
    }
    let snapshot = db.snapshot().unwrap();
    for idx in 0..100 {
        db.put(format!("key_{:03}", idx).as_bytes(), b"v2").unwrap();
    }

    // The snapshot must be frozen at v1 while the live DB shows v2.
    for idx in 0..100 {
        let old = snapshot
            .get(format!("key_{:03}", idx).as_bytes())
            .unwrap()
            .unwrap();
        assert_eq!(
            old.as_ref(),
            b"v1",
            "Snapshot should see v1 for key {}",
            idx
        );
        let new = db
            .get(format!("key_{:03}", idx).as_bytes())
            .unwrap()
            .unwrap();
        assert_eq!(new.as_ref(), b"v2", "DB should see v2 for key {}", idx);
    }
}
/// 100 synced keys written with all background work disabled must all
/// be readable after a reopen.
#[test]
fn test_persistence_100_keys() {
    let tmp = TempDir::new().unwrap();
    let path = tmp.path().to_path_buf();

    // Write phase: fully synced, no background flush/compaction.
    {
        let db = DBOptions::default()
            .sync_policy(SyncPolicy::SyncAll)
            .background_compaction(false)
            .background_flush(false)
            .open(&path)
            .unwrap();
        for idx in 0..100 {
            db.put(format!("v:{}", idx).as_bytes(), &vec![idx as u8; 128])
                .unwrap();
        }
        db.flush().unwrap();
    }

    // Verify phase: collect any missing indices for the failure message.
    {
        let db = DBOptions::default()
            .background_compaction(false)
            .background_flush(false)
            .open(&path)
            .unwrap();
        let missing: Vec<i32> = (0..100)
            .filter(|idx| db.get(format!("v:{}", idx).as_bytes()).unwrap().is_none())
            .collect();
        assert!(
            missing.is_empty(),
            "Missing {} keys after reopen: {:?}",
            missing.len(),
            missing
        );
    }
}
/// Keys under three distinct prefixes ("v:", "m:", "i:") must all
/// persist across a reopen; emits diagnostics to help localize losses.
#[test]
fn test_persistence_multiple_prefixes() {
    let tmp = TempDir::new().unwrap();
    let path = tmp.path().to_path_buf();

    // Write phase: three payload shapes per index, fully synced.
    {
        let db = DBOptions::default()
            .sync_policy(SyncPolicy::SyncAll)
            .background_compaction(false)
            .background_flush(false)
            .open(&path)
            .unwrap();
        for i in 0..100u64 {
            // Binary payload under "v:".
            db.put(format!("v:{}", i).as_bytes(), &vec![i as u8; 128])
                .unwrap();
            // JSON-ish payload under "m:".
            db.put(
                format!("m:{}", i).as_bytes(),
                format!(r#"{{"index":{}}}"#, i).as_bytes(),
            )
            .unwrap();
            // Little-endian integer payload under "i:".
            db.put(format!("i:item{}", i).as_bytes(), &i.to_le_bytes())
                .unwrap();
        }
        db.flush().unwrap();
    }

    // Diagnostic: which SSTables did the flush produce?
    let sst_files: Vec<_> = fs::read_dir(&path)
        .unwrap()
        .filter_map(|e| e.ok())
        .filter(|e| e.path().extension().map_or(false, |ext| ext == "sst"))
        .collect();
    eprintln!(
        "SSTable files after flush: {:?}",
        sst_files.iter().map(|e| e.path()).collect::<Vec<_>>()
    );

    // Verify phase.
    {
        let db = DBOptions::default()
            .background_compaction(false)
            .background_flush(false)
            .open(&path)
            .unwrap();

        // Diagnostic: how many "v:" keys does a range scan see?
        let mut scanned_v_keys = Vec::new();
        for item in db.range(b"v:", Some(b"v:\xff")).unwrap() {
            if let Ok((k, _)) = item {
                if k.starts_with(b"v:") {
                    scanned_v_keys.push(String::from_utf8_lossy(&k).to_string());
                }
            }
        }
        eprintln!("Scanned v: keys count: {}", scanned_v_keys.len());

        // Point-lookup every key under every prefix, tracking losses.
        let mut missing_v = Vec::new();
        let mut missing_m = Vec::new();
        let mut missing_i = Vec::new();
        for i in 0..100u64 {
            if db.get(format!("v:{}", i).as_bytes()).unwrap().is_none() {
                missing_v.push(i);
            }
            if db.get(format!("m:{}", i).as_bytes()).unwrap().is_none() {
                missing_m.push(i);
            }
            if db.get(format!("i:item{}", i).as_bytes()).unwrap().is_none() {
                missing_i.push(i);
            }
        }
        let total_missing = missing_v.len() + missing_m.len() + missing_i.len();
        assert!(
            total_missing == 0,
            "Missing {} keys after reopen:\n v: {:?}\n m: {:?}\n i: {:?}",
            total_missing,
            missing_v,
            missing_m,
            missing_i
        );
    }
}