batpak 0.8.0

Event sourcing with causal graphs and caller-defined gates. Sync API, no async runtime.
Documentation
//! Benchmark: batch append throughput vs single append.
//!
//! Measures the overhead reduction from batching multiple events
//! into a single fsync operation.

use batpak::prelude::*;
use batpak::store::{BatchAppendItem, CausationRef, Store, StoreConfig, SyncMode};
use batpak_bench_support::{apply_profile, throughput_elements, BenchProfile};
use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion};
use tempfile::TempDir;

/// Opens a brand-new store inside a fresh temporary directory.
///
/// `sync_mode` and `every_n_events` are forwarded to the `StoreConfig`
/// builder. The returned tuple holds the opened store, the `TempDir` backing
/// it (returned so the caller decides how long the directory lives), and the
/// coordinate and event kind that the benchmarks use for every event.
///
/// # Panics
///
/// Panics if the temporary directory cannot be created, the store cannot be
/// opened, or the fixed benchmark coordinate is rejected.
fn open_bench_store(
    sync_mode: SyncMode,
    every_n_events: u32,
) -> (Store, TempDir, Coordinate, EventKind) {
    let dir = TempDir::new().expect("create temp dir");
    let store = Store::open(
        StoreConfig::new(dir.path())
            .with_sync_every_n_events(every_n_events)
            .with_sync_mode(sync_mode),
    )
    .expect("open store");
    (
        store,
        dir,
        Coordinate::new("bench:entity", "bench:scope").expect("valid coord"),
        EventKind::custom(0xF, 1),
    )
}

/// Builds the JSON payload for benchmark event number `i`.
///
/// The object carries the index under `"i"` and a 100-character string of
/// `x` under `"payload"`.
fn event_payload(i: usize) -> serde_json::Value {
    let filler = "x".repeat(100);
    serde_json::json!({"i": i, "payload": filler})
}

/// Splits `total_events` into a list of per-batch item counts.
///
/// The result contains as many entries equal to `batch_size` as fit into
/// `total_events`, followed by one smaller entry for the leftover events when
/// the division is not exact. The entries always sum to `total_events`.
///
/// # Panics
///
/// Panics if `batch_size` is zero (integer division by zero).
fn batch_plan(total_events: usize, batch_size: usize) -> Vec<usize> {
    let whole_batches = total_events / batch_size;
    let tail = total_events % batch_size;
    std::iter::repeat(batch_size)
        .take(whole_batches)
        // An `Option` iterates zero or one times, so the tail is only added when non-empty.
        .chain((tail > 0).then_some(tail))
        .collect()
}

/// Builds `count` batch items that all target `coord` with event kind `kind`.
///
/// Item `n` carries `event_payload(n)`, default append options, and no
/// causation reference.
///
/// # Panics
///
/// Panics if `BatchAppendItem::new` rejects any item.
fn make_batch_items(coord: &Coordinate, kind: EventKind, count: usize) -> Vec<BatchAppendItem> {
    let mut items = Vec::with_capacity(count);
    for index in 0..count {
        let payload = event_payload(index);
        let item = BatchAppendItem::new(
            coord.clone(),
            kind,
            &payload,
            AppendOptions::default(),
            CausationRef::None,
        )
        .expect("valid batch item");
        items.push(item);
    }
    items
}

/// Benchmark: throughput of `append_batch` versus one `append` call per event.
///
/// For every batch size two benchmarks are registered in the
/// `batch_vs_single_append` group: `batch/<size>` appends 1 000 events in
/// batches laid out by `batch_plan`, and `single/<size>` appends the same
/// 1 000 events one at a time. Each iteration works on a freshly opened store
/// and closes it as part of the routine.
fn bench_batch_vs_single(c: &mut Criterion) {
    let mut group = c.benchmark_group("batch_vs_single_append");
    apply_profile(&mut group, BenchProfile::Heavy);

    for batch_size in [10usize, 50, 100, 256] {
        let total_events = 1_000u64;
        throughput_elements(&mut group, total_events);

        // Batched path: the plan is computed once, before the iterations start.
        let batch_id = BenchmarkId::new("batch", batch_size);
        group.bench_with_input(batch_id, &batch_size, |bencher, &size| {
            let event_count =
                usize::try_from(total_events).expect("total_events fits in usize for benchmark");
            let plan = batch_plan(event_count, size);
            bencher.iter_batched(
                || open_bench_store(SyncMode::SyncData, 1),
                |(store, _dir, coord, kind)| {
                    for &item_count in &plan {
                        let items = make_batch_items(&coord, kind, item_count);
                        store.append_batch(items).expect("batch append");
                    }
                    store.close().expect("close batch benchmark store");
                },
                BatchSize::SmallInput,
            );
        });

        // Single-append path: same number of events; the batch size only labels the run.
        let single_id = BenchmarkId::new("single", batch_size);
        group.bench_with_input(single_id, &batch_size, |bencher, _| {
            bencher.iter_batched(
                || open_bench_store(SyncMode::SyncData, 1),
                |(store, _dir, coord, kind)| {
                    for i in 0..total_events {
                        let index = usize::try_from(i).expect("single append index fits");
                        let payload = event_payload(index);
                        store
                            .append(&coord, kind, &payload)
                            .expect("single append");
                    }
                    store.close().expect("close single benchmark store");
                },
                BatchSize::SmallInput,
            );
        });
    }

    group.finish();
}

/// Benchmark: cost of batch appends under `SyncMode::SyncData` versus
/// `SyncMode::SyncAll`.
///
/// Registers `sync_data/100` and `sync_all/100` in the
/// `batch_durability_overhead` group. Each iteration opens a fresh store with
/// the given sync mode, appends ten batches of 100 items, and closes the store.
fn bench_batch_durability(c: &mut Criterion) {
    let mut group = c.benchmark_group("batch_durability_overhead");
    apply_profile(&mut group, BenchProfile::Heavy);
    throughput_elements(&mut group, 1_000);

    for sync_mode in [SyncMode::SyncData, SyncMode::SyncAll] {
        // Any mode other than `SyncData` is labelled `sync_all`.
        let mode_name = match sync_mode {
            SyncMode::SyncData => "sync_data",
            _ => "sync_all",
        };

        let id = BenchmarkId::new(mode_name, 100);
        group.bench_with_input(id, &sync_mode, |bencher, mode| {
            bencher.iter_batched(
                || open_bench_store(mode.clone(), 1),
                |(store, _dir, coord, kind)| {
                    // Ten batches of 100 items add up to the 1 000 declared elements.
                    (0..10).for_each(|_| {
                        let items = make_batch_items(&coord, kind, 100);
                        store.append_batch(items).expect("batch append");
                    });
                    store.close().expect("close durability benchmark store");
                },
                BatchSize::SmallInput,
            );
        });
    }

    group.finish();
}

/// Benchmark: how append throughput changes with batch size at a fixed event
/// count (latency vs throughput tradeoff).
///
/// Registers one benchmark per batch size (1, 10, 50, 100, 256) in the
/// `batch_size_scaling` group. Each iteration appends 1 000 events to a fresh
/// store, split into batches by `batch_plan`, and then closes the store.
fn bench_batch_size_scaling(c: &mut Criterion) {
    let mut group = c.benchmark_group("batch_size_scaling");
    apply_profile(&mut group, BenchProfile::Quick);

    // The event total stays constant; only the batch size varies.
    let total_events = 1_000u64;

    for batch_size in [1usize, 10, 50, 100, 256] {
        throughput_elements(&mut group, total_events);

        let id = BenchmarkId::from_parameter(batch_size);
        group.bench_with_input(id, &batch_size, |bencher, &size| {
            let event_count =
                usize::try_from(total_events).expect("total_events fits in usize for benchmark");
            let plan = batch_plan(event_count, size);
            bencher.iter_batched(
                || open_bench_store(SyncMode::SyncData, 1),
                |(store, _dir, coord, kind)| {
                    plan.iter().copied().for_each(|item_count| {
                        let items = make_batch_items(&coord, kind, item_count);
                        store.append_batch(items).expect("batch append");
                    });
                    store.close().expect("close batch scaling benchmark store");
                },
                BatchSize::SmallInput,
            );
        });
    }

    group.finish();
}

/// Benchmark: the explicit batch path under different periodic sync cadences.
///
/// `append_batch` already performs its own final sync-before-visible, so this
/// group isolates what the background cadence setting adds on top of that.
/// For each `every_n_events` value (1, 1 000, 10 000) an iteration opens a
/// fresh store, appends ten batches of 100 items, and closes the store.
fn bench_batch_cadence_interaction(c: &mut Criterion) {
    let mut group = c.benchmark_group("batch_cadence_interaction");
    apply_profile(&mut group, BenchProfile::Heavy);
    throughput_elements(&mut group, 1_000);

    for every_n_events in [1u32, 1_000, 10_000] {
        let id = BenchmarkId::new("append_batch_100", every_n_events);
        group.bench_with_input(id, &every_n_events, |bencher, &cadence| {
            bencher.iter_batched(
                || open_bench_store(SyncMode::SyncData, cadence),
                |(store, _dir, coord, kind)| {
                    // Ten batches of 100 items match the 1 000 declared elements.
                    for _batch in 0..10 {
                        let items = make_batch_items(&coord, kind, 100);
                        store.append_batch(items).expect("batch append");
                    }
                    store.close().expect("close batch cadence benchmark store");
                },
                BatchSize::SmallInput,
            );
        });
    }

    group.finish();
}

/// Benchmark: a single batch whose items form an intra-batch causation chain.
///
/// Each iteration opens a fresh store, appends one 50-item batch in which
/// every item after the first references its predecessor through
/// `CausationRef::PriorItem`, and closes the store. All items share the same
/// coordinate, so the chain stays within one entity.
fn bench_batch_causation(c: &mut Criterion) {
    let mut group = c.benchmark_group("batch_with_causation");
    apply_profile(&mut group, BenchProfile::Heavy);
    // One iteration appends exactly the 50 items built below, so the declared
    // element count must be 50 for the reported events/second to be accurate.
    // Keep this value in sync with the `0..50` range.
    throughput_elements(&mut group, 50);

    group.bench_function("causation_chain", |b| {
        b.iter_batched(
            || open_bench_store(SyncMode::SyncData, 1),
            |(store, _dir, coord, kind)| {
                // Create batch with intra-batch causation: item 0 has no cause,
                // item i (i > 0) is caused by item i - 1 of the same batch.
                let items: Vec<_> = (0..50)
                    .map(|i| {
                        let causation = if i == 0 {
                            CausationRef::None
                        } else {
                            CausationRef::PriorItem(i - 1)
                        };
                        BatchAppendItem::new(
                            coord.clone(),
                            kind,
                            &serde_json::json!({"seq": i}),
                            AppendOptions::default(),
                            causation,
                        )
                        .expect("valid item")
                    })
                    .collect();
                store.append_batch(items).expect("batch with causation");
                store.close().expect("close causation benchmark store");
            },
            BatchSize::SmallInput,
        );
    });

    group.finish();
}

/// Benchmark: time to reopen a store after a batch append was interrupted.
///
/// Setup (per iteration) writes one committed event, then reopens the store
/// with a `CountdownInjector` fault injector and attempts a 5-item batch that
/// is asserted to fail. The measured routine reopens the same directory with a
/// plain configuration and closes the store again.
///
/// Only compiled when the `dangerous-test-hooks` feature is enabled.
#[cfg(feature = "dangerous-test-hooks")]
fn bench_batch_recovery(c: &mut Criterion) {
    use batpak::store::CountdownInjector;

    let mut group = c.benchmark_group("batch_recovery");
    apply_profile(&mut group, BenchProfile::Heavy);

    group.bench_function("reopen_after_incomplete_batch", |b| {
        // Setup: produce a directory holding one committed event plus the
        // remains of a batch that failed part-way through.
        let crashed_store_dir = || {
            let dir = TempDir::new().expect("temp dir");

            // Phase 1: a cleanly committed and closed baseline event.
            {
                let store = Store::open(StoreConfig::new(dir.path()))
                    .expect("open store for recovery benchmark baseline");
                let coord = Coordinate::new("test", "test")
                    .expect("valid recovery benchmark coordinate");
                let payload = serde_json::json!({"committed": true});
                store
                    .append(&coord, EventKind::DATA, &payload)
                    .expect("append committed baseline event for recovery benchmark");
                store
                    .close()
                    .expect("close baseline store for recovery benchmark");
            }

            // Phase 2: a batch append with fault injection enabled. The store
            // is deliberately not closed; it is dropped at the end of this scope.
            {
                let config = StoreConfig::new(dir.path()).with_fault_injector(Some(
                    std::sync::Arc::new(CountdownInjector::after_batch_items(2)),
                ));
                let store =
                    Store::open(config).expect("open fault-injected recovery benchmark store");
                let coord =
                    Coordinate::new("test", "test").expect("valid crash benchmark coordinate");
                let items = make_batch_items(&coord, EventKind::DATA, 5);
                let outcome = store.append_batch(items);
                assert!(
                    outcome.is_err(),
                    "fault-injected recovery benchmark batch should fail"
                );
            }

            dir
        };

        b.iter_batched(
            crashed_store_dir,
            |dir| {
                // Measured: reopen the damaged directory, then close it again.
                let recovered = Store::open(StoreConfig::new(dir.path()))
                    .expect("recover from incomplete batch");
                recovered
                    .close()
                    .expect("close recovered store in recovery benchmark");
            },
            BatchSize::SmallInput,
        );
    });

    group.finish();
}

/// Fallback for builds without the `dangerous-test-hooks` feature.
///
/// Registers no benchmarks; it exists so that the `criterion_group!` target
/// list can name `bench_batch_recovery` regardless of the feature setting.
#[cfg(not(feature = "dangerous-test-hooks"))]
fn bench_batch_recovery(_c: &mut Criterion) {
    // Recovery benchmark requires dangerous-test-hooks for fault injection
}

// Collect every benchmark function of this file into the `benches` group and
// generate the benchmark entry point from it. `bench_batch_recovery` does
// nothing unless the `dangerous-test-hooks` feature is enabled.
criterion_group!(
    benches,
    bench_batch_vs_single,
    bench_batch_durability,
    bench_batch_size_scaling,
    bench_batch_cadence_interaction,
    bench_batch_causation,
    bench_batch_recovery
);
criterion_main!(benches);