use criterion::{
black_box, criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput,
};
use std::collections::HashMap;
use std::env;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;
use tempfile::tempdir;
extern crate kitedb;
use kitedb::api::kite::{Kite, KiteOptions, NodeDef, PropDef};
use kitedb::core::single_file::SyncMode;
use kitedb::error::KiteError;
use kitedb::types::PropValue;
fn temp_db_path(temp_dir: &tempfile::TempDir) -> std::path::PathBuf {
temp_dir.path().join("bench")
}
#[derive(Clone, Copy, Debug)]
enum LockMode {
PerOp,
Batched,
}
fn create_write_schema(
group_commit: bool,
wal_mb: Option<usize>,
checkpoint_threshold: Option<f64>,
) -> KiteOptions {
let user = NodeDef::new("User", "user:").prop(PropDef::string("name"));
let mut options = KiteOptions::new()
.node(user)
.sync_mode(SyncMode::Normal)
.group_commit_enabled(group_commit)
.group_commit_window_ms(2);
let wal_mb = wal_mb.or_else(|| {
env::var("KITE_BENCH_WAL_MB")
.ok()
.and_then(|value| value.parse::<usize>().ok())
});
let checkpoint_threshold = checkpoint_threshold.or_else(|| {
env::var("KITE_BENCH_CHECKPOINT_THRESHOLD")
.ok()
.and_then(|value| value.parse::<f64>().ok())
});
if let Some(mb) = wal_mb {
options = options.wal_size_mb(mb);
}
if let Some(value) = checkpoint_threshold {
options = options.checkpoint_threshold(value);
}
options
}
#[cfg(feature = "bench-profile")]
fn bench_profile_enabled() -> bool {
std::env::var("KITEDB_BENCH_PROFILE")
.map(|value| {
let value = value.to_lowercase();
value == "1" || value == "true" || value == "yes"
})
.unwrap_or(false)
}
#[cfg(feature = "bench-profile")]
fn maybe_log_profile(kite: &Kite, label: &str) {
if bench_profile_enabled() {
let (commit_wait_ns, wal_flush_ns) = kite.take_profile_snapshot();
println!(
"[write_path] {label} commit_lock_wait_ns={commit_wait_ns} wal_flush_ns={wal_flush_ns}"
);
}
}
fn create_user_with_retry(kite: &mut Kite, idx: usize) -> Result<(), KiteError> {
let key = format!("user{idx}");
let mut attempts = 0;
loop {
let mut props = HashMap::with_capacity(1);
props.insert("name".to_string(), PropValue::String(key.clone()));
match kite.create_node("User", &key, props) {
Ok(_) => return Ok(()),
Err(KiteError::WalBufferFull) if attempts == 0 => {
kite.optimize()?;
attempts += 1;
}
Err(err) => return Err(err),
}
}
}
fn create_users_batched_with_retry(
kite: &mut Kite,
start: usize,
count: usize,
) -> Result<(), KiteError> {
let mut attempts = 0;
loop {
let result = kite.transaction(|ctx| {
for offset in 0..count {
let idx = start + offset;
let key = format!("user{idx}");
let mut props = HashMap::with_capacity(1);
props.insert("name".to_string(), PropValue::String(key.clone()));
ctx.create_node("User", &key, props)?;
}
Ok(())
});
match result {
Ok(_) => return Ok(()),
Err(KiteError::WalBufferFull) if attempts == 0 => {
kite.optimize()?;
attempts += 1;
}
Err(err) => return Err(err),
}
}
}
fn bench_write_concurrent_variant(
c: &mut Criterion,
group_name: &str,
lock_mode: LockMode,
group_commit: bool,
) {
let mut group = c.benchmark_group(group_name);
group.sample_size(10);
for &num_threads in [1usize, 2, 4, 8].iter() {
let ops_per_thread = 500usize;
let total_ops = num_threads * ops_per_thread;
group.throughput(Throughput::Elements(total_ops as u64));
group.bench_with_input(
BenchmarkId::new(if group_commit { "gc_on" } else { "gc_off" }, num_threads),
&num_threads,
move |bencher, &num_threads| {
bencher.iter_batched(
|| {
let temp_dir = tempdir().expect("expected value");
let kite = Kite::open(
temp_db_path(&temp_dir),
create_write_schema(group_commit, None, None),
)
.expect("expected value");
let kite = Arc::new(parking_lot::RwLock::new(kite));
(temp_dir, kite)
},
|(_temp_dir, kite)| {
let barrier = Arc::new(Barrier::new(num_threads));
let counter = Arc::new(AtomicU64::new(0));
let mut handles = Vec::with_capacity(num_threads);
for _ in 0..num_threads {
let kite = Arc::clone(&kite);
let barrier = Arc::clone(&barrier);
let counter = Arc::clone(&counter);
handles.push(thread::spawn(move || {
barrier.wait();
match lock_mode {
LockMode::PerOp => {
for _ in 0..ops_per_thread {
let idx = counter.fetch_add(1, Ordering::SeqCst) as usize;
let mut guard = kite.write();
create_user_with_retry(&mut guard, idx).expect("expected value");
}
}
LockMode::Batched => {
let mut i = 0usize;
while i < ops_per_thread {
let batch = (ops_per_thread - i).min(100);
let start = counter.fetch_add(batch as u64, Ordering::SeqCst) as usize;
let mut guard = kite.write();
create_users_batched_with_retry(&mut guard, start, batch)
.expect("expected value");
i += batch;
}
}
}
}));
}
for handle in handles {
handle.join().expect("expected value");
}
#[cfg(feature = "bench-profile")]
{
let kite_ref = kite.read();
maybe_log_profile(
&kite_ref,
&format!("concurrent threads={num_threads} mode={lock_mode:?} gc={group_commit}"),
);
}
if let Ok(lock) = Arc::try_unwrap(kite) {
let kite = lock.into_inner();
kite.close().expect("expected value");
}
},
BatchSize::SmallInput,
);
},
);
}
group.finish();
}
fn bench_write_single(c: &mut Criterion) {
let mut group = c.benchmark_group("write_single");
group.sample_size(10);
for &count in [1_000usize, 5_000usize].iter() {
for &group_commit in [false, true].iter() {
let label = if group_commit { "gc_on" } else { "gc_off" };
group.throughput(Throughput::Elements(count as u64));
group.bench_with_input(
BenchmarkId::new(label, count),
&count,
move |bencher, &count| {
bencher.iter_batched(
|| {
let temp_dir = tempdir().expect("expected value");
let kite = Kite::open(
temp_db_path(&temp_dir),
create_write_schema(group_commit, None, None),
)
.expect("expected value");
(temp_dir, kite)
},
|(_temp_dir, mut kite)| {
for i in 0..count {
create_user_with_retry(&mut kite, i).expect("expected value");
}
#[cfg(feature = "bench-profile")]
maybe_log_profile(&kite, &format!("single count={count} gc={group_commit}"));
kite.close().expect("expected value");
black_box(());
},
BatchSize::SmallInput,
);
},
);
}
}
group.finish();
}
fn bench_write_batch_tx(c: &mut Criterion) {
let mut group = c.benchmark_group("write_batch_tx");
group.sample_size(10);
for &total_ops in [1_000usize, 5_000usize].iter() {
for &batch_size in [10usize, 100usize, 500usize, 1000usize].iter() {
for &group_commit in [false, true].iter() {
let label = if group_commit { "gc_on" } else { "gc_off" };
let bench_id = format!("bs{batch_size}_{label}");
group.throughput(Throughput::Elements(total_ops as u64));
group.bench_with_input(
BenchmarkId::new(bench_id, total_ops),
&total_ops,
move |bencher, &total_ops| {
bencher.iter_batched(
|| {
let temp_dir = tempdir().expect("expected value");
let kite = Kite::open(
temp_db_path(&temp_dir),
create_write_schema(group_commit, None, None),
)
.expect("expected value");
(temp_dir, kite)
},
|(_temp_dir, mut kite)| {
let mut start = 0usize;
while start < total_ops {
let count = (total_ops - start).min(batch_size);
create_users_batched_with_retry(&mut kite, start, count).expect("expected value");
start += count;
}
#[cfg(feature = "bench-profile")]
maybe_log_profile(
&kite,
&format!("batch total={total_ops} batch_size={batch_size} gc={group_commit}"),
);
kite.close().expect("expected value");
black_box(());
},
BatchSize::SmallInput,
);
},
);
}
}
}
group.finish();
}
fn bench_write_batch_tx_wal_sweep(c: &mut Criterion) {
let mut group = c.benchmark_group("write_batch_tx_wal_sweep");
group.sample_size(10);
let total_ops = 5_000usize;
let batch_size = 100usize;
let wal_sizes = [4usize, 16, 64, 256];
let thresholds = [0.5_f64, 0.8_f64, 0.95_f64];
for &wal_mb in wal_sizes.iter() {
for &threshold in thresholds.iter() {
let bench_id = format!("wal{wal_mb}_thr{threshold:.2}");
group.throughput(Throughput::Elements(total_ops as u64));
group.bench_with_input(
BenchmarkId::new(bench_id, total_ops),
&total_ops,
move |bencher, &total_ops| {
bencher.iter_batched(
|| {
let temp_dir = tempdir().expect("expected value");
let kite = Kite::open(
temp_db_path(&temp_dir),
create_write_schema(false, Some(wal_mb), Some(threshold)),
)
.expect("expected value");
(temp_dir, kite)
},
|(_temp_dir, mut kite)| {
let mut start = 0usize;
while start < total_ops {
let count = (total_ops - start).min(batch_size);
create_users_batched_with_retry(&mut kite, start, count).expect("expected value");
start += count;
}
#[cfg(feature = "bench-profile")]
maybe_log_profile(
&kite,
&format!(
"wal_sweep total={total_ops} batch_size={batch_size} wal_mb={wal_mb} thr={threshold}"
),
);
kite.close().expect("expected value");
black_box(());
},
BatchSize::SmallInput,
);
},
);
}
}
group.finish();
}
fn bench_write_concurrent_per_op(c: &mut Criterion) {
bench_write_concurrent_variant(c, "write_concurrent_per_op", LockMode::PerOp, false);
bench_write_concurrent_variant(c, "write_concurrent_per_op_gc", LockMode::PerOp, true);
}
fn bench_write_concurrent_batched(c: &mut Criterion) {
bench_write_concurrent_variant(c, "write_concurrent_batched", LockMode::Batched, false);
bench_write_concurrent_variant(c, "write_concurrent_batched_gc", LockMode::Batched, true);
}
criterion_group!(
benches,
bench_write_single,
bench_write_batch_tx,
bench_write_batch_tx_wal_sweep,
bench_write_concurrent_per_op,
bench_write_concurrent_batched
);
criterion_main!(benches);