use std::path::{Path, PathBuf};
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::{Duration, Instant};
use emdb::{Emdb, FlushPolicy};
/// Reads a strictly positive `usize` from the environment variable `key`.
///
/// Returns `default` when the variable is unset, is not valid Unicode,
/// does not parse as a `usize`, or parses to zero.
fn read_env_usize(key: &str, default: usize) -> usize {
    match std::env::var(key) {
        Ok(raw) => match raw.parse::<usize>() {
            // Zero is treated the same as "not configured".
            Ok(parsed) if parsed > 0 => parsed,
            _ => default,
        },
        Err(_) => default,
    }
}
/// Reads a strictly positive `u64` from the environment variable `key`.
///
/// Returns `default` when the variable is unset, is not valid Unicode,
/// does not parse as a `u64`, or parses to zero.
fn read_env_u64(key: &str, default: u64) -> u64 {
    let Ok(raw) = std::env::var(key) else {
        return default;
    };
    match raw.parse::<u64>() {
        // A zero value or a parse failure both mean "use the default".
        Ok(0) | Err(_) => default,
        Ok(parsed) => parsed,
    }
}
/// Builds a database path inside the system temporary directory.
///
/// The file name has the form `emdb-gc-bench-<label>-<nanos>.emdb`, where
/// `<nanos>` is the number of nanoseconds since the Unix epoch, or `0` if
/// the system clock reports a time before the epoch. Nothing is created on
/// disk; only the path is returned.
fn tmp_path(label: &str) -> PathBuf {
    let stamp = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos(),
        Err(_) => 0_u128,
    };
    std::env::temp_dir().join(format!("emdb-gc-bench-{label}-{stamp}.emdb"))
}
/// Best-effort removal of the file at `path` and its `<path>.lock` sibling.
///
/// Failures (for example, either file not existing) are deliberately
/// ignored so this can be called both before and after a benchmark run.
fn cleanup(path: &Path) {
    let _ = std::fs::remove_file(path);

    // Append the suffix to the raw OS string instead of going through
    // `Path::display`: that rendering is lossy for non-UTF-8 paths and would
    // name a different file, leaving the real lock file behind.
    let mut lock_path = path.as_os_str().to_owned();
    lock_path.push(".lock");
    let _ = std::fs::remove_file(lock_path);
}
/// Runs the concurrent insert-and-flush workload and returns its wall time.
///
/// Spawns `threads` worker threads. Every worker blocks on a shared barrier,
/// then performs `per_thread` iterations of one `insert` followed by one
/// `flush` on `db`. Keys have the form `t<thread>-i<index, zero-padded to 6>`
/// and values the form `payload-<thread>-<index>`.
///
/// The calling thread is the final barrier participant; the clock starts
/// right after it passes the barrier and stops once every worker has been
/// joined.
///
/// # Panics
///
/// Panics if any worker panics, which includes a failed `insert` or `flush`.
fn run_workload(db: Arc<Emdb>, threads: usize, per_thread: usize) -> Duration {
    // One extra participant so the caller controls when the workers start.
    let start_gate = Arc::new(Barrier::new(threads + 1));

    // Collect eagerly: every worker must exist before the gate is released.
    let workers: Vec<_> = (0..threads)
        .map(|worker_id| {
            let store = Arc::clone(&db);
            let gate = Arc::clone(&start_gate);
            thread::spawn(move || {
                gate.wait();
                for seq in 0..per_thread {
                    let key = format!("t{worker_id}-i{seq:06}");
                    let value = format!("payload-{worker_id}-{seq}");
                    store
                        .insert(key.as_str(), value.as_str())
                        .expect("insert in workload");
                    store.flush().expect("flush in workload");
                }
            })
        })
        .collect();

    start_gate.wait();
    let clock = Instant::now();
    for worker in workers {
        worker.join().expect("worker thread");
    }
    clock.elapsed()
}
/// Benchmark entry point: runs the same workload once per flush policy and
/// prints a Markdown-style comparison table.
///
/// Configuration is read from the environment (non-positive or unparsable
/// values fall back to the defaults):
///
/// * `EMDB_BENCH_GC_THREADS` — worker thread count (default 8)
/// * `EMDB_BENCH_GC_PER_THREAD` — writes per thread (default 200)
/// * `EMDB_BENCH_GC_MAX_WAIT_US` — printed group wait in µs (default 500)
/// * `EMDB_BENCH_GC_MAX_BATCH` — printed group batch size (default: thread count)
///
/// The first policy in the list is the baseline; each row's speedup is the
/// baseline wall time divided by that row's wall time.
///
/// # Panics
///
/// Panics if the total write count overflows `usize`, if a database cannot
/// be opened, if the workload panics, or if the database handle is still
/// shared after the workload returns.
fn main() {
    let threads = read_env_usize("EMDB_BENCH_GC_THREADS", 8);
    let per_thread = read_env_usize("EMDB_BENCH_GC_PER_THREAD", 200);
    let max_wait_us = read_env_u64("EMDB_BENCH_GC_MAX_WAIT_US", 500);
    let max_batch = read_env_usize("EMDB_BENCH_GC_MAX_BATCH", threads);

    // Both factors come from the environment; fail loudly rather than let the
    // product wrap in release builds and report a bogus total and throughput.
    // The `usize -> u64` widening is assumed lossless (usize is at most 64 bits
    // on supported targets).
    let total_writes = threads
        .checked_mul(per_thread)
        .expect("EMDB_BENCH_GC_THREADS * EMDB_BENCH_GC_PER_THREAD overflows usize")
        as u64;

    println!(
        "emdb group-commit bench: {threads} threads × {per_thread} writes/thread = {total_writes} total"
    );
    // NOTE(review): `max_wait_us` and `max_batch` are only reported here; the
    // `FlushPolicy::Group` value below carries no parameters, so nothing in
    // this function passes them to the database builder. Confirm whether the
    // knobs should be wired through or the line reworded.
    println!("Group policy: max_wait = {max_wait_us}µs, max_batch = {max_batch}\n");

    let policies: &[(&str, FlushPolicy)] = &[
        ("OnEachFlush", FlushPolicy::OnEachFlush),
        ("Group", FlushPolicy::Group),
    ];

    println!(
        "| {:<14} | {:>14} | {:>16} | {:>10} |",
        "policy", "wall time (ms)", "writes/sec", "speedup"
    );
    println!("|{:-<16}|{:->16}|{:->18}|{:->12}|", "", "", "", "");

    let mut baseline_secs: Option<f64> = None;
    for (label, policy) in policies {
        let path = tmp_path(label);
        // Clear anything left over from an earlier run at the same path.
        cleanup(&path);

        let db = Arc::new(
            Emdb::builder()
                .path(path.clone())
                .flush_policy(*policy)
                .build()
                .expect("emdb open"),
        );

        let elapsed = run_workload(Arc::clone(&db), threads, per_thread);
        let secs = elapsed.as_secs_f64();
        let writes_per_sec = total_writes as f64 / secs;

        // The first policy measured becomes the baseline for later rows.
        let speedup = match baseline_secs {
            Some(base) => format!("{:.2}×", base / secs),
            None => {
                baseline_secs = Some(secs);
                "1.00×".to_string()
            }
        };

        // Width and precision are applied directly, avoiding throwaway
        // `String`s for the numeric cells; the rendered row is unchanged.
        println!(
            "| {:<14} | {:>14} | {:>16.0} | {:>10} |",
            label,
            elapsed.as_millis(),
            writes_per_sec,
            speedup
        );

        // Take sole ownership and drop the database before deleting its
        // files, so the handle is released first.
        drop(Arc::into_inner(db).expect("db arc unique"));
        cleanup(&path);
    }
}