mako-engine 0.5.0

Event-sourced process engine for German energy market communication (MaKo)
Documentation
//! Benchmark: storage hot paths — append, load, pending, and snapshot.
//!
//! Covers the critical latency budget for BDEW regulatory processes:
//!
//! - **GPKE**: APERAK must be sent within **24 wall-clock hours** — every
//!   `append` + `pending` call in the delivery path must stay well under 100 ms
//!   even at queue depths of 1 000 messages.
//! - **Snapshot**: `state_with_snapshot` must outperform full replay once the
//!   stream exceeds `snapshot_interval` events, validating that 100 is the right
//!   default.
//!
//! All benchmarks use [`InMemoryEventStore`] / [`InMemoryOutboxStore`] so the
//! numbers reflect deserialization and data-structure cost in isolation —
//! no I/O noise from SlateDB or the OS.
//!
//! # Running
//!
//! ```bash
//! cargo bench -p mako-engine --bench storage --features testing
//! ```
//!
//! Results are written to `target/criterion/storage/`.

use std::hint::black_box;

use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
use mako_engine::{
    envelope::NewEvent,
    error::EngineError,
    event_store::{EventStore, ExpectedVersion, InMemoryEventStore},
    ids::{ConversationId, CorrelationId, EventId, ProcessId, StreamId, TenantId},
    outbox::{InMemoryOutboxStore, OutboxMessage, OutboxStore as _},
    snapshot::{InMemorySnapshotStore, Snapshot, SnapshotStore as _},
    version::WorkflowId,
};
use tokio::runtime::Runtime;

// ── Helpers ───────────────────────────────────────────────────────────────────

fn make_rt() -> Runtime {
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap()
}

fn wid() -> WorkflowId {
    WorkflowId::new("gpke-lf-anmeldung", "FV2025-10-01")
}

fn make_event() -> NewEvent {
    // Payload mirrors the smallest real GPKE SupplierChangeInitiated event so
    // the serde cost is representative of production workloads.
    NewEvent::new(
        CorrelationId::new(),
        None,
        ConversationId::new(),
        ProcessId::new(),
        TenantId::new(),
        wid(),
        "SupplierChangeInitiated",
        1,
        serde_json::json!({
            "pruefidentifikator": 55001,
            "sender":             "4012345000023",
            "receiver":           "9900357000004",
            "location_id":        "51238696781",
            "document_date":      "20250115",
            "message_ref":        "REF-0000",
            "validation_passed":  true,
            "validation_errors":  []
        }),
    )
}

fn make_outbox_message() -> OutboxMessage {
    OutboxMessage::new(
        StreamId::new("bench/outbox-stream"),
        ProcessId::new(),
        TenantId::new(),
        CorrelationId::new(),
        ConversationId::new(),
        EventId::new(),
        "UTILMD",
        "9900357000004",
        serde_json::json!({
            "message_type": "UTILMD",
            "pid": 55001,
            "sender": "4012345000023",
            "receiver": "9900357000004"
        }),
    )
}

/// Seed a store with `n` events on a single stream; return `(store, stream)`.
fn seeded_event_store(rt: &Runtime, n: usize) -> (InMemoryEventStore, StreamId) {
    let store = InMemoryEventStore::new();
    let stream = StreamId::new("bench/event-stream");
    let events: Vec<NewEvent> = (0..n).map(|_| make_event()).collect();
    rt.block_on(async {
        store
            .append(&stream, ExpectedVersion::NoStream, &events)
            .await
            .expect("seeded append must succeed");
    });
    (store, stream)
}

/// Seed an outbox store with `depth` messages; return the store.
fn seeded_outbox_store(rt: &Runtime, depth: usize) -> InMemoryOutboxStore {
    let store = InMemoryOutboxStore::new();
    let messages: Vec<OutboxMessage> = (0..depth).map(|_| make_outbox_message()).collect();
    rt.block_on(async {
        store
            .enqueue(&messages)
            .await
            .expect("seeded enqueue must succeed");
    });
    store
}

// ── EventStore benchmarks ─────────────────────────────────────────────────────

/// `EventStore::append` for a single event — the dominant per-command latency.
///
/// Each iteration appends one event to a fresh empty stream, simulating the
/// cost of committing a domain event during command dispatch.
fn bench_append_single_event(c: &mut Criterion) {
    let rt = make_rt();

    c.bench_function("storage/append_single_event", |b| {
        b.to_async(&rt).iter(|| async {
            let store = InMemoryEventStore::new();
            let stream = StreamId::new("bench/append-single");
            store
                .append(
                    &stream,
                    ExpectedVersion::NoStream,
                    &[black_box(make_event())],
                )
                .await
                .expect("append must succeed");
        });
    });
}

/// `EventStore::append` for a batch of 10 events — the APERAK-window batch path.
///
/// GPKE processes can emit up to ~10 events per command dispatch (Initiated +
/// DeadlineRegistered + OutboxEnqueued etc.).  This measures the batch cost.
fn bench_append_batch_10(c: &mut Criterion) {
    let rt = make_rt();

    c.bench_function("storage/append_batch_10", |b| {
        b.to_async(&rt).iter(|| async {
            let store = InMemoryEventStore::new();
            let stream = StreamId::new("bench/append-batch");
            let events: Vec<NewEvent> = (0..10).map(|_| make_event()).collect();
            store
                .append(&stream, ExpectedVersion::NoStream, black_box(&events))
                .await
                .expect("batch append must succeed");
        });
    });
}

/// `EventStore::load` (full replay) at stream depths of 10, 100, 1 000, and 10 000.
///
/// Validates that replay cost is **linear** in stream length.  The baseline is
/// the 10-event case; the 10 000-event case sets the upper bound for a stream
/// that has never been snapshotted.
fn bench_load_n_events(c: &mut Criterion) {
    let rt = make_rt();
    let mut group = c.benchmark_group("storage/load_n_events");

    for &n in &[10usize, 100, 1_000, 10_000] {
        let (store, stream) = seeded_event_store(&rt, n);
        group.bench_with_input(BenchmarkId::from_parameter(n), &n, |b, _| {
            b.to_async(&rt).iter(|| async {
                let events = store
                    .load(black_box(&stream))
                    .await
                    .expect("load must succeed");
                assert_eq!(events.len(), n);
            });
        });
    }

    group.finish();
}

// ── Snapshot benchmarks ───────────────────────────────────────────────────────

/// Compare `Process::state()` (full replay) vs `Process::state_with_snapshot()`
/// (tail replay from snapshot) at 100 and 1 000 events.
///
/// The snapshot is taken at event 50 (half-way) so `state_with_snapshot` always
/// replays exactly 50 or 950 tail events.  This proves that a `snapshot_interval`
/// of 100 bounds replay cost to at most 100 events regardless of stream depth.
fn bench_state_vs_state_with_snapshot(c: &mut Criterion) {
    let rt = make_rt();
    let mut group = c.benchmark_group("storage/state_vs_snapshot");

    for &n in &[100usize, 1_000] {
        // Build a Process with `n` events by executing InitiateAnmeldung once and
        // injecting additional raw events directly via the store.
        let (event_store, stream) = seeded_event_store(&rt, n);
        let snap_store = InMemorySnapshotStore::new();

        // Take a snapshot at the midpoint so `state_with_snapshot` is meaningful.
        let snap_point = (n / 2) as u64;
        rt.block_on(async {
            // Fold to midpoint to get the event count at that point.
            let tail = event_store
                .fold_stream(&stream, 0, 0usize, |acc, _| Ok::<_, EngineError>(acc + 1))
                .await
                .expect("fold must succeed");
            // Store a snapshot at the midpoint sequence number.
            let snap = Snapshot::new(
                stream.clone(),
                snap_point,
                1, // schema_version
                serde_json::json!(tail),
            );
            snap_store.save(&snap).await.expect("save must succeed");
        });

        // Full replay.
        {
            let store_ref = event_store.clone();
            let stream_ref = stream.clone();
            group.bench_with_input(BenchmarkId::new("full_replay", n), &n, |b, _| {
                b.to_async(&rt).iter(|| async {
                    let count: usize = store_ref
                        .fold_stream(&stream_ref, 0, 0usize, |acc, _| {
                            Ok::<_, EngineError>(acc + 1)
                        })
                        .await
                        .expect("fold must succeed");
                    black_box(count);
                });
            });
        }

        // Tail replay from snapshot.
        {
            let store_ref = event_store.clone();
            let stream_ref = stream.clone();
            let snap_ref = snap_store.clone();
            group.bench_with_input(BenchmarkId::new("tail_from_snapshot", n), &n, |b, _| {
                b.to_async(&rt).iter(|| async {
                    use mako_engine::snapshot::SnapshotStore as _;
                    let maybe = snap_ref.load(&stream_ref).await.expect("load must succeed");
                    let from_seq = maybe.map_or(0, |s| s.sequence_number);
                    let count: usize = store_ref
                        .fold_stream(&stream_ref, from_seq, 0usize, |acc, _| {
                            Ok::<_, EngineError>(acc + 1)
                        })
                        .await
                        .expect("fold must succeed");
                    black_box(count);
                });
            });
        }
    }

    group.finish();
}

// ── OutboxStore benchmarks ────────────────────────────────────────────────────

/// `OutboxStore::pending_now(limit=50)` at queue depths of 0, 100, and 1 000.
///
/// The delivery worker polls this method every tick.  At backlog depths common
/// after an outage recovery (hundreds of UTILMD messages), the poll latency
/// must remain sub-millisecond to avoid blocking the delivery loop.
fn bench_pending_at_depth(c: &mut Criterion) {
    let rt = make_rt();
    let mut group = c.benchmark_group("storage/outbox_pending_at_depth");

    for &depth in &[0usize, 100, 1_000] {
        let store = seeded_outbox_store(&rt, depth);
        group.bench_with_input(BenchmarkId::from_parameter(depth), &depth, |b, _| {
            b.to_async(&rt).iter(|| async {
                let messages = store
                    .pending_now(50)
                    .await
                    .expect("pending_now must succeed");
                // At depth 0, messages is empty; at higher depths, capped at 50.
                black_box(messages.len());
            });
        });
    }

    group.finish();
}

/// `OutboxStore::enqueue` for a single message — the per-command outbox write cost.
fn bench_enqueue_single(c: &mut Criterion) {
    let rt = make_rt();

    c.bench_function("storage/outbox_enqueue_single", |b| {
        b.to_async(&rt).iter(|| async {
            let store = InMemoryOutboxStore::new();
            store
                .enqueue(&[black_box(make_outbox_message())])
                .await
                .expect("enqueue must succeed");
        });
    });
}

// ── Groups ────────────────────────────────────────────────────────────────────

criterion_group!(
    event_store_benches,
    bench_append_single_event,
    bench_append_batch_10,
    bench_load_n_events,
);

criterion_group!(snapshot_benches, bench_state_vs_state_with_snapshot,);

criterion_group!(outbox_benches, bench_pending_at_depth, bench_enqueue_single,);

criterion_main!(event_store_benches, snapshot_benches, outbox_benches);