use batpak::prelude::*;
use batpak::store::{BatchAppendItem, CausationRef, Store, StoreConfig, SyncConfig, SyncMode};
use batpak_bench_support::{apply_profile, throughput_elements, BenchProfile};
use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion};
use tempfile::TempDir;
/// Opens a fresh store inside a newly created temporary directory.
///
/// `sync_mode` and `every_n_events` populate the store's `SyncConfig`; all other settings are
/// whatever `StoreConfig::new("")` produces, with `data_dir` redirected to the temporary
/// directory.
///
/// Returns the opened store, the `TempDir` guard owning its data directory, and the coordinate
/// and event kind that the benchmarks append under. The guard is handed to the caller so the
/// caller controls when the directory goes away.
///
/// # Panics
///
/// Panics if the temporary directory cannot be created, if the store fails to open, or if the
/// benchmark coordinate is rejected.
fn open_bench_store(
    sync_mode: SyncMode,
    every_n_events: u32,
) -> (Store, TempDir, Coordinate, EventKind) {
    let tmp = TempDir::new().expect("create temp dir");

    // Start from the stock configuration and override only what the benchmark controls.
    let mut cfg = StoreConfig::new("");
    cfg.data_dir = tmp.path().to_path_buf();
    cfg.sync = SyncConfig {
        mode: sync_mode,
        every_n_events,
    };

    let bench_store = Store::open(cfg).expect("open store");
    let bench_coord = Coordinate::new("bench:entity", "bench:scope").expect("valid coord");
    (bench_store, tmp, bench_coord, EventKind::custom(0xF, 1))
}
/// Builds the JSON payload for benchmark event number `i`.
///
/// The object carries the index under `"i"` and a fixed filler string of 100 `x` characters
/// under `"payload"`.
fn event_payload(i: usize) -> serde_json::Value {
    let filler = "x".repeat(100);
    serde_json::json!({"i": i, "payload": filler})
}
/// Splits `total_events` into consecutive batch lengths of at most `batch_size`.
///
/// The result holds `total_events / batch_size` entries equal to `batch_size`, followed by one
/// trailing entry with the remainder when the division is not exact. The entries always sum to
/// `total_events`.
///
/// # Panics
///
/// Panics with a division-by-zero error when `batch_size` is `0`.
fn batch_plan(total_events: usize, batch_size: usize) -> Vec<usize> {
    let whole = total_events / batch_size;
    let tail = total_events % batch_size;

    std::iter::repeat(batch_size)
        .take(whole)
        // A zero remainder contributes no trailing batch.
        .chain(Some(tail).filter(|&left| left > 0))
        .collect()
}
/// Builds `count` batch items addressed to `coord` with event kind `kind`.
///
/// Item `n` (zero-based) carries `event_payload(n)`, default append options and no causation
/// reference.
///
/// # Panics
///
/// Panics if `BatchAppendItem::new` rejects any of the items.
fn make_batch_items(coord: &Coordinate, kind: EventKind, count: usize) -> Vec<BatchAppendItem> {
    let mut items = Vec::with_capacity(count);
    for index in 0..count {
        let payload = event_payload(index);
        let item = BatchAppendItem::new(
            coord.clone(),
            kind,
            &payload,
            AppendOptions::default(),
            CausationRef::None,
        )
        .expect("valid batch item");
        items.push(item);
    }
    items
}
/// Compares `append_batch` against repeated single `append` calls for the same event count.
///
/// For every batch size, 1_000 events are written to a fresh store opened with
/// `SyncMode::SyncData` and `every_n_events = 1`:
///
/// * `batch/<size>` writes them as the sequence of batches produced by `batch_plan`;
/// * `single/<size>` writes them one event at a time. This variant ignores the batch size; the
///   value is only used as the benchmark id parameter.
///
/// Store setup happens in the `iter_batched` setup closure and is not timed. Building the items
/// and payloads, appending, and closing the store are timed.
fn bench_batch_vs_single(c: &mut Criterion) {
    let mut group = c.benchmark_group("batch_vs_single_append");
    apply_profile(&mut group, BenchProfile::Heavy);

    // Every benchmark in this group writes the same number of events, so the throughput is
    // declared once instead of on every loop iteration.
    let total_events = 1_000u64;
    throughput_elements(&mut group, total_events);

    for batch_size in [10usize, 50, 100, 256] {
        group.bench_with_input(
            BenchmarkId::new("batch", batch_size),
            &batch_size,
            |b, &batch_size| {
                let total_events = usize::try_from(total_events)
                    .expect("total_events fits in usize for benchmark");
                let plan = batch_plan(total_events, batch_size);
                b.iter_batched(
                    || open_bench_store(SyncMode::SyncData, 1),
                    |(store, dir, coord, kind)| {
                        for item_count in &plan {
                            let items = make_batch_items(&coord, kind, *item_count);
                            store.append_batch(items).expect("batch append");
                        }
                        store.close().expect("close batch benchmark store");
                        // Return the temp dir: Criterion drops routine outputs after the
                        // measurement ends, so directory removal is kept out of the timing.
                        dir
                    },
                    BatchSize::SmallInput,
                );
            },
        );
        group.bench_with_input(
            BenchmarkId::new("single", batch_size),
            &batch_size,
            |b, &_batch_size| {
                b.iter_batched(
                    || open_bench_store(SyncMode::SyncData, 1),
                    |(store, dir, coord, kind)| {
                        for i in 0..total_events {
                            store
                                .append(
                                    &coord,
                                    kind,
                                    &event_payload(
                                        usize::try_from(i).expect("single append index fits"),
                                    ),
                                )
                                .expect("single append");
                        }
                        store.close().expect("close single benchmark store");
                        // Same as above: defer temp dir cleanup until after timing.
                        dir
                    },
                    BatchSize::SmallInput,
                );
            },
        );
    }
    group.finish();
}
/// Measures batch append cost under each sync mode.
///
/// For `SyncMode::SyncData` (`sync_data/100`) and `SyncMode::SyncAll` (`sync_all/100`), each
/// iteration opens a fresh store with `every_n_events = 1`, appends 10 batches of 100 items
/// (1_000 events, matching the declared throughput) and closes the store.
fn bench_batch_durability(c: &mut Criterion) {
    let mut group = c.benchmark_group("batch_durability_overhead");
    apply_profile(&mut group, BenchProfile::Heavy);
    throughput_elements(&mut group, 1_000);

    // Each mode is paired with its benchmark id name so a label can never drift from its mode.
    for (mode_name, sync_mode) in [
        ("sync_data", SyncMode::SyncData),
        ("sync_all", SyncMode::SyncAll),
    ] {
        group.bench_with_input(
            BenchmarkId::new(mode_name, 100),
            &sync_mode,
            |b, sync_mode| {
                b.iter_batched(
                    || open_bench_store(sync_mode.clone(), 1),
                    |(store, dir, coord, kind)| {
                        for _ in 0..10 {
                            let items = make_batch_items(&coord, kind, 100);
                            store.append_batch(items).expect("batch append");
                        }
                        store.close().expect("close durability benchmark store");
                        // Return the temp dir: Criterion drops routine outputs after the
                        // measurement ends, so directory removal is kept out of the timing.
                        dir
                    },
                    BatchSize::SmallInput,
                );
            },
        );
    }
    group.finish();
}
/// Measures how `append_batch` cost changes with batch size for a fixed event count.
///
/// For each batch size, 1_000 events are written to a fresh store (`SyncMode::SyncData`,
/// `every_n_events = 1`) as the sequence of batches produced by `batch_plan`, and the store is
/// then closed.
fn bench_batch_size_scaling(c: &mut Criterion) {
    let mut group = c.benchmark_group("batch_size_scaling");
    apply_profile(&mut group, BenchProfile::Quick);

    // The event count is the same for every batch size, so the throughput is declared once.
    let total_events = 1_000u64;
    throughput_elements(&mut group, total_events);

    for batch_size in [1usize, 10, 50, 100, 256] {
        group.bench_with_input(
            BenchmarkId::from_parameter(batch_size),
            &batch_size,
            |b, &batch_size| {
                let total_events = usize::try_from(total_events)
                    .expect("total_events fits in usize for benchmark");
                let plan = batch_plan(total_events, batch_size);
                b.iter_batched(
                    || open_bench_store(SyncMode::SyncData, 1),
                    |(store, dir, coord, kind)| {
                        for item_count in &plan {
                            let items = make_batch_items(&coord, kind, *item_count);
                            store.append_batch(items).expect("batch append");
                        }
                        store.close().expect("close batch scaling benchmark store");
                        // Return the temp dir: Criterion drops routine outputs after the
                        // measurement ends, so directory removal is kept out of the timing.
                        dir
                    },
                    BatchSize::SmallInput,
                );
            },
        );
    }
    group.finish();
}
/// Measures batch append cost for different `every_n_events` sync cadences.
///
/// For each cadence value (1, 1_000 and 10_000) a fresh `SyncMode::SyncData` store is opened
/// with that `every_n_events` setting; each iteration appends 10 batches of 100 items
/// (1_000 events, matching the declared throughput) and closes the store.
fn bench_batch_cadence_interaction(c: &mut Criterion) {
    let mut group = c.benchmark_group("batch_cadence_interaction");
    apply_profile(&mut group, BenchProfile::Heavy);
    throughput_elements(&mut group, 1_000);

    for every_n_events in [1u32, 1_000, 10_000] {
        group.bench_with_input(
            BenchmarkId::new("append_batch_100", every_n_events),
            &every_n_events,
            |b, &every_n_events| {
                b.iter_batched(
                    || open_bench_store(SyncMode::SyncData, every_n_events),
                    |(store, dir, coord, kind)| {
                        for _ in 0..10 {
                            let items = make_batch_items(&coord, kind, 100);
                            store.append_batch(items).expect("batch append");
                        }
                        store.close().expect("close batch cadence benchmark store");
                        // Return the temp dir: Criterion drops routine outputs after the
                        // measurement ends, so directory removal is kept out of the timing.
                        dir
                    },
                    BatchSize::SmallInput,
                );
            },
        );
    }
    group.finish();
}
/// Measures a single batch whose items form a causation chain.
///
/// Each iteration opens a fresh store (`SyncMode::SyncData`, `every_n_events = 1`), builds 50
/// items where every item after the first references its predecessor through
/// `CausationRef::PriorItem(i - 1)`, appends them as one batch and closes the store.
fn bench_batch_causation(c: &mut Criterion) {
    let mut group = c.benchmark_group("batch_with_causation");
    apply_profile(&mut group, BenchProfile::Heavy);
    // One iteration appends exactly one 50-item batch, so the declared throughput must be 50
    // elements; a larger figure would inflate the reported events/second.
    throughput_elements(&mut group, 50);

    group.bench_function("causation_chain", |b| {
        b.iter_batched(
            || open_bench_store(SyncMode::SyncData, 1),
            |(store, dir, coord, kind)| {
                let items: Vec<_> = (0..50)
                    .map(|i| {
                        // The first item has nothing to point at; the rest chain backwards.
                        let causation = if i == 0 {
                            CausationRef::None
                        } else {
                            CausationRef::PriorItem(i - 1)
                        };
                        BatchAppendItem::new(
                            coord.clone(),
                            kind,
                            &serde_json::json!({"seq": i}),
                            AppendOptions::default(),
                            causation,
                        )
                        .expect("valid item")
                    })
                    .collect();
                store.append_batch(items).expect("batch with causation");
                store.close().expect("close causation benchmark store");
                // Return the temp dir: Criterion drops routine outputs after the measurement
                // ends, so directory removal is kept out of the timing.
                dir
            },
            BatchSize::SmallInput,
        );
    });
    group.finish();
}
/// Measures reopening a store whose last batch append failed part-way.
///
/// Untimed setup, per iteration:
/// 1. open a store in a new temp dir, append one committed event and close it cleanly;
/// 2. reopen it with a `CountdownInjector::after_batch_items(2)` fault injector, attempt a
///    5-item batch and assert that it fails; the store is then dropped without `close`.
///
/// The timed routine opens a store on that directory again and closes it.
///
/// Only compiled when the `dangerous-test-hooks` feature is enabled.
#[cfg(feature = "dangerous-test-hooks")]
fn bench_batch_recovery(c: &mut Criterion) {
    use batpak::store::CountdownInjector;

    let mut group = c.benchmark_group("batch_recovery");
    apply_profile(&mut group, BenchProfile::Heavy);
    group.bench_function("reopen_after_incomplete_batch", |b| {
        b.iter_batched(
            || {
                let dir = TempDir::new().expect("temp dir");
                {
                    // Baseline: one event that is fully committed before the fault.
                    let config = StoreConfig::new(dir.path());
                    let store =
                        Store::open(config).expect("open store for recovery benchmark baseline");
                    store
                        .append(
                            &Coordinate::new("test", "test")
                                .expect("valid recovery benchmark coordinate"),
                            EventKind::DATA,
                            &serde_json::json!({"committed": true}),
                        )
                        .expect("append committed baseline event for recovery benchmark");
                    store
                        .close()
                        .expect("close baseline store for recovery benchmark");
                }
                {
                    // Fault-injected batch; the store is intentionally not closed afterwards.
                    let mut config = StoreConfig::new(dir.path());
                    config.fault_injector =
                        Some(std::sync::Arc::new(CountdownInjector::after_batch_items(2)));
                    let store =
                        Store::open(config).expect("open fault-injected recovery benchmark store");
                    let items = make_batch_items(
                        &Coordinate::new("test", "test").expect("valid crash benchmark coordinate"),
                        EventKind::DATA,
                        5,
                    );
                    let result = store.append_batch(items);
                    assert!(
                        result.is_err(),
                        "fault-injected recovery benchmark batch should fail"
                    );
                }
                dir
            },
            |dir| {
                let config = StoreConfig::new(dir.path());
                let store = Store::open(config).expect("recover from incomplete batch");
                store
                    .close()
                    .expect("close recovered store in recovery benchmark");
                // Return the temp dir: Criterion drops routine outputs after the measurement
                // ends, so directory removal does not distort the short open/close timing.
                dir
            },
            BatchSize::SmallInput,
        );
    });
    group.finish();
}
/// No-op stand-in compiled when the `dangerous-test-hooks` feature is disabled, so the
/// `criterion_group!` list can reference `bench_batch_recovery` in either configuration.
#[cfg(not(feature = "dangerous-test-hooks"))]
fn bench_batch_recovery(_c: &mut Criterion) {
}
// Collect every benchmark function of this file into the `benches` group and let Criterion
// generate the `main` entry point that runs it.
criterion_group!(
benches,
bench_batch_vs_single,
bench_batch_durability,
bench_batch_size_scaling,
bench_batch_cadence_interaction,
bench_batch_causation,
bench_batch_recovery
);
criterion_main!(benches);