use batpak::prelude::*;
use batpak::store::{Store, StoreConfig, SyncMode};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use std::sync::Arc;
use tempfile::TempDir;
/// Creates a new temporary directory and opens a [`Store`] whose `data_dir`
/// points at it; every other setting comes from `StoreConfig::new("")`.
///
/// The [`TempDir`] handle is returned together with the store so the caller
/// decides how long the directory stays alive.
///
/// # Panics
///
/// Panics if the temporary directory cannot be created or the store fails to
/// open.
fn setup_store() -> (Store, TempDir) {
    let tmp = TempDir::new().expect("create temp dir");
    let opened = Store::open(StoreConfig {
        data_dir: tmp.path().to_path_buf(),
        ..StoreConfig::new("")
    });
    (opened.expect("open store"), tmp)
}
/// Benchmarks sequential appends into a freshly opened store.
///
/// One benchmark is registered per event count (1 000, 10 000 and 100 000) in
/// the `write_throughput` group. Every iteration gets its own store from
/// [`setup_store`] and appends `count` events that share a single coordinate,
/// event kind and JSON payload.
///
/// The temporary directory is returned from the timed routine instead of
/// being dropped inside it: Criterion drops routine outputs after the
/// measurement has stopped, so removing the directory no longer counts as
/// write time. Releasing the store itself is still part of the measurement.
fn bench_write_throughput(c: &mut Criterion) {
    let mut group = c.benchmark_group("write_throughput");
    for count in [1_000u64, 10_000, 100_000] {
        // Use fewer samples for the largest run (presumably to bound the
        // total benchmark time).
        if count >= 100_000 {
            group.sample_size(10);
        }
        group.bench_with_input(BenchmarkId::from_parameter(count), &count, |b, &count| {
            b.iter_with_setup(setup_store, |(store, dir)| {
                let coord = Coordinate::new("bench:entity", "bench:scope").expect("valid coord");
                let kind = EventKind::custom(0xF, 1);
                let payload = serde_json::json!({"x": 1, "y": 2});
                for _ in 0..count {
                    store.append(&coord, kind, &payload).expect("append");
                }
                // Release the store inside the timed region, then hand the
                // temp dir back so its deletion happens outside the timing.
                drop(store);
                dir
            });
        });
    }
    group.finish();
}
/// Benchmarks appends followed by an explicit `sync()` and `close()`.
///
/// One benchmark is registered per event count (1 000 and 10 000) in the
/// `durable_write_throughput` group. Every iteration gets its own store from
/// [`setup_store`], appends `count` events, then syncs and closes the store,
/// all inside the timed routine.
///
/// The temporary directory is returned from the routine so that Criterion
/// drops it after the measurement has stopped; deleting the directory is
/// therefore not counted as part of the durable write cost.
fn bench_durable_write_throughput(c: &mut Criterion) {
    let mut group = c.benchmark_group("durable_write_throughput");
    for count in [1_000u64, 10_000] {
        group.bench_with_input(BenchmarkId::from_parameter(count), &count, |b, &count| {
            b.iter_with_setup(setup_store, |(store, dir)| {
                let coord = Coordinate::new("bench:entity", "bench:scope").expect("valid coord");
                let kind = EventKind::custom(0xF, 1);
                let payload = serde_json::json!({"x": 1, "y": 2});
                for _ in 0..count {
                    store.append(&coord, kind, &payload).expect("append");
                }
                store.sync().expect("sync");
                store.close().expect("close");
                // Returned rather than dropped here, so the directory cleanup
                // happens outside the timed region.
                dir
            });
        });
    }
    group.finish();
}
/// Benchmarks concurrent appends from several writer threads sharing one store.
///
/// Registers a single benchmark, `4_threads_x_250`, in the
/// `concurrent_write_throughput` group. Each iteration opens a fresh store
/// (wrapped in an [`Arc`]) during setup, then spawns 4 named threads that
/// each append 250 events to their own `bench:thread{t}` entity, and joins
/// them all.
///
/// Thread spawning, joining and the final `drop(store)` are part of the timed
/// routine. The temporary directory is returned from the routine so that
/// Criterion drops it after the measurement has stopped, keeping directory
/// deletion out of the reported time.
fn bench_concurrent_write_throughput(c: &mut Criterion) {
    let mut group = c.benchmark_group("concurrent_write_throughput");
    let thread_count = 4usize;
    let events_per_thread = 250u64;
    group.bench_function("4_threads_x_250", |b| {
        b.iter_with_setup(
            || {
                let (store, dir) = setup_store();
                (Arc::new(store), dir)
            },
            |(store, dir)| {
                let mut handles = Vec::with_capacity(thread_count);
                for t in 0..thread_count {
                    let store = Arc::clone(&store);
                    handles.push(
                        std::thread::Builder::new()
                            .name(format!("bench-writer-{t}"))
                            .spawn(move || {
                                let entity = format!("bench:thread{t}");
                                let coord =
                                    Coordinate::new(&entity, "bench:scope").expect("valid coord");
                                let kind = EventKind::custom(0xF, 1);
                                let payload = serde_json::json!({"t": t});
                                for _ in 0..events_per_thread {
                                    store.append(&coord, kind, &payload).expect("append");
                                }
                            })
                            .expect("spawn thread"),
                    );
                }
                // All writers are spawned before any is joined so they run
                // concurrently.
                for h in handles {
                    h.join().expect("thread join");
                }
                // The worker clones are gone once the threads have finished,
                // so this releases the last handle to the store.
                drop(store);
                // Returned rather than dropped here, so the directory cleanup
                // happens outside the timed region.
                dir
            },
        );
    });
    group.finish();
}
/// Compares append performance under `SyncMode::SyncAll` and
/// `SyncMode::SyncData`.
///
/// Registers one benchmark per mode (`sync_all`, `sync_data`) in the
/// `sync_mode_comparison` group, with the sample size set to 20. Each
/// iteration opens a fresh store configured with `sync_every_n_events: 100`
/// and the mode under test, appends 1 000 events and closes the store.
///
/// The temporary directory is returned from the timed routine so that
/// Criterion drops it after the measurement has stopped; otherwise directory
/// deletion would add noise unrelated to the sync mode being compared.
fn bench_sync_mode_comparison(c: &mut Criterion) {
    let mut group = c.benchmark_group("sync_mode_comparison");
    group.sample_size(20);
    let count = 1_000u64;
    for (label, mode) in [
        ("sync_all", SyncMode::SyncAll),
        ("sync_data", SyncMode::SyncData),
    ] {
        group.bench_function(label, |b| {
            b.iter_with_setup(
                || {
                    let dir = TempDir::new().expect("create temp dir");
                    let config = StoreConfig {
                        data_dir: dir.path().to_path_buf(),
                        sync_every_n_events: 100,
                        // Setup runs once per iteration, so each store gets
                        // its own copy of the mode.
                        sync_mode: mode.clone(),
                        ..StoreConfig::new("")
                    };
                    let store = Store::open(config).expect("open store");
                    (store, dir)
                },
                |(store, dir)| {
                    let coord = Coordinate::new("bench:sync", "bench:scope").expect("valid coord");
                    let kind = EventKind::custom(0xF, 1);
                    let payload = serde_json::json!({"x": 1});
                    for _ in 0..count {
                        store.append(&coord, kind, &payload).expect("append");
                    }
                    store.close().expect("close");
                    // Returned rather than dropped here, so the directory
                    // cleanup happens outside the timed region.
                    dir
                },
            );
        });
    }
    group.finish();
}
// Collects the four benchmark functions defined in this file into a single
// Criterion group named `benches`.
criterion_group!(
benches,
bench_write_throughput,
bench_durable_write_throughput,
bench_concurrent_write_throughput,
bench_sync_mode_comparison
);
// Expands to the benchmark binary's entry point, which runs the `benches`
// group.
criterion_main!(benches);