use std::{
sync::{
Arc,
atomic::{
AtomicBool,
AtomicU64,
Ordering,
},
},
thread,
time::{
Duration,
Instant,
},
};
use rand::{
Rng,
rngs::ThreadRng,
};
use crate::Db;
/// Summary figures returned by the benchmark runners in this file.
#[derive(Debug, Clone)]
pub struct BenchmarkMetrics {
/// Elapsed wall-clock time of the run, in seconds (includes thread start-up and join).
pub duration_secs: f64,
/// Number of counted operations divided by the elapsed time.
pub ops_per_sec: f64,
/// Key-plus-value bytes the run counted as written.
pub bytes_written: u64,
/// Bytes of values returned by successful lookups; 0 for the write-only benchmark.
pub bytes_read: u64,
/// Store size reported after the run divided by the bytes written.
/// The read benchmark does not measure this and reports 1.0.
pub write_amplification: f64,
/// NOTE(review): every runner visible here sets this to 1.0; the intended
/// meaning is not established by this file — confirm against callers.
pub degradation_ratio: f64,
/// NOTE(review): presumably the 99th-percentile latency in microseconds, but
/// every runner visible here sets it to 0.0 (latency is not sampled).
pub p99_latency_us: f64,
}
pub fn run_write_benchmark(
db: Arc<Db>,
num_workers: usize,
batch_size: usize,
value_size: usize,
duration_secs: u64,
) -> BenchmarkMetrics {
let start = Instant::now();
let ops_written = Arc::new(AtomicU64::new(0));
let bytes_written = Arc::new(AtomicU64::new(0));
let shutdown = Arc::new(AtomicBool::new(false));
let workers: Vec<_> = (0..num_workers)
.map(|id| {
let db = db.clone();
let ops = ops_written.clone();
let bytes = bytes_written.clone();
let shutdown = shutdown.clone();
thread::spawn(move || {
let rng = ThreadRng::default();
let value = vec![0u8; value_size];
let worker_key_offset = id * 1_000_000;
while !shutdown.load(Ordering::Relaxed) {
for i in 0..batch_size {
let key = format!("key_{:010}", worker_key_offset + i).into_bytes();
let _ = db.put(&key, &value);
ops.fetch_add(1, Ordering::Relaxed);
bytes.fetch_add((key.len() + value.len()) as u64, Ordering::Relaxed);
}
}
})
})
.collect();
thread::sleep(Duration::from_secs(duration_secs));
shutdown.store(true, Ordering::Relaxed);
for w in workers {
let _ = w.join();
}
let elapsed = start.elapsed().as_secs_f64();
let total_ops = ops_written.load(Ordering::Relaxed);
let total_bytes = bytes_written.load(Ordering::Relaxed);
let vstats = db.version_stats();
let disk_bytes = vstats.total_size;
let write_amp = if total_bytes > 0 {
disk_bytes as f64 / total_bytes as f64
} else {
1.0
};
BenchmarkMetrics {
duration_secs: elapsed,
ops_per_sec: total_ops as f64 / elapsed,
bytes_written: total_bytes,
bytes_read: 0,
write_amplification: write_amp,
degradation_ratio: 1.0,
p99_latency_us: 0.0,
}
}
pub fn run_read_benchmark(
db: Arc<Db>,
num_workers: usize,
key_space: usize,
value_size: usize,
reads_per_write: usize,
duration_secs: u64,
) -> BenchmarkMetrics {
let start = Instant::now();
let ops = Arc::new(AtomicU64::new(0));
let bytes_read = Arc::new(AtomicU64::new(0));
let bytes_written = Arc::new(AtomicU64::new(0));
let shutdown = Arc::new(AtomicBool::new(false));
let workers: Vec<_> = (0..num_workers)
.map(|id| {
let db = db.clone();
let ops = ops.clone();
let bytes_r = bytes_read.clone();
let bytes_w = bytes_written.clone();
let shutdown = shutdown.clone();
thread::spawn(move || {
let mut rng = ThreadRng::default();
let value = vec![0u8; value_size];
let worker_key_offset = id * 1_000_000;
while !shutdown.load(Ordering::Relaxed) {
for _ in 0..reads_per_write {
let key_idx = rng.random_range(0..key_space);
let key = format!("key_{:010}", key_idx).into_bytes();
if let Ok(Some(v)) = db.get(&key) {
bytes_r.fetch_add(v.len() as u64, Ordering::Relaxed);
}
ops.fetch_add(1, Ordering::Relaxed);
}
let key = format!("key_{:010}", worker_key_offset + rng.random_range(0..1000))
.into_bytes();
let _ = db.put(&key, &value);
ops.fetch_add(1, Ordering::Relaxed);
bytes_w.fetch_add((key.len() + value.len()) as u64, Ordering::Relaxed);
}
})
})
.collect();
thread::sleep(Duration::from_secs(duration_secs));
shutdown.store(true, Ordering::Relaxed);
for w in workers {
let _ = w.join();
}
let elapsed = start.elapsed().as_secs_f64();
let total_ops = ops.load(Ordering::Relaxed);
BenchmarkMetrics {
duration_secs: elapsed,
ops_per_sec: total_ops as f64 / elapsed,
bytes_written: bytes_written.load(Ordering::Relaxed),
bytes_read: bytes_read.load(Ordering::Relaxed),
write_amplification: 1.0,
degradation_ratio: 1.0,
p99_latency_us: 0.0,
}
}
/// Runs an evenly mixed read/write workload against `db`.
///
/// This is [`run_read_benchmark`] with its `reads_per_write` argument fixed
/// at 1, so each worker iteration issues one lookup followed by one write.
/// All other arguments are forwarded unchanged and the resulting
/// [`BenchmarkMetrics`] is returned as-is.
pub fn run_mixed_benchmark(
    db: Arc<Db>,
    num_workers: usize,
    key_space: usize,
    value_size: usize,
    duration_secs: u64,
) -> BenchmarkMetrics {
    // One lookup per write is what makes this workload "mixed".
    const READS_PER_WRITE: usize = 1;

    run_read_benchmark(
        db,
        num_workers,
        key_space,
        value_size,
        READS_PER_WRITE,
        duration_secs,
    )
}