#[cfg(feature = "var-collections")]
use armdb::{CacheConfig, VarTree};
use armdb::{Config, ConstTree, DbError};
use rand::{Rng, RngExt};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Instant;
use tempfile::TempDir;
/// Precomputed CDF for a Zipfian (power-law) distribution over ranks
/// `1..=n`, used to generate realistically skewed key-access patterns.
struct Zipfian {
    // Normalized cumulative weights; cdf[i] == P(rank <= i + 1).
    cdf: Vec<f64>,
}

impl Zipfian {
    /// Build the normalized CDF for `n` ranks with exponent `skew`
    /// (weight of rank i is 1 / i^skew).
    fn new(n: u64, skew: f64) -> Self {
        let mut running = 0.0;
        let mut cdf: Vec<f64> = (1..=n)
            .map(|rank| {
                running += (rank as f64).powf(skew).recip();
                running
            })
            .collect();
        // Normalize so the final entry is exactly 1.0.
        for entry in cdf.iter_mut() {
            *entry /= running;
        }
        Self { cdf }
    }

    /// Draw a rank index in `0..n` by inverse-transform sampling:
    /// binary-search the CDF for a uniform u in [0, 1).
    fn sample(&self, rng: &mut impl Rng) -> u64 {
        let u: f64 = rng.random();
        self.cdf.partition_point(|&p| p < u) as u64
    }
}
/// Encode a numeric key id as a fixed-width big-endian byte key, so byte
/// ordering of keys matches numeric ordering of ids.
fn make_key(id: u64) -> [u8; 8] {
    u64::to_be_bytes(id)
}
/// Build a 16-byte value embedding the key (bytes 0..8) and a counter
/// (bytes 8..16), both big-endian, so readers can detect corruption and
/// version overwrites.
fn make_value(key: u64, counter: u64) -> [u8; 16] {
    let mut out = [0u8; 16];
    let (key_half, counter_half) = out.split_at_mut(8);
    key_half.copy_from_slice(&key.to_be_bytes());
    counter_half.copy_from_slice(&counter.to_be_bytes());
    out
}
/// Assert that the key embedded in `value`'s first 8 bytes equals `key`;
/// panics with a diagnostic message when the value is corrupted.
fn verify_value(key: u64, value: &[u8; 16]) {
    let embedded = u64::from_be_bytes(value[..8].try_into().unwrap());
    assert_eq!(
        embedded, key,
        "value corruption: expected key {key}, got {embedded}"
    );
}
#[test]
#[ignore = "stress test: run with --ignored --nocapture"]
fn stress_concurrent() {
    // Mixed-workload concurrency stress for ConstTree: six workers run a
    // 60/30/5/5 put/get/delete/cas mix over a zipfian-skewed keyspace while
    // two background threads continuously compact and flush. Afterwards
    // every surviving entry is scanned and its embedded key verified.
    let dir = TempDir::new().unwrap();
    let mut config = Config::test();
    config.shard_count = 8;
    // Tiny files plus a low threshold force frequent rotation and
    // compaction while the workload runs.
    config.max_file_size = 64 * 1024;
    config.compaction_threshold = 0.2;
    let tree = Arc::new(ConstTree::<[u8; 8], 16>::open(dir.path(), config).unwrap());
    let stop = Arc::new(AtomicBool::new(false));
    let ops_total = Arc::new(AtomicU64::new(0));
    const WORKER_THREADS: usize = 6;
    const OPS_PER_WORKER: u64 = 100_000;
    const KEYSPACE: u64 = 10_000;
    let start = Instant::now();
    // Background compactor: best-effort, errors deliberately ignored —
    // compaction is allowed to fail while racing concurrent writes.
    let compaction_handle = {
        let tree = Arc::clone(&tree);
        let stop = Arc::clone(&stop);
        std::thread::spawn(move || {
            while !stop.load(Ordering::Relaxed) {
                let _ = tree.compact();
                std::thread::sleep(std::time::Duration::from_millis(10));
            }
        })
    };
    // Background flusher: periodically forces buffered writes out.
    let flush_handle = {
        let tree = Arc::clone(&tree);
        let stop = Arc::clone(&stop);
        std::thread::spawn(move || {
            while !stop.load(Ordering::Relaxed) {
                let _ = tree.flush_buffers();
                std::thread::sleep(std::time::Duration::from_millis(50));
            }
        })
    };
    let worker_handles: Vec<_> = (0..WORKER_THREADS)
        .map(|_| {
            let tree = Arc::clone(&tree);
            let ops_total = Arc::clone(&ops_total);
            std::thread::spawn(move || {
                let mut rng = rand::rng();
                let zipf = Zipfian::new(KEYSPACE, 1.0);
                for _ in 0..OPS_PER_WORKER {
                    let key_id = zipf.sample(&mut rng);
                    let k = make_key(key_id);
                    // Operation drawn from 0..100: 60% put, 30% get,
                    // 5% delete, 5% cas.
                    let op: u8 = rng.random_range(0..100);
                    match op {
                        0..60 => {
                            let counter: u64 = rng.random();
                            let v = make_value(key_id, counter);
                            tree.put(&k, &v).unwrap();
                        }
                        60..90 => {
                            // A miss is fine (the key may have been deleted
                            // concurrently); a hit must echo the key.
                            if let Some(v) = tree.get(&k) {
                                verify_value(key_id, &v);
                            }
                        }
                        90..95 => {
                            // Deleting an absent key is acceptable.
                            let _ = tree.delete(&k);
                        }
                        _ => {
                            // CAS with a random expected value: mismatch and
                            // missing-key are expected outcomes under
                            // contention; anything else is a real failure.
                            let old_v = make_value(key_id, rng.random());
                            let new_v = make_value(key_id, rng.random());
                            match tree.cas(&k, &old_v, &new_v) {
                                Ok(()) | Err(DbError::CasMismatch) | Err(DbError::KeyNotFound) => {}
                                Err(e) => panic!("cas error: {e}"),
                            }
                        }
                    }
                    ops_total.fetch_add(1, Ordering::Relaxed);
                }
            })
        })
        .collect();
    for h in worker_handles {
        h.join().unwrap();
    }
    // Stop the background threads only after all workers have finished so
    // compaction/flush pressure covers the entire workload.
    stop.store(true, Ordering::Relaxed);
    compaction_handle.join().unwrap();
    flush_handle.join().unwrap();
    let elapsed = start.elapsed();
    let total_ops = ops_total.load(Ordering::Relaxed);
    eprintln!(
        "stress_concurrent: {} ops in {:.2}s ({:.0} ops/sec), tree.len()={}",
        total_ops,
        elapsed.as_secs_f64(),
        total_ops as f64 / elapsed.as_secs_f64(),
        tree.len()
    );
    // Final integrity scan over everything that survived the churn.
    for (k, v) in tree.iter() {
        let key_id = u64::from_be_bytes(k);
        verify_value(key_id, &v);
    }
}
#[cfg(feature = "var-collections")]
#[test]
#[ignore = "stress test: run with --ignored --nocapture"]
fn stress_concurrent_var() {
    // VarTree counterpart of stress_concurrent: 60/30/5/5
    // put/get/delete/insert mix with variable-length values (16..256 bytes)
    // whose first 8 bytes echo the key. Ends with a reopen cycle and a full
    // on-disk verification pass.
    let dir = TempDir::new().unwrap();
    let mut config = Config::test();
    config.shard_count = 8;
    // Small files + low threshold force rotation/compaction under load.
    config.max_file_size = 64 * 1024;
    config.compaction_threshold = 0.2;
    config.cache = CacheConfig {
        max_size: 16 * 1024 * 1024,
        estimated_items: 50_000,
    };
    let tree = Arc::new(VarTree::<[u8; 8]>::open(dir.path(), config).unwrap());
    let ops_total = Arc::new(AtomicU64::new(0));
    const WORKER_THREADS: usize = 6;
    const OPS_PER_WORKER: u64 = 100_000;
    const KEYSPACE: u64 = 10_000;
    let start = Instant::now();
    let stop = Arc::new(AtomicBool::new(false));
    // Background compactor (best-effort; errors ignored under contention).
    let compaction_handle = {
        let tree = Arc::clone(&tree);
        let stop = Arc::clone(&stop);
        std::thread::spawn(move || {
            while !stop.load(Ordering::Relaxed) {
                let _ = tree.compact();
                std::thread::sleep(std::time::Duration::from_millis(10));
            }
        })
    };
    // Background flusher.
    let flush_handle = {
        let tree = Arc::clone(&tree);
        let stop = Arc::clone(&stop);
        std::thread::spawn(move || {
            while !stop.load(Ordering::Relaxed) {
                let _ = tree.flush_buffers();
                std::thread::sleep(std::time::Duration::from_millis(50));
            }
        })
    };
    let worker_handles: Vec<_> = (0..WORKER_THREADS)
        .map(|_| {
            let tree = Arc::clone(&tree);
            let ops_total = Arc::clone(&ops_total);
            std::thread::spawn(move || {
                let mut rng = rand::rng();
                let zipf = Zipfian::new(KEYSPACE, 1.0);
                for _ in 0..OPS_PER_WORKER {
                    let key_id = zipf.sample(&mut rng);
                    let k = make_key(key_id);
                    // Operation drawn from 0..100: 60% put, 30% get,
                    // 5% delete, 5% insert.
                    let op: u8 = rng.random_range(0..100);
                    match op {
                        0..60 => {
                            // put: random-length value prefixed with the key.
                            let vlen: usize = rng.random_range(16..256);
                            let mut v = vec![0u8; vlen];
                            v[..8].copy_from_slice(&key_id.to_be_bytes());
                            tree.put(&k, &v).unwrap();
                        }
                        60..90 => {
                            // get: a hit must echo the key. The len guard
                            // keeps the check panic-free (all values written
                            // here are >= 16 bytes anyway).
                            if let Some(bv) = tree.get(&k) {
                                let bytes = bv.as_ref();
                                if bytes.len() >= 8 {
                                    let stored = u64::from_be_bytes(bytes[..8].try_into().unwrap());
                                    assert_eq!(
                                        stored, key_id,
                                        "var value corruption: expected {key_id}, got {stored}"
                                    );
                                }
                            }
                        }
                        90..95 => {
                            // Deleting an absent key is acceptable.
                            let _ = tree.delete(&k);
                        }
                        _ => {
                            // insert: KeyExists is the expected outcome when
                            // racing concurrent puts of the same key.
                            let mut v = vec![0u8; 32];
                            v[..8].copy_from_slice(&key_id.to_be_bytes());
                            match tree.insert(&k, &v) {
                                Ok(()) | Err(DbError::KeyExists) => {}
                                Err(e) => panic!("insert error: {e}"),
                            }
                        }
                    }
                    ops_total.fetch_add(1, Ordering::Relaxed);
                }
            })
        })
        .collect();
    for h in worker_handles {
        h.join().unwrap();
    }
    // Stop background threads only after every worker finished.
    stop.store(true, Ordering::Relaxed);
    compaction_handle.join().unwrap();
    flush_handle.join().unwrap();
    let elapsed = start.elapsed();
    let total_ops = ops_total.load(Ordering::Relaxed);
    eprintln!(
        "stress_concurrent_var: {} ops in {:.2}s ({:.0} ops/sec), tree.len()={}",
        total_ops,
        elapsed.as_secs_f64(),
        total_ops as f64 / elapsed.as_secs_f64(),
        tree.len()
    );
    // Reopen from disk and verify every stored entry. NOTE(review): this
    // relies on Drop performing a clean shutdown (no explicit close()) —
    // confirm VarTree flushes on drop. config2 leaves file-size/compaction
    // at Config::test() defaults, which is fine for a read-only pass.
    drop(tree);
    let mut config2 = Config::test();
    config2.shard_count = 8;
    config2.cache = CacheConfig {
        max_size: 16 * 1024 * 1024,
        estimated_items: 50_000,
    };
    let tree = VarTree::<[u8; 8]>::open(dir.path(), config2).unwrap();
    for (k, bv) in tree.iter() {
        let key_id = u64::from_be_bytes(k);
        let bytes = bv.as_ref();
        if bytes.len() >= 8 {
            let stored = u64::from_be_bytes(bytes[..8].try_into().unwrap());
            assert_eq!(stored, key_id, "final verification: corruption");
        }
    }
}
#[cfg(unix)]
#[test]
#[ignore = "stress test: run with --ignored --nocapture"]
fn stress_crash_recovery() {
    use std::fs;
    // Crash-consistency loop: each iteration forks a child that writes into
    // the shared directory and is SIGKILLed mid-flight; the parent then
    // reopens the tree and verifies whatever state recovered. Recovery may
    // legitimately fail after a hard kill — see the Err arm below.
    const ITERATIONS: usize = 50;
    const ENTRIES_PER_ITER: u64 = 500;
    let dir = TempDir::new().unwrap();
    for iter in 0..ITERATIONS {
        let pid = unsafe { libc::fork() };
        if pid == 0 {
            // Child process: write entries tagged with the iteration
            // number, linger a random few ms, then _exit(0) without running
            // destructors — simulates a hard crash even when the parent's
            // SIGKILL arrives after the writes finished. put errors are
            // ignored: the child may die at any point.
            let mut config = Config::test();
            config.shard_count = 2;
            let tree = ConstTree::<[u8; 8], 16>::open(dir.path(), config).unwrap();
            let mut rng = rand::rng();
            for _ in 0..ENTRIES_PER_ITER {
                let key_id: u64 = rng.random_range(0..1000);
                let k = make_key(key_id);
                let v = make_value(key_id, iter as u64);
                let _ = tree.put(&k, &v);
            }
            let delay_us: u64 = rng.random_range(0..5000);
            std::thread::sleep(std::time::Duration::from_micros(delay_us));
            unsafe { libc::_exit(0) };
        }
        assert!(pid > 0, "fork() failed");
        // Parent: give the child a 10ms head start, then kill it hard and
        // reap it so no zombie accumulates across iterations.
        std::thread::sleep(std::time::Duration::from_millis(10));
        unsafe {
            libc::kill(pid, libc::SIGKILL);
        }
        let mut status: i32 = 0;
        unsafe {
            libc::waitpid(pid, &mut status, 0);
        }
        // Recovery must either yield a consistent tree or fail cleanly.
        let mut config = Config::test();
        config.shard_count = 2;
        let result = ConstTree::<[u8; 8], 16>::open(dir.path(), config);
        match result {
            Ok(tree) => {
                // Every surviving entry must still embed its own key;
                // partial iterations may simply be missing, which is fine.
                for (k, v) in tree.iter() {
                    let key_id = u64::from_be_bytes(k);
                    verify_value(key_id, &v);
                }
                tree.close().unwrap();
            }
            Err(e) => {
                // Acceptable after SIGKILL. Remove any stale hint files in
                // the shard directories so the next open rebuilds its index
                // from the data files instead of trusting a torn hint.
                eprintln!(" iter {iter}: recovery error (acceptable): {e}");
                let shard_dirs: Vec<_> = fs::read_dir(dir.path())
                    .unwrap()
                    .filter_map(|e| e.ok())
                    .filter(|e| e.file_name().to_string_lossy().starts_with("shard_"))
                    .collect();
                for sd in shard_dirs {
                    for f in fs::read_dir(sd.path()).unwrap().filter_map(|e| e.ok()) {
                        if f.path().extension().and_then(|e| e.to_str()) == Some("hint") {
                            let _ = fs::remove_file(f.path());
                        }
                    }
                }
            }
        }
    }
    eprintln!("stress_crash_recovery: {ITERATIONS} iterations completed");
}
#[cfg(feature = "var-collections")]
#[test]
#[ignore = "stress test: large dataset, run on Linux with constrained RAM"]
fn stress_large_dataset() {
    // Capacity/perf run: load 1M entries of 512-byte values with a cache
    // (4 MiB) far smaller than the dataset, then measure three read phases:
    // zipfian (cache-friendly), uniform (cache-hostile), and a mixed 80/20
    // read/write workload. Asserts entry count and embedded-key integrity;
    // throughput numbers are informational only.
    let dir = TempDir::new().unwrap();
    let mut config = Config::test();
    config.shard_count = 4;
    config.cache = CacheConfig {
        max_size: 4 * 1024 * 1024,
        estimated_items: 100_000,
    };
    let tree = VarTree::<[u8; 8]>::open(dir.path(), config).unwrap();
    let mut rng = rand::rng();
    const TOTAL_ENTRIES: u64 = 1_000_000;
    const VALUE_SIZE: usize = 512;
    const READ_OPS: u64 = 100_000;
    // Write phase: sequential load with progress output every 100k.
    let start = Instant::now();
    for i in 0..TOTAL_ENTRIES {
        let k = make_key(i);
        let mut v = vec![0u8; VALUE_SIZE];
        v[..8].copy_from_slice(&i.to_be_bytes());
        tree.put(&k, &v).unwrap();
        if i > 0 && i % 100_000 == 0 {
            let elapsed = start.elapsed().as_secs_f64();
            eprintln!(
                " write: {i}/{TOTAL_ENTRIES} ({:.0} ops/sec)",
                i as f64 / elapsed
            );
        }
    }
    let write_elapsed = start.elapsed();
    eprintln!(
        "write phase: {} entries in {:.2}s ({:.0} ops/sec)",
        TOTAL_ENTRIES,
        write_elapsed.as_secs_f64(),
        TOTAL_ENTRIES as f64 / write_elapsed.as_secs_f64()
    );
    assert_eq!(tree.len(), TOTAL_ENTRIES as usize);
    // Close and reopen so the read phases start from a cold cache.
    tree.close().unwrap();
    let mut config2 = Config::test();
    config2.shard_count = 4;
    config2.cache = CacheConfig {
        max_size: 4 * 1024 * 1024,
        estimated_items: 100_000,
    };
    let tree = VarTree::<[u8; 8]>::open(dir.path(), config2).unwrap();
    // Read phase 1: zipfian-skewed keys — the hot set should mostly be
    // servable from cache. Every key was written, so `hits` is expected to
    // equal READ_OPS; it is tracked to surface lost entries in the report.
    let zipf = Zipfian::new(TOTAL_ENTRIES, 1.0);
    let start = Instant::now();
    let mut hits = 0u64;
    for _ in 0..READ_OPS {
        let key_id = zipf.sample(&mut rng);
        let k = make_key(key_id);
        if let Some(bv) = tree.get(&k) {
            let stored = u64::from_be_bytes(bv.as_ref()[..8].try_into().unwrap());
            assert_eq!(stored, key_id);
            hits += 1;
        }
    }
    let hot_elapsed = start.elapsed();
    eprintln!(
        "read (zipfian): {} ops in {:.2}s ({:.0} ops/sec, {:.1}% hit)",
        READ_OPS,
        hot_elapsed.as_secs_f64(),
        READ_OPS as f64 / hot_elapsed.as_secs_f64(),
        hits as f64 / READ_OPS as f64 * 100.0
    );
    // Read phase 2: uniform random keys — mostly cache misses, measures
    // the disk read path.
    let start = Instant::now();
    hits = 0;
    for _ in 0..READ_OPS {
        let key_id: u64 = rng.random_range(0..TOTAL_ENTRIES);
        let k = make_key(key_id);
        if let Some(bv) = tree.get(&k) {
            let stored = u64::from_be_bytes(bv.as_ref()[..8].try_into().unwrap());
            assert_eq!(stored, key_id);
            hits += 1;
        }
    }
    let cold_elapsed = start.elapsed();
    eprintln!(
        "read (uniform): {} ops in {:.2}s ({:.0} ops/sec, {:.1}% hit)",
        READ_OPS,
        cold_elapsed.as_secs_f64(),
        READ_OPS as f64 / cold_elapsed.as_secs_f64(),
        hits as f64 / READ_OPS as f64 * 100.0
    );
    // Phase 3: mixed workload — 80% zipfian reads, 20% overwrites.
    let start = Instant::now();
    for _ in 0..READ_OPS {
        let key_id = zipf.sample(&mut rng);
        let k = make_key(key_id);
        if rng.random_range(0..100u8) < 80 {
            if let Some(bv) = tree.get(&k) {
                let stored = u64::from_be_bytes(bv.as_ref()[..8].try_into().unwrap());
                assert_eq!(stored, key_id);
            }
        } else {
            let mut v = vec![0u8; VALUE_SIZE];
            v[..8].copy_from_slice(&key_id.to_be_bytes());
            tree.put(&k, &v).unwrap();
        }
    }
    let mixed_elapsed = start.elapsed();
    eprintln!(
        "mixed (80/20): {} ops in {:.2}s ({:.0} ops/sec)",
        READ_OPS,
        mixed_elapsed.as_secs_f64(),
        READ_OPS as f64 / mixed_elapsed.as_secs_f64()
    );
}
#[test]
#[ignore = "stress test: run with --ignored --nocapture"]
fn stress_compaction_pressure() {
    // Compaction correctness under pressure: tiny (4 KiB) files with an
    // aggressive threshold, two overwrite passes separated by explicit
    // compactions, then a close/reopen. Verifies that compaction keeps
    // exactly the latest version of each key: counter 2 for the first half
    // (overwritten in phase 2), counter 3 for the rest (phase 3).
    let dir = TempDir::new().unwrap();
    let mut config = Config::test();
    config.shard_count = 4;
    config.max_file_size = 4096;
    config.compaction_threshold = 0.05;
    let tree = ConstTree::<[u8; 8], 16>::open(dir.path(), config).unwrap();
    const ENTRIES: u64 = 100_000;
    const OVERWRITES: u64 = 50_000;
    // Phase 1: initial load, every key at counter 1.
    let start = Instant::now();
    for i in 0..ENTRIES {
        let k = make_key(i);
        let v = make_value(i, 1);
        tree.put(&k, &v).unwrap();
    }
    eprintln!(
        "phase 1 (insert {}): {:.2}s",
        ENTRIES,
        start.elapsed().as_secs_f64()
    );
    // Phase 2: overwrite the first half at counter 2, creating garbage.
    let start = Instant::now();
    for i in 0..OVERWRITES {
        let k = make_key(i);
        let v = make_value(i, 2);
        tree.put(&k, &v).unwrap();
    }
    eprintln!(
        "phase 2 (overwrite {}): {:.2}s",
        OVERWRITES,
        start.elapsed().as_secs_f64()
    );
    let start = Instant::now();
    let compacted = tree.compact().unwrap();
    eprintln!(
        "compact 1: {} entries in {:.2}s",
        compacted,
        start.elapsed().as_secs_f64()
    );
    // Phase 3: overwrite the second half at counter 3 and compact again.
    let start = Instant::now();
    for i in OVERWRITES..ENTRIES {
        let k = make_key(i);
        let v = make_value(i, 3);
        tree.put(&k, &v).unwrap();
    }
    eprintln!(
        "phase 3 (overwrite {}): {:.2}s",
        ENTRIES - OVERWRITES,
        start.elapsed().as_secs_f64()
    );
    let start = Instant::now();
    let compacted = tree.compact().unwrap();
    eprintln!(
        "compact 2: {} entries in {:.2}s",
        compacted,
        start.elapsed().as_secs_f64()
    );
    // Compaction must not drop or duplicate keys, and each key must carry
    // the counter of its most recent overwrite.
    assert_eq!(tree.len(), ENTRIES as usize);
    for i in 0..ENTRIES {
        let k = make_key(i);
        let v = tree
            .get(&k)
            .unwrap_or_else(|| panic!("key {i} missing after compaction"));
        verify_value(i, &v);
        let expected_counter = if i < OVERWRITES { 2u64 } else { 3u64 };
        let stored_counter = u64::from_be_bytes(v[8..16].try_into().unwrap());
        assert_eq!(
            stored_counter, expected_counter,
            "key {i}: expected counter {expected_counter}, got {stored_counter}"
        );
    }
    // Clean close, then reopen: the compacted state must survive recovery.
    tree.close().unwrap();
    let mut config = Config::test();
    config.shard_count = 4;
    let tree = ConstTree::<[u8; 8], 16>::open(dir.path(), config).unwrap();
    assert_eq!(tree.len(), ENTRIES as usize);
    for i in 0..ENTRIES {
        let k = make_key(i);
        let v = tree
            .get(&k)
            .unwrap_or_else(|| panic!("key {i} missing after recovery"));
        verify_value(i, &v);
    }
    eprintln!(
        "stress_compaction_pressure: all {} entries verified",
        ENTRIES
    );
}
#[cfg(feature = "var-collections")]
#[test]
#[ignore = "stress test: minimal VarTree concurrent write bug reproducer"]
fn stress_var_minimal() {
    // Smallest configuration that reproduces the concurrent-write issue:
    // a single shard with tiny files (frequent rotation) and just two
    // writers hammering a 100-key space with fixed-size values whose first
    // 8 bytes echo the key.
    let dir = TempDir::new().unwrap();
    let mut config = Config::test();
    config.shard_count = 1;
    config.max_file_size = 4096;
    let tree = Arc::new(VarTree::<[u8; 8]>::open(dir.path(), config).unwrap());
    const OPS: u64 = 10_000;
    const KEYSPACE: u64 = 100;
    let writers: Vec<_> = (0..2)
        .map(|_| {
            let tree = Arc::clone(&tree);
            std::thread::spawn(move || {
                let mut rng = rand::rng();
                for _ in 0..OPS {
                    let key_id: u64 = rng.random_range(0..KEYSPACE);
                    let key = make_key(key_id);
                    let mut value = vec![0u8; 32];
                    value[..8].copy_from_slice(&key_id.to_be_bytes());
                    tree.put(&key, &value).unwrap();
                }
            })
        })
        .collect();
    for handle in writers {
        handle.join().unwrap();
    }
    // Live check: every readable value must echo its own key.
    for i in 0..KEYSPACE {
        if let Some(bv) = tree.get(&make_key(i)) {
            let stored = u64::from_be_bytes(bv.as_ref()[..8].try_into().unwrap());
            assert_eq!(stored, i, "live read corruption at key {i}");
        }
    }
    drop(tree);
    // Reopen from disk and re-verify: catches corruption that only shows
    // up through the on-disk representation.
    let mut config2 = Config::test();
    config2.shard_count = 1;
    let tree = VarTree::<[u8; 8]>::open(dir.path(), config2).unwrap();
    let corrupted = tree
        .iter()
        .filter(|(key, bv)| {
            let key_id = u64::from_be_bytes(*key);
            u64::from_be_bytes(bv.as_ref()[..8].try_into().unwrap()) != key_id
        })
        .count();
    eprintln!(
        "stress_var_minimal: len={}, corrupted={}",
        tree.len(),
        corrupted
    );
    assert_eq!(corrupted, 0, "{corrupted} corrupted entries after reopen");
}
#[cfg(feature = "var-collections")]
#[test]
#[ignore = "stress test: VarTree concurrent bug bisection"]
fn stress_var_bisect() {
    // Bisection variant of stress_concurrent_var: concurrent puts only (no
    // deletes, CAS, or background compaction/flush threads), so a failure
    // pins the bug to the concurrent write path itself. Corruption is
    // counted both live and after a reopen to separate in-memory damage
    // from on-disk damage.
    let dir = TempDir::new().unwrap();
    let mut config = Config::test();
    config.shard_count = 8;
    config.max_file_size = 64 * 1024;
    config.compaction_threshold = 0.2;
    config.cache = CacheConfig {
        max_size: 16 * 1024 * 1024,
        estimated_items: 50_000,
    };
    let tree = Arc::new(VarTree::<[u8; 8]>::open(dir.path(), config).unwrap());
    const THREADS: usize = 6;
    const OPS: u64 = 100_000;
    const KEYSPACE: u64 = 10_000;
    let handles: Vec<_> = (0..THREADS)
        .map(|_| {
            let tree = Arc::clone(&tree);
            std::thread::spawn(move || {
                let mut rng = rand::rng();
                let zipf = Zipfian::new(KEYSPACE, 1.0);
                for _ in 0..OPS {
                    let key_id = zipf.sample(&mut rng);
                    let k = make_key(key_id);
                    // Variable-length value prefixed with the key id.
                    let vlen: usize = rng.random_range(16..256);
                    let mut v = vec![0u8; vlen];
                    v[..8].copy_from_slice(&key_id.to_be_bytes());
                    tree.put(&k, &v).unwrap();
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // Live verification through the open handle (memtable + cache).
    let mut live_corrupted = 0;
    for i in 0..KEYSPACE {
        let k = make_key(i);
        if let Some(bv) = tree.get(&k) {
            let stored = u64::from_be_bytes(bv.as_ref()[..8].try_into().unwrap());
            if stored != i {
                live_corrupted += 1;
            }
        }
    }
    eprintln!("live corrupted: {live_corrupted}");
    // Reopen and verify purely from the persisted state.
    drop(tree);
    let mut config2 = Config::test();
    config2.shard_count = 8;
    config2.cache = CacheConfig {
        max_size: 16 * 1024 * 1024,
        estimated_items: 50_000,
    };
    let tree = VarTree::<[u8; 8]>::open(dir.path(), config2).unwrap();
    let mut corrupted = 0;
    for (k, bv) in tree.iter() {
        let key_id = u64::from_be_bytes(k);
        let stored = u64::from_be_bytes(bv.as_ref()[..8].try_into().unwrap());
        if stored != key_id {
            corrupted += 1;
        }
    }
    eprintln!(
        "stress_var_bisect: len={}, live_corrupted={live_corrupted}, reopen_corrupted={corrupted}",
        tree.len()
    );
    assert_eq!(corrupted, 0, "{corrupted} corrupted entries after reopen");
}
#[cfg(feature = "var-collections")]
#[test]
#[ignore = "stress test: single-thread VarTree to isolate concurrency"]
fn stress_var_single_thread() {
    // Same workload as the concurrent reproducers but on a single thread:
    // if corruption shows up here, concurrency is not the culprit.
    let dir = TempDir::new().unwrap();
    let mut config = Config::test();
    config.shard_count = 8;
    config.max_file_size = 64 * 1024;
    config.cache = CacheConfig {
        max_size: 16 * 1024 * 1024,
        estimated_items: 50_000,
    };
    let tree = VarTree::<[u8; 8]>::open(dir.path(), config).unwrap();
    let mut rng = rand::rng();
    let zipf = Zipfian::new(10_000, 1.0);
    // 100k zipfian-distributed puts with variable-length values whose
    // first 8 bytes echo the key.
    for _ in 0..100_000u64 {
        let key_id = zipf.sample(&mut rng);
        let key = make_key(key_id);
        let len: usize = rng.random_range(16..256);
        let mut value = vec![0u8; len];
        value[..8].copy_from_slice(&key_id.to_be_bytes());
        tree.put(&key, &value).unwrap();
    }
    // Full scan; dump details of the first few corrupt entries for triage.
    let mut corrupted = 0;
    for (key, bv) in tree.iter() {
        let key_id = u64::from_be_bytes(key);
        let bytes = bv.as_ref();
        let stored = u64::from_be_bytes(bytes[..8].try_into().unwrap());
        if stored == key_id {
            continue;
        }
        corrupted += 1;
        if corrupted <= 5 {
            eprintln!(
                " CORRUPT key={key_id} stored={stored} vlen={} first16={:?}",
                bytes.len(),
                &bytes[..bytes.len().min(16)]
            );
        }
    }
    eprintln!("single_thread: len={}, corrupted={corrupted}", tree.len());
    assert_eq!(corrupted, 0);
}
#[cfg(feature = "var-collections")]
#[test]
#[ignore = "stress test: single-thread no rotation"]
fn stress_var_no_rotation() {
    // Make the data file large enough that it never rotates: isolates
    // whether file rotation is required to trigger the corruption.
    let dir = TempDir::new().unwrap();
    let mut config = Config::test();
    config.shard_count = 1;
    config.max_file_size = 256 * 1024 * 1024;
    config.cache = CacheConfig {
        max_size: 16 * 1024 * 1024,
        estimated_items: 50_000,
    };
    let tree = VarTree::<[u8; 8]>::open(dir.path(), config).unwrap();
    let mut rng = rand::rng();
    let zipf = Zipfian::new(10_000, 1.0);
    for _ in 0..100_000u64 {
        let key_id = zipf.sample(&mut rng);
        let key = make_key(key_id);
        let len: usize = rng.random_range(16..256);
        let mut value = vec![0u8; len];
        value[..8].copy_from_slice(&key_id.to_be_bytes());
        tree.put(&key, &value).unwrap();
    }
    // Every stored value must still echo its own key in the first 8 bytes.
    let corrupted = tree
        .iter()
        .filter(|(key, bv)| {
            let key_id = u64::from_be_bytes(*key);
            u64::from_be_bytes(bv.as_ref()[..8].try_into().unwrap()) != key_id
        })
        .count();
    eprintln!("no_rotation: len={}, corrupted={corrupted}", tree.len());
    assert_eq!(corrupted, 0);
}
#[cfg(feature = "var-collections")]
#[test]
#[ignore = "stress test: rotation without cache"]
fn stress_var_rotation_no_cache() {
    // Tiny files force constant rotation while the cache is disabled
    // entirely, so every read goes through the disk path: isolates the
    // cache layer from the rotation logic.
    let dir = TempDir::new().unwrap();
    let mut config = Config::test();
    config.shard_count = 1;
    config.max_file_size = 4096;
    config.cache = CacheConfig {
        max_size: 0,
        estimated_items: 0,
    };
    let tree = VarTree::<[u8; 8]>::open(dir.path(), config).unwrap();
    let mut rng = rand::rng();
    let zipf = Zipfian::new(10_000, 1.0);
    for _ in 0..100_000u64 {
        let key_id = zipf.sample(&mut rng);
        let key = make_key(key_id);
        let len: usize = rng.random_range(16..256);
        let mut value = vec![0u8; len];
        value[..8].copy_from_slice(&key_id.to_be_bytes());
        tree.put(&key, &value).unwrap();
    }
    // Every stored value must still echo its own key in the first 8 bytes.
    let corrupted = tree
        .iter()
        .filter(|(key, bv)| {
            let key_id = u64::from_be_bytes(*key);
            u64::from_be_bytes(bv.as_ref()[..8].try_into().unwrap()) != key_id
        })
        .count();
    eprintln!(
        "rotation_no_cache: len={}, corrupted={corrupted}",
        tree.len()
    );
    assert_eq!(corrupted, 0);
}
#[cfg(feature = "var-collections")]
#[cfg(unix)]
#[test]
#[ignore = "stress test: run with --ignored --nocapture"]
fn stress_crash_recovery_var() {
    use std::fs;
    // VarTree counterpart of stress_crash_recovery: fork a child that
    // writes variable-length values and is SIGKILLed mid-flight, then
    // reopen in the parent and verify whatever recovered.
    const ITERATIONS: usize = 50;
    const ENTRIES_PER_ITER: u64 = 500;
    let dir = TempDir::new().unwrap();
    for iter in 0..ITERATIONS {
        let pid = unsafe { libc::fork() };
        if pid == 0 {
            // Child process: write entries (put errors ignored — the child
            // may die at any point), linger a random few ms, then _exit(0)
            // without running destructors to simulate a hard crash.
            let mut config = Config::test();
            config.shard_count = 2;
            config.cache = CacheConfig {
                max_size: 4 * 1024 * 1024,
                estimated_items: 10_000,
            };
            let tree = VarTree::<[u8; 8]>::open(dir.path(), config).unwrap();
            let mut rng = rand::rng();
            for _ in 0..ENTRIES_PER_ITER {
                let key_id: u64 = rng.random_range(0..1000);
                let k = make_key(key_id);
                let vlen: usize = rng.random_range(16..256);
                let mut v = vec![0u8; vlen];
                v[..8].copy_from_slice(&key_id.to_be_bytes());
                let _ = tree.put(&k, &v);
            }
            let delay_us: u64 = rng.random_range(0..5000);
            std::thread::sleep(std::time::Duration::from_micros(delay_us));
            unsafe { libc::_exit(0) };
        }
        assert!(pid > 0, "fork() failed");
        // Parent: 10ms head start, hard kill, then reap to avoid zombies.
        std::thread::sleep(std::time::Duration::from_millis(10));
        unsafe {
            libc::kill(pid, libc::SIGKILL);
        }
        let mut status: i32 = 0;
        unsafe {
            libc::waitpid(pid, &mut status, 0);
        }
        // Recovery must either yield a consistent tree or fail cleanly.
        let mut config = Config::test();
        config.shard_count = 2;
        config.cache = CacheConfig {
            max_size: 4 * 1024 * 1024,
            estimated_items: 10_000,
        };
        let result = VarTree::<[u8; 8]>::open(dir.path(), config);
        match result {
            Ok(tree) => {
                // Surviving entries must embed their own key; the length
                // guard keeps the check panic-free for short values.
                for (k, bv) in tree.iter() {
                    let key_id = u64::from_be_bytes(k);
                    let bytes = bv.as_ref();
                    if bytes.len() >= 8 {
                        let stored = u64::from_be_bytes(bytes[..8].try_into().unwrap());
                        assert_eq!(
                            stored, key_id,
                            "crash recovery var: corruption at key {key_id}"
                        );
                    }
                }
                tree.close().unwrap();
            }
            Err(e) => {
                // Acceptable after SIGKILL: drop stale hint files so the
                // next open rebuilds its index from the data files.
                eprintln!(" iter {iter}: recovery error (acceptable): {e}");
                let shard_dirs: Vec<_> = fs::read_dir(dir.path())
                    .unwrap()
                    .filter_map(|e| e.ok())
                    .filter(|e| e.file_name().to_string_lossy().starts_with("shard_"))
                    .collect();
                for sd in shard_dirs {
                    for f in fs::read_dir(sd.path()).unwrap().filter_map(|e| e.ok()) {
                        if f.path().extension().and_then(|e| e.to_str()) == Some("hint") {
                            let _ = fs::remove_file(f.path());
                        }
                    }
                }
            }
        }
    }
    eprintln!("stress_crash_recovery_var: {ITERATIONS} iterations completed");
}
#[cfg(feature = "var-collections")]
#[test]
#[ignore = "stress test: run with --ignored --nocapture"]
fn stress_var_full_lifecycle() {
    // Full-lifecycle VarTree stress: phase 1 runs six workers with a
    // 50/30/10/5/5 put/get/delete/cas/insert mix plus background
    // compaction and flushing; phase 2 reopens from disk; phase 3 scans
    // everything and counts corrupted entries.
    let dir = TempDir::new().unwrap();
    let mut config = Config::test();
    config.shard_count = 8;
    // Small files + aggressive threshold keep rotation/compaction busy.
    config.max_file_size = 32 * 1024;
    config.compaction_threshold = 0.15;
    config.cache = CacheConfig {
        max_size: 8 * 1024 * 1024,
        estimated_items: 50_000,
    };
    let tree = Arc::new(VarTree::<[u8; 8]>::open(dir.path(), config).unwrap());
    let stop = Arc::new(AtomicBool::new(false));
    let ops_total = Arc::new(AtomicU64::new(0));
    const WORKER_THREADS: usize = 6;
    const OPS_PER_WORKER: u64 = 100_000;
    const KEYSPACE: u64 = 10_000;
    let start = Instant::now();
    // Background compactor (best-effort; errors ignored under contention).
    let compaction_handle = {
        let tree = Arc::clone(&tree);
        let stop = Arc::clone(&stop);
        std::thread::spawn(move || {
            while !stop.load(Ordering::Relaxed) {
                let _ = tree.compact();
                std::thread::sleep(std::time::Duration::from_millis(10));
            }
        })
    };
    // Background flusher.
    let flush_handle = {
        let tree = Arc::clone(&tree);
        let stop = Arc::clone(&stop);
        std::thread::spawn(move || {
            while !stop.load(Ordering::Relaxed) {
                let _ = tree.flush_buffers();
                std::thread::sleep(std::time::Duration::from_millis(50));
            }
        })
    };
    let worker_handles: Vec<_> = (0..WORKER_THREADS)
        .map(|_| {
            let tree = Arc::clone(&tree);
            let ops_total = Arc::clone(&ops_total);
            std::thread::spawn(move || {
                let mut rng = rand::rng();
                let zipf = Zipfian::new(KEYSPACE, 1.0);
                for _ in 0..OPS_PER_WORKER {
                    let key_id = zipf.sample(&mut rng);
                    let k = make_key(key_id);
                    // Operation drawn from 0..100: 50% put, 30% get,
                    // 10% delete, 5% cas, 5% insert.
                    let op: u8 = rng.random_range(0..100);
                    match op {
                        0..50 => {
                            // put: random-length value prefixed with key.
                            let vlen: usize = rng.random_range(16..256);
                            let mut v = vec![0u8; vlen];
                            v[..8].copy_from_slice(&key_id.to_be_bytes());
                            tree.put(&k, &v).unwrap();
                        }
                        50..80 => {
                            // get: any hit must echo the key; the length
                            // guard keeps the check panic-free.
                            if let Some(bv) = tree.get(&k) {
                                let bytes = bv.as_ref();
                                if bytes.len() >= 8 {
                                    let stored = u64::from_be_bytes(bytes[..8].try_into().unwrap());
                                    assert_eq!(
                                        stored, key_id,
                                        "lifecycle corruption: expected {key_id}, got {stored}"
                                    );
                                }
                            }
                        }
                        80..90 => {
                            // Deleting an absent key is acceptable.
                            let _ = tree.delete(&k);
                        }
                        90..95 => {
                            // CAS between two key-prefixed values: mismatch
                            // and missing-key are expected under contention.
                            let mut old_v = vec![0u8; 32];
                            old_v[..8].copy_from_slice(&key_id.to_be_bytes());
                            let mut new_v = vec![0u8; 32];
                            new_v[..8].copy_from_slice(&key_id.to_be_bytes());
                            match tree.cas(&k, &old_v, &new_v) {
                                Ok(()) | Err(DbError::CasMismatch) | Err(DbError::KeyNotFound) => {}
                                Err(e) => panic!("cas error: {e}"),
                            }
                        }
                        _ => {
                            // insert: KeyExists is expected when racing puts.
                            let mut v = vec![0u8; 32];
                            v[..8].copy_from_slice(&key_id.to_be_bytes());
                            match tree.insert(&k, &v) {
                                Ok(()) | Err(DbError::KeyExists) => {}
                                Err(e) => panic!("insert error: {e}"),
                            }
                        }
                    }
                    ops_total.fetch_add(1, Ordering::Relaxed);
                }
            })
        })
        .collect();
    for h in worker_handles {
        h.join().unwrap();
    }
    // Stop background threads only after every worker finished.
    stop.store(true, Ordering::Relaxed);
    compaction_handle.join().unwrap();
    flush_handle.join().unwrap();
    let elapsed = start.elapsed();
    let total_ops = ops_total.load(Ordering::Relaxed);
    eprintln!(
        "phase 1 (concurrent): {} ops in {:.2}s ({:.0} ops/sec), tree.len()={}",
        total_ops,
        elapsed.as_secs_f64(),
        total_ops as f64 / elapsed.as_secs_f64(),
        tree.len()
    );
    // Phase 2: reopen purely from the persisted state. NOTE(review): no
    // explicit close() here — relies on Drop shutting down cleanly;
    // confirm VarTree flushes on drop.
    drop(tree);
    let mut config2 = Config::test();
    config2.shard_count = 8;
    config2.cache = CacheConfig {
        max_size: 8 * 1024 * 1024,
        estimated_items: 50_000,
    };
    let tree = VarTree::<[u8; 8]>::open(dir.path(), config2).unwrap();
    // Phase 3: full scan counting values that do not echo their key.
    let mut corrupted = 0;
    let mut total = 0;
    for (k, bv) in tree.iter() {
        total += 1;
        let key_id = u64::from_be_bytes(k);
        let bytes = bv.as_ref();
        if bytes.len() >= 8 {
            let stored = u64::from_be_bytes(bytes[..8].try_into().unwrap());
            if stored != key_id {
                corrupted += 1;
            }
        }
    }
    eprintln!("phase 3 (verify): total={total}, corrupted={corrupted}");
    assert_eq!(
        corrupted, 0,
        "{corrupted} corrupted entries after full lifecycle"
    );
}