use std::hint::black_box;
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
use mako_engine::{
envelope::NewEvent,
error::EngineError,
event_store::{EventStore, ExpectedVersion, InMemoryEventStore},
ids::{ConversationId, CorrelationId, EventId, ProcessId, StreamId, TenantId},
outbox::{InMemoryOutboxStore, OutboxMessage, OutboxStore as _},
snapshot::{InMemorySnapshotStore, Snapshot, SnapshotStore as _},
version::WorkflowId,
};
use tokio::runtime::Runtime;
fn make_rt() -> Runtime {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
}
fn wid() -> WorkflowId {
WorkflowId::new("gpke-lf-anmeldung", "FV2025-10-01")
}
fn make_event() -> NewEvent {
NewEvent::new(
CorrelationId::new(),
None,
ConversationId::new(),
ProcessId::new(),
TenantId::new(),
wid(),
"SupplierChangeInitiated",
1,
serde_json::json!({
"pruefidentifikator": 55001,
"sender": "4012345000023",
"receiver": "9900357000004",
"location_id": "51238696781",
"document_date": "20250115",
"message_ref": "REF-0000",
"validation_passed": true,
"validation_errors": []
}),
)
}
fn make_outbox_message() -> OutboxMessage {
OutboxMessage::new(
StreamId::new("bench/outbox-stream"),
ProcessId::new(),
TenantId::new(),
CorrelationId::new(),
ConversationId::new(),
EventId::new(),
"UTILMD",
"9900357000004",
serde_json::json!({
"message_type": "UTILMD",
"pid": 55001,
"sender": "4012345000023",
"receiver": "9900357000004"
}),
)
}
fn seeded_event_store(rt: &Runtime, n: usize) -> (InMemoryEventStore, StreamId) {
let store = InMemoryEventStore::new();
let stream = StreamId::new("bench/event-stream");
let events: Vec<NewEvent> = (0..n).map(|_| make_event()).collect();
rt.block_on(async {
store
.append(&stream, ExpectedVersion::NoStream, &events)
.await
.expect("seeded append must succeed");
});
(store, stream)
}
fn seeded_outbox_store(rt: &Runtime, depth: usize) -> InMemoryOutboxStore {
let store = InMemoryOutboxStore::new();
let messages: Vec<OutboxMessage> = (0..depth).map(|_| make_outbox_message()).collect();
rt.block_on(async {
store
.enqueue(&messages)
.await
.expect("seeded enqueue must succeed");
});
store
}
fn bench_append_single_event(c: &mut Criterion) {
let rt = make_rt();
c.bench_function("storage/append_single_event", |b| {
b.to_async(&rt).iter(|| async {
let store = InMemoryEventStore::new();
let stream = StreamId::new("bench/append-single");
store
.append(
&stream,
ExpectedVersion::NoStream,
&[black_box(make_event())],
)
.await
.expect("append must succeed");
});
});
}
fn bench_append_batch_10(c: &mut Criterion) {
let rt = make_rt();
c.bench_function("storage/append_batch_10", |b| {
b.to_async(&rt).iter(|| async {
let store = InMemoryEventStore::new();
let stream = StreamId::new("bench/append-batch");
let events: Vec<NewEvent> = (0..10).map(|_| make_event()).collect();
store
.append(&stream, ExpectedVersion::NoStream, black_box(&events))
.await
.expect("batch append must succeed");
});
});
}
fn bench_load_n_events(c: &mut Criterion) {
let rt = make_rt();
let mut group = c.benchmark_group("storage/load_n_events");
for &n in &[10usize, 100, 1_000, 10_000] {
let (store, stream) = seeded_event_store(&rt, n);
group.bench_with_input(BenchmarkId::from_parameter(n), &n, |b, _| {
b.to_async(&rt).iter(|| async {
let events = store
.load(black_box(&stream))
.await
.expect("load must succeed");
assert_eq!(events.len(), n);
});
});
}
group.finish();
}
fn bench_state_vs_state_with_snapshot(c: &mut Criterion) {
let rt = make_rt();
let mut group = c.benchmark_group("storage/state_vs_snapshot");
for &n in &[100usize, 1_000] {
let (event_store, stream) = seeded_event_store(&rt, n);
let snap_store = InMemorySnapshotStore::new();
let snap_point = (n / 2) as u64;
rt.block_on(async {
let tail = event_store
.fold_stream(&stream, 0, 0usize, |acc, _| Ok::<_, EngineError>(acc + 1))
.await
.expect("fold must succeed");
let snap = Snapshot::new(
stream.clone(),
snap_point,
1, serde_json::json!(tail),
);
snap_store.save(&snap).await.expect("save must succeed");
});
{
let store_ref = event_store.clone();
let stream_ref = stream.clone();
group.bench_with_input(BenchmarkId::new("full_replay", n), &n, |b, _| {
b.to_async(&rt).iter(|| async {
let count: usize = store_ref
.fold_stream(&stream_ref, 0, 0usize, |acc, _| {
Ok::<_, EngineError>(acc + 1)
})
.await
.expect("fold must succeed");
black_box(count);
});
});
}
{
let store_ref = event_store.clone();
let stream_ref = stream.clone();
let snap_ref = snap_store.clone();
group.bench_with_input(BenchmarkId::new("tail_from_snapshot", n), &n, |b, _| {
b.to_async(&rt).iter(|| async {
use mako_engine::snapshot::SnapshotStore as _;
let maybe = snap_ref.load(&stream_ref).await.expect("load must succeed");
let from_seq = maybe.map_or(0, |s| s.sequence_number);
let count: usize = store_ref
.fold_stream(&stream_ref, from_seq, 0usize, |acc, _| {
Ok::<_, EngineError>(acc + 1)
})
.await
.expect("fold must succeed");
black_box(count);
});
});
}
}
group.finish();
}
fn bench_pending_at_depth(c: &mut Criterion) {
let rt = make_rt();
let mut group = c.benchmark_group("storage/outbox_pending_at_depth");
for &depth in &[0usize, 100, 1_000] {
let store = seeded_outbox_store(&rt, depth);
group.bench_with_input(BenchmarkId::from_parameter(depth), &depth, |b, _| {
b.to_async(&rt).iter(|| async {
let messages = store
.pending_now(50)
.await
.expect("pending_now must succeed");
black_box(messages.len());
});
});
}
group.finish();
}
fn bench_enqueue_single(c: &mut Criterion) {
let rt = make_rt();
c.bench_function("storage/outbox_enqueue_single", |b| {
b.to_async(&rt).iter(|| async {
let store = InMemoryOutboxStore::new();
store
.enqueue(&[black_box(make_outbox_message())])
.await
.expect("enqueue must succeed");
});
});
}
criterion_group!(
event_store_benches,
bench_append_single_event,
bench_append_batch_10,
bench_load_n_events,
);
criterion_group!(snapshot_benches, bench_state_vs_state_with_snapshot,);
criterion_group!(outbox_benches, bench_pending_at_depth, bench_enqueue_single,);
criterion_main!(event_store_benches, snapshot_benches, outbox_benches);