use seerdb::{DBOptions, DB};
use std::path::PathBuf;
use std::sync::Arc;
use std::thread;
use tempfile::TempDir;
#[test]
fn test_batch_single_wal_record() {
    // A committed batch (three puts plus a delete) must survive a full
    // close/reopen cycle, i.e. the batch was durably recorded in the WAL.
    let temp_dir = TempDir::new().unwrap();
    let data_dir = temp_dir.path().to_path_buf();

    let db = DBOptions::default()
        .background_flush(false)
        .open(&data_dir)
        .unwrap();

    let mut batch = db.batch();
    for (key, value) in [
        (&b"key1"[..], &b"value1"[..]),
        (&b"key2"[..], &b"value2"[..]),
        (&b"key3"[..], &b"value3"[..]),
    ] {
        batch.put(key, value);
    }
    batch.delete(b"key4");
    batch.commit().unwrap();
    drop(db);

    // Reopen: every put is visible and the deleted key is absent.
    let db = DB::open(&data_dir).unwrap();
    for (key, value) in [
        (&b"key1"[..], &b"value1"[..]),
        (&b"key2"[..], &b"value2"[..]),
        (&b"key3"[..], &b"value3"[..]),
    ] {
        assert_eq!(db.get(key).unwrap().as_deref(), Some(value));
    }
    assert_eq!(db.get(b"key4").unwrap(), None);
}
#[test]
fn test_batch_mixed_put_delete() {
    // A single batch mixing inserts, overwrites and deletes, applied on top
    // of pre-existing keys written individually.
    let temp_dir = TempDir::new().unwrap();
    let db = DB::open(temp_dir.path()).unwrap();

    db.put(b"existing1", b"old_value1").unwrap();
    db.put(b"existing2", b"old_value2").unwrap();
    db.put(b"existing3", b"old_value3").unwrap();

    let mut batch = db.batch();
    batch.put(b"new_key1", b"new_value1");
    batch.put(b"existing1", b"updated_value1");
    batch.delete(b"existing2");
    batch.put(b"new_key2", b"new_value2");
    batch.delete(b"nonexistent"); // deleting a missing key is a no-op
    batch.commit().unwrap();

    // New keys and the overwrite are visible.
    assert_eq!(
        db.get(b"new_key1").unwrap().as_deref(),
        Some(&b"new_value1"[..])
    );
    assert_eq!(
        db.get(b"existing1").unwrap().as_deref(),
        Some(&b"updated_value1"[..])
    );
    assert_eq!(
        db.get(b"new_key2").unwrap().as_deref(),
        Some(&b"new_value2"[..])
    );
    // The deleted key is gone; an untouched key keeps its old value.
    assert_eq!(db.get(b"existing2").unwrap(), None);
    assert_eq!(
        db.get(b"existing3").unwrap().as_deref(),
        Some(&b"old_value3"[..])
    );
}
#[test]
fn test_batch_empty_handling() {
    // Committing a batch with no operations is a no-op and must not disturb
    // data that is already in the database.
    let temp_dir = TempDir::new().unwrap();
    let db = DB::open(temp_dir.path()).unwrap();

    db.put(b"key1", b"value1").unwrap();

    let empty_batch = db.batch();
    empty_batch.commit().unwrap();

    assert_eq!(db.get(b"key1").unwrap().as_deref(), Some(&b"value1"[..]));
}
#[test]
fn test_batch_large_1000_operations() {
    // One batch holding 1000 puts commits as a unit and survives a reopen.
    let temp_dir = TempDir::new().unwrap();
    let data_dir = temp_dir.path().to_path_buf();

    // Key/value naming shared by the write and verify phases.
    let kv = |i: u32| (format!("key_{:04}", i), format!("value_{:04}", i));

    let db = DBOptions::default()
        .background_flush(false)
        .open(&data_dir)
        .unwrap();

    let mut batch = db.batch();
    for i in 0..1000 {
        let (key, value) = kv(i);
        batch.put(key.as_bytes(), value.as_bytes());
    }
    batch.commit().unwrap();
    drop(db);

    // Reopen and verify every key round-tripped.
    let db = DB::open(&data_dir).unwrap();
    for i in 0..1000 {
        let (key, expected_value) = kv(i);
        assert_eq!(
            db.get(key.as_bytes()).unwrap().as_deref(),
            Some(expected_value.as_bytes()),
            "Key {} should be present after batch write",
            key
        );
    }
}
#[test]
fn test_batch_recovery_after_crash() {
    let temp_dir = TempDir::new().unwrap();
    let data_dir = PathBuf::from(temp_dir.path());

    // Session 1: commit a batch, then drop the handle (simulated shutdown).
    {
        let db = DB::open(&data_dir).unwrap();
        let mut batch = db.batch();
        for i in 1..=3 {
            let key = format!("batch_key{}", i);
            let value = format!("batch_value{}", i);
            batch.put(key.as_bytes(), value.as_bytes());
        }
        batch.delete(b"deleted_key");
        batch.commit().unwrap();
    }

    // Session 2: recovery must replay every operation of the batch.
    {
        let db = DB::open(&data_dir).unwrap();
        for i in 1..=3 {
            let key = format!("batch_key{}", i);
            let expected = format!("batch_value{}", i);
            assert_eq!(
                db.get(key.as_bytes()).unwrap().as_deref(),
                Some(expected.as_bytes())
            );
        }
        assert_eq!(db.get(b"deleted_key").unwrap(), None);
    }
}
#[test]
fn test_batch_concurrent_multiple_threads() {
    // 10 threads, each committing 100 batches of 10 keys: all 10,000 distinct
    // keys must be present afterwards (no lost or torn batches).
    let temp_dir = TempDir::new().unwrap();
    let db = Arc::new(DB::open(temp_dir.path()).unwrap());

    let handles: Vec<_> = (0..10)
        .map(|thread_id| {
            let db = Arc::clone(&db);
            thread::spawn(move || {
                for batch_id in 0..100 {
                    let mut batch = db.batch();
                    for key_id in 0..10 {
                        let key = format!("t{}_b{}_k{}", thread_id, batch_id, key_id);
                        let value = format!("value_{}", key);
                        batch.put(key.as_bytes(), value.as_bytes());
                    }
                    batch.commit().unwrap();
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // Verify every key from every thread/batch landed exactly as written.
    let mut count = 0;
    for thread_id in 0..10 {
        for batch_id in 0..100 {
            for key_id in 0..10 {
                let key = format!("t{}_b{}_k{}", thread_id, batch_id, key_id);
                let expected = format!("value_{}", key);
                assert_eq!(
                    db.get(key.as_bytes()).unwrap().as_deref(),
                    Some(expected.as_bytes())
                );
                count += 1;
            }
        }
    }
    assert_eq!(count, 10000, "All 10,000 keys should be present");
}
#[test]
fn test_batch_interleaved_with_individual_operations() {
    // Batched writes, single-key writes and reads run concurrently; no
    // committed write may be lost.
    let temp_dir = TempDir::new().unwrap();
    let db = Arc::new(DB::open(temp_dir.path()).unwrap());

    let batch_writer = {
        let db = Arc::clone(&db);
        thread::spawn(move || {
            for i in 0..100 {
                let mut batch = db.batch();
                batch.put(format!("batch_{}", i).as_bytes(), b"batch_value");
                batch.commit().unwrap();
            }
        })
    };

    let single_writer = {
        let db = Arc::clone(&db);
        thread::spawn(move || {
            for i in 0..100 {
                db.put(format!("single_{}", i).as_bytes(), b"single_value")
                    .unwrap();
            }
        })
    };

    let reader = {
        let db = Arc::clone(&db);
        thread::spawn(move || {
            // Reads racing the writers may see either state; results are ignored.
            for i in 0..100 {
                let _ = db.get(format!("batch_{}", i).as_bytes());
                let _ = db.get(format!("single_{}", i).as_bytes());
            }
        })
    };

    batch_writer.join().unwrap();
    single_writer.join().unwrap();
    reader.join().unwrap();

    // After all threads finish, every write from both writers is visible.
    for i in 0..100 {
        assert!(db.get(format!("batch_{}", i).as_bytes()).unwrap().is_some());
        assert!(db
            .get(format!("single_{}", i).as_bytes())
            .unwrap()
            .is_some());
    }
}
#[test]
fn test_batch_during_flush() {
    // A batch committed right after triggering a flush must still be fully
    // applied and readable.
    let temp_dir = TempDir::new().unwrap();
    // Fix: the handle is never shared across threads in this test, so the
    // original Arc wrapper was dead weight — use the DB directly.
    let db = DBOptions::default()
        .memtable_capacity(1024 * 1024)
        .background_flush(true)
        .open(temp_dir.path())
        .unwrap();

    // Fill the memtable with ~1MB of data, then force a flush.
    // Fix: `&[b'x'; 100]` instead of `&vec![b'x'; 100]` — the vec! form heap-
    // allocated a fresh Vec on every iteration (clippy: useless_vec).
    for i in 0..10000 {
        db.put(format!("pre_flush_{}", i).as_bytes(), &[b'x'; 100])
            .unwrap();
    }
    db.flush().unwrap();

    // Commit a batch while background flush activity may still be running.
    let mut batch = db.batch();
    for i in 0..100 {
        batch.put(format!("during_flush_{}", i).as_bytes(), b"batch_value");
    }
    batch.commit().unwrap();

    for i in 0..100 {
        assert!(db
            .get(format!("during_flush_{}", i).as_bytes())
            .unwrap()
            .is_some());
    }
}
#[test]
fn test_batch_during_compaction() {
    // A batch committed while background compaction may be running must still
    // be fully applied and readable.
    let temp_dir = TempDir::new().unwrap();
    // Fix: no threads share this handle, so the original Arc wrapper was
    // unnecessary — use the DB directly.
    let db = DBOptions::default()
        .memtable_capacity(512 * 1024)
        .background_flush(true)
        .background_compaction(true)
        .open(temp_dir.path())
        .unwrap();

    // Write ~2MB to force multiple memtable flushes (and thus compaction work).
    // Fix: `&[b'x'; 100]` instead of `&vec![b'x'; 100]` — the vec! form heap-
    // allocated a fresh Vec on every iteration (clippy: useless_vec).
    for i in 0..20000 {
        db.put(format!("pre_compact_{:05}", i).as_bytes(), &[b'x'; 100])
            .unwrap();
    }
    db.flush().unwrap();
    // Give background compaction a chance to start before committing.
    thread::sleep(std::time::Duration::from_millis(100));

    let mut batch = db.batch();
    for i in 0..100 {
        batch.put(format!("during_compact_{}", i).as_bytes(), b"batch_value");
    }
    batch.commit().unwrap();

    for i in 0..100 {
        assert!(db
            .get(format!("during_compact_{}", i).as_bytes())
            .unwrap()
            .is_some());
    }
}
#[test]
fn test_batch_write_amplification() {
    // Batch contents must be readable both immediately after commit and after
    // the database is closed and reopened.
    let temp_dir = TempDir::new().unwrap();
    let db = DBOptions::default()
        .memtable_capacity(1024 * 1024)
        .background_flush(false)
        .open(temp_dir.path())
        .unwrap();

    let mut batch = db.batch();
    for i in 0..1000 {
        batch.put(format!("batch_key_{:04}", i).as_bytes(), b"batch_value");
    }
    batch.commit().unwrap();

    // Immediate visibility.
    for i in 0..1000 {
        let key = format!("batch_key_{:04}", i);
        assert!(
            db.get(key.as_bytes()).unwrap().is_some(),
            "Batch key {} should be present",
            i
        );
    }

    // Durability across reopen.
    drop(db);
    let db = DB::open(temp_dir.path()).unwrap();
    for i in 0..1000 {
        let key = format!("batch_key_{:04}", i);
        assert!(
            db.get(key.as_bytes()).unwrap().is_some(),
            "Batch key {} should persist after reopen",
            i
        );
    }
}
#[test]
fn test_batch_larger_than_memtable() {
    // 2000 entries of ~100 bytes exceed the 100KB memtable capacity; the
    // batch must still apply in full.
    let temp_dir = TempDir::new().unwrap();
    let db = DBOptions::default()
        .memtable_capacity(100 * 1024)
        .background_flush(false)
        .open(temp_dir.path())
        .unwrap();

    // Same 100-byte payload for every key.
    let payload = vec![b'x'; 100];
    let mut batch = db.batch();
    for i in 0..2000 {
        batch.put(format!("key_{:04}", i).as_bytes(), &payload);
    }
    batch.commit().unwrap();

    for i in 0..2000 {
        let key = format!("key_{:04}", i);
        assert!(
            db.get(key.as_bytes()).unwrap().is_some(),
            "Key {} should be present",
            key
        );
    }
}
#[test]
fn test_batch_duplicate_keys() {
    // Within a batch, later operations on the same key shadow earlier ones.
    let temp_dir = TempDir::new().unwrap();
    let db = DB::open(temp_dir.path()).unwrap();

    // put, put, put, delete -> key absent after commit.
    let mut batch = db.batch();
    for value in [&b"value1"[..], &b"value2"[..], &b"value3"[..]] {
        batch.put(b"duplicate_key", value);
    }
    batch.delete(b"duplicate_key");
    batch.commit().unwrap();
    assert_eq!(db.get(b"duplicate_key").unwrap(), None);

    // put, delete, put -> the final put wins.
    let mut batch2 = db.batch();
    batch2.put(b"duplicate_key2", b"value1");
    batch2.delete(b"duplicate_key2");
    batch2.put(b"duplicate_key2", b"final_value");
    batch2.commit().unwrap();
    assert_eq!(
        db.get(b"duplicate_key2").unwrap().as_deref(),
        Some(&b"final_value"[..])
    );
}
#[test]
fn test_batch_size_limits() {
    // A very large batch (10,000 entries) commits without hitting a size cap.
    let temp_dir = TempDir::new().unwrap();
    let db = DB::open(temp_dir.path()).unwrap();

    let mut batch = db.batch();
    for i in 0..10000 {
        batch.put(format!("key_{:05}", i).as_bytes(), b"value");
    }
    batch.commit().unwrap();

    // Spot-check the first, middle and last keys.
    for probe in [&b"key_00000"[..], &b"key_05000"[..], &b"key_09999"[..]] {
        assert!(db.get(probe).unwrap().is_some());
    }
}
#[test]
fn test_batch_memory_accounting() {
    // Writing ~1MB (1000 keys x 1000-byte values) via a batch should move the
    // memory-usage estimate by roughly that amount.
    let temp_dir = TempDir::new().unwrap();
    let db = DBOptions::default()
        .max_memory_bytes(Some(50 * 1024 * 1024))
        .background_flush(false)
        .open(temp_dir.path())
        .unwrap();

    let mem_before = db.estimate_memory_usage();

    // Same 1000-byte payload for every key.
    let payload = vec![b'x'; 1000];
    let mut batch = db.batch();
    for i in 0..1000 {
        batch.put(format!("key_{:04}", i).as_bytes(), &payload);
    }
    batch.commit().unwrap();

    let mem_after = db.estimate_memory_usage();
    let mem_delta = mem_after - mem_before;
    assert!(
        mem_delta > 500_000 && mem_delta < 2_000_000,
        "Memory delta should be ~1MB, got {}",
        mem_delta
    );
}
#[test]
fn test_batch_with_zero_length_keys() {
    // Zero-length keys and zero-length values are legal; the last write for a
    // key within a batch wins (same shadowing rule as test_batch_duplicate_keys).
    let temp_dir = TempDir::new().unwrap();
    let db = DB::open(temp_dir.path()).unwrap();

    // Fix: the original crammed all three puts onto one line.
    let mut batch = db.batch();
    batch.put(b"", b"empty_key_value");
    batch.put(b"normal_key", b"");
    batch.put(b"", b""); // shadows the earlier put on the empty key
    batch.commit().unwrap();

    // Fix: assert exact final values instead of mere presence — the empty key
    // holds the last value written for it (empty), and an empty value is
    // still Some, distinct from a missing key.
    assert_eq!(db.get(b"").unwrap().as_deref(), Some(&b""[..]));
    assert_eq!(db.get(b"normal_key").unwrap().as_deref(), Some(&b""[..]));
}