use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use mire::{
Aggregate, Event, EventData, EventHandler, EventStore, EventStoreError, ExpectedVersion,
HandledEvent, ProjectionRunner, Snapshot,
};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
/// Events recorded by the benchmark's `Counter` aggregate.
///
/// Serialized with an internal `type` tag, as requested by the `serde(tag)` attribute below.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
enum CounterEvent {
/// Adds `by` to the counter's running total (see `Counter::apply`).
Incremented { by: i64 },
}
impl EventData for CounterEvent {
    /// Returns the type name reported for each event variant.
    ///
    /// The match stays exhaustive on purpose: adding a variant without naming it here is a
    /// compile error.
    fn event_type(&self) -> &'static str {
        match *self {
            Self::Incremented { .. } => "counter.incremented",
        }
    }
}
/// Minimal aggregate used by every benchmark: a single running total.
///
/// `Default` yields a total of zero; `Serialize`/`Deserialize` are derived alongside the
/// `Snapshot` impl — presumably so the state can be stored as a snapshot (confirm against
/// `mire`).
#[derive(Debug, Default, Serialize, Deserialize)]
struct Counter {
// Sum of the `by` values of all `Incremented` events applied so far.
total: i64,
}
/// Snapshot configuration for `Counter`.
///
/// NOTE(review): the exact meaning of both constants is defined by `mire::Snapshot`, which is
/// not visible here — confirm before relying on the descriptions below.
impl Snapshot for Counter {
// Presumably the schema version stored with each snapshot — TODO confirm.
const SNAPSHOT_VERSION: i32 = 1;
// Presumably "snapshot every 100 events"; B10 saves in batches of 100 — TODO confirm.
const SNAPSHOT_FREQUENCY: i64 = 100;
}
impl Aggregate for Counter {
    type Event = CounterEvent;

    /// Category name shared by all counter streams.
    fn stream_category() -> &'static str {
        "counter"
    }

    /// Folds one event into the running total.
    fn apply(&mut self, event: &CounterEvent) {
        // Irrefutable while `Incremented` is the only variant; a new variant breaks the
        // build here, just like the exhaustive `match` it replaces.
        let CounterEvent::Incremented { by } = event;
        self.total += *by;
    }
}
/// Connects to Postgres and returns an [`EventStore`] with migrations applied.
///
/// The connection string is read from `DATABASE_URL`. When that variable cannot be read, a
/// notice is written to stderr and a local default URL is used instead. The pool is capped at
/// 64 connections.
///
/// # Panics
///
/// Panics if the connection cannot be established or if `migrate` fails.
async fn store() -> EventStore {
    let database_url = match std::env::var("DATABASE_URL") {
        Ok(value) => value,
        Err(_) => {
            eprintln!("DATABASE_URL not set — falling back to default");
            String::from("postgres://mire:mire@localhost:5434/mire")
        }
    };

    let pool_options = sqlx::postgres::PgPoolOptions::new().max_connections(64);
    let pool = pool_options.connect(&database_url).await.expect("connect");

    let event_store = EventStore::new(pool);
    event_store.migrate().await.expect("migrate");
    event_store
}
/// Returns the `p`-th percentile of an ascending-sorted slice of durations.
///
/// The element at index `floor((len - 1) * p / 100)` is returned, so `p = 0` is the minimum
/// and `p = 100` the maximum. Values of `p` above 100 are clamped to 100 rather than indexing
/// past the end of the slice. An empty slice yields [`Duration::ZERO`].
///
/// The caller is responsible for sorting `sorted`; this function does not check the order.
fn percentile(sorted: &[Duration], p: usize) -> Duration {
    let Some(last) = sorted.len().checked_sub(1) else {
        return Duration::ZERO;
    };
    // Clamping keeps `idx <= last`, so the index below can never go out of bounds.
    let idx = last * p.min(100) / 100;
    sorted[idx]
}
/// Prints one result line: wall time, throughput, amortized time per op, and p50/p99.
///
/// * `id` / `what` — benchmark tag and human-readable description.
/// * `count` — number of operations used for the throughput and `avg` columns.
/// * `elapsed` — wall-clock time of the whole run.
/// * `durations` — individual latency samples; copied and sorted here for the percentiles.
///
/// `avg` is `elapsed / count` (wall time amortized over the operations), not the mean of
/// `durations`. For concurrent runs it is therefore typically lower than the percentiles.
fn report(id: &str, what: &str, count: u64, elapsed: Duration, durations: &[Duration]) {
    let per_sec = count as f64 / elapsed.as_secs_f64();
    // `Duration / u32` panics on a zero divisor. A plain `as u32` cast truncates, so a count
    // that is a multiple of 2^32 would become 0; saturate instead. Counts that fit in `u32`
    // behave exactly as before.
    let divisor = u32::try_from(count.max(1)).unwrap_or(u32::MAX);
    let avg = elapsed / divisor;
    let mut sorted = durations.to_vec();
    // Equal durations are indistinguishable, so stability buys nothing here.
    sorted.sort_unstable();
    let p50 = percentile(&sorted, 50);
    let p99 = percentile(&sorted, 99);
    println!(
        "[{id:<11}] {what:<52} | {elapsed:>9.2?} | {per_sec:>8.0} ops/s | avg {avg:>9.2?} | p50 {p50:>9.2?} | p99 {p99:>9.2?}",
    );
}
/// Prints a result line that carries only a wall-clock total (no throughput or percentiles).
///
/// The `id` and `what` columns use the same widths as the lines printed by `report`.
fn report_total(id: &str, what: &str, elapsed: Duration) {
    let line = format!("[{id:<11}] {what:<52} | {elapsed:>9.2?}");
    println!("{line}");
}
/// Entry point: runs every benchmark, or only the one named by `MIRE_BENCH`.
///
/// `MIRE_BENCH` is compared case-insensitively against the ids `b1` … `b12`. A value that
/// matches none of them still runs nothing, but now produces a warning on stderr instead of
/// finishing silently.
#[tokio::main(flavor = "multi_thread", worker_threads = 8)]
async fn main() {
    // Every id accepted by the `want(..)` checks below; keep the two in sync.
    const KNOWN_IDS: [&str; 12] = [
        "b1", "b2", "b3", "b4", "b5", "b6", "b7", "b8", "b9", "b10", "b11", "b12",
    ];

    let filter = std::env::var("MIRE_BENCH").ok();
    let only = filter.as_deref().map(|s| s.to_lowercase());
    println!("mire throughput bench");
    println!("=====================");
    if let Some(id) = &only {
        println!("filter: only {id}");
        if !KNOWN_IDS.contains(&id.as_str()) {
            // A typo in the filter would otherwise look like a successful, empty run.
            eprintln!(
                "warning: MIRE_BENCH={id} matches no benchmark; valid ids: {}",
                KNOWN_IDS.join(", ")
            );
        }
    }
    println!();

    let store = store().await;
    // No filter means "run everything"; otherwise only the exact id matches.
    let want = |id: &str| only.as_deref().is_none_or(|f| f == id);

    if want("b1") {
        bench_b1_sequential_append(&store).await;
    }
    if want("b2") {
        bench_b2_parallel_append_distinct_streams(&store).await;
    }
    if want("b3") {
        bench_b3_contended_single_stream(&store).await;
    }
    if want("b4") {
        bench_b4_aggregate_replay(&store).await;
    }
    if want("b5") {
        bench_b5_projection_catchup(&store).await;
    }
    if want("b6") {
        bench_b6_raw_append_any_version(&store).await;
    }
    if want("b7") {
        bench_b7_scale_out(&store).await;
    }
    if want("b8") {
        bench_b8_batched_append(&store).await;
    }
    if want("b9") {
        bench_b9_deep_replay_no_snapshot(&store).await;
    }
    if want("b10") {
        bench_b10_deep_replay_with_snapshot(&store).await;
    }
    if want("b11") {
        bench_b11_many_subscriptions(&store).await;
    }
    if want("b12") {
        bench_b12_mixed_read_write(&store).await;
    }
}
/// B1 — sequential load → record → save round-trips against a single stream.
///
/// Performs 50 untimed warm-up round-trips, then times 1000 more, one after another, and
/// reports throughput plus per-round-trip latency percentiles.
async fn bench_b1_sequential_append(store: &EventStore) {
    let stream = format!("bench-b1-{}", Uuid::new_v4().simple());

    // Untimed warm-up on the same stream that is measured below.
    for _warmup in 0..50 {
        let mut agg = store.load_or_default::<Counter>(&stream).await.unwrap();
        agg.record(CounterEvent::Incremented { by: 1 });
        store.save(&mut agg).await.unwrap();
    }

    let n = 1000u64;
    let mut latencies = Vec::with_capacity(n as usize);
    let wall = Instant::now();
    for _ in 0..n {
        let began = Instant::now();
        let mut agg = store.load_or_default::<Counter>(&stream).await.unwrap();
        agg.record(CounterEvent::Incremented { by: 1 });
        store.save(&mut agg).await.unwrap();
        latencies.push(began.elapsed());
    }
    let elapsed = wall.elapsed();

    let label = format!("sequential append 1×{n} events");
    report("B1", &label, n, elapsed, &latencies);
}
/// B2 — 32 tasks append in parallel, each to its own stream (50 round-trips per stream).
///
/// An untimed warm-up first runs 5 round-trips on 32 throwaway streams. The timed phase
/// then uses fresh stream ids, and the clock covers spawning through the last join.
async fn bench_b2_parallel_append_distinct_streams(store: &EventStore) {
    let streams = 32u64;
    let events_per_stream: u64 = 50;
    let total = streams * events_per_stream;

    // Warm-up on separate streams so the measured streams start empty.
    let warmup: Vec<_> = (0..streams)
        .map(|i| {
            let store = store.clone();
            let id = format!("bench-b2-warm-{}-{i}", Uuid::new_v4().simple());
            tokio::spawn(async move {
                for _ in 0..5 {
                    let mut counter = store.load_or_default::<Counter>(&id).await.unwrap();
                    counter.record(CounterEvent::Incremented { by: 1 });
                    store.save(&mut counter).await.unwrap();
                }
            })
        })
        .collect();
    for task in warmup {
        task.await.unwrap();
    }

    let test_id = Uuid::new_v4().simple().to_string();
    let start = Instant::now();
    let workers: Vec<_> = (0..streams)
        .map(|i| {
            let store = store.clone();
            let id = format!("bench-b2-{test_id}-{i}");
            tokio::spawn(async move {
                let mut samples = Vec::with_capacity(events_per_stream as usize);
                for _ in 0..events_per_stream {
                    let began = Instant::now();
                    let mut counter = store.load_or_default::<Counter>(&id).await.unwrap();
                    counter.record(CounterEvent::Incremented { by: 1 });
                    store.save(&mut counter).await.unwrap();
                    samples.push(began.elapsed());
                }
                samples
            })
        })
        .collect();

    let mut durations = Vec::with_capacity(total as usize);
    for worker in workers {
        durations.extend(worker.await.unwrap());
    }
    let elapsed = start.elapsed();

    let label = format!("parallel append {streams}×{events_per_stream} events");
    report("B2", &label, total, elapsed, &durations);
}
/// B3 — 16 writers contend on one stream until each has landed 25 events.
///
/// A writer reloads and retries immediately whenever `save` reports a
/// `ConcurrencyConflict`; any other error aborts the benchmark. Each latency sample spans
/// all retries needed to land one event. The report line includes the share of save
/// attempts that ended in a conflict.
async fn bench_b3_contended_single_stream(store: &EventStore) {
    let writers = 16u64;
    let events_per_writer: u64 = 25;
    let target_events = writers * events_per_writer;
    let id = format!("bench-b3-{}", Uuid::new_v4().simple());
    let conflicts = Arc::new(AtomicU64::new(0));
    let successes = Arc::new(AtomicU64::new(0));

    let start = Instant::now();
    let tasks: Vec<_> = (0..writers)
        .map(|_| {
            let store = store.clone();
            let id = id.clone();
            let conflicts = Arc::clone(&conflicts);
            let successes = Arc::clone(&successes);
            tokio::spawn(async move {
                let mut samples = Vec::with_capacity(events_per_writer as usize);
                // One outer iteration per event this writer must land.
                for _ in 0..events_per_writer {
                    let began = Instant::now();
                    loop {
                        let mut counter = store.load_or_default::<Counter>(&id).await.unwrap();
                        counter.record(CounterEvent::Incremented { by: 1 });
                        match store.save(&mut counter).await {
                            Ok(()) => {
                                successes.fetch_add(1, Ordering::Relaxed);
                                samples.push(began.elapsed());
                                break;
                            }
                            Err(EventStoreError::ConcurrencyConflict { .. }) => {
                                // Lost the race: reload and try again.
                                conflicts.fetch_add(1, Ordering::Relaxed);
                            }
                            Err(e) => panic!("unexpected error: {e}"),
                        }
                    }
                }
                samples
            })
        })
        .collect();

    let mut durations = Vec::with_capacity(target_events as usize);
    for task in tasks {
        durations.extend(task.await.unwrap());
    }
    let elapsed = start.elapsed();

    let successes = successes.load(Ordering::Relaxed);
    let conflicts = conflicts.load(Ordering::Relaxed);
    // `.max(1)` guards the ratio against a zero denominator.
    let attempts = (successes + conflicts).max(1);
    let conflict_ratio = conflicts as f64 / attempts as f64;
    let pct = format!("{:.0}%", conflict_ratio * 100.0);

    let label = format!("contended append {writers}w → {target_events} events ({pct} conflicts)");
    report("B3", &label, successes, elapsed, &durations);
}
/// B4 — repeatedly loads an aggregate whose stream holds 200 events.
///
/// The 200 events are recorded on one aggregate instance and written with a single `save`.
/// After 20 untimed loads, 500 loads are timed individually.
async fn bench_b4_aggregate_replay(store: &EventStore) {
    let stream = format!("bench-b4-{}", Uuid::new_v4().simple());
    let depth = 200;

    // Seed the stream.
    let mut seed = store.load_or_default::<Counter>(&stream).await.unwrap();
    for _ in 0..depth {
        seed.record(CounterEvent::Incremented { by: 1 });
    }
    store.save(&mut seed).await.unwrap();

    // Untimed warm-up loads.
    for _ in 0..20 {
        let _ = store.load::<Counter>(&stream).await.unwrap();
    }

    let n: u64 = 500;
    let mut latencies = Vec::with_capacity(n as usize);
    let wall = Instant::now();
    for _ in 0..n {
        let began = Instant::now();
        let _ = store.load::<Counter>(&stream).await.unwrap();
        latencies.push(began.elapsed());
    }
    let elapsed = wall.elapsed();

    let label = format!("aggregate replay (depth {depth}) × {n}");
    report("B4", &label, n, elapsed, &latencies);
}
/// Projection handler used by the subscription benchmarks (B5 and B11).
///
/// For every handled event it upserts a per-stream total into `table` and bumps `applied`.
struct CountingHandler {
// Pool on which the upsert for each event is executed.
db: PgPool,
// Name of the table the handler writes to; interpolated into the SQL text.
table: String,
// Shared counter incremented once per successfully handled event.
applied: Arc<AtomicU64>,
}
impl EventHandler for CountingHandler {
    type Aggregate = Counter;

    /// Adds the event's increment to the row for its stream, then counts the event.
    ///
    /// The row is inserted on first sight and updated on conflict. `applied` is bumped only
    /// after the statement succeeded; a database error is returned to the caller.
    async fn handle(&self, event: HandledEvent<CounterEvent>) -> anyhow::Result<()> {
        let CounterEvent::Incremented { by } = event.event;

        // The table name cannot be a bind parameter, so it is formatted into the statement.
        // The continuation line is flush-left to keep the SQL text unchanged.
        let upsert = format!(
            "INSERT INTO {} (stream_id, total) VALUES ($1, $2)
ON CONFLICT (stream_id) DO UPDATE SET total = {}.total + EXCLUDED.total",
            self.table, self.table
        );

        sqlx::query(sqlx::AssertSqlSafe(upsert))
            .bind(event.stream_id())
            .bind(by)
            .execute(&self.db)
            .await?;

        self.applied.fetch_add(1, Ordering::Relaxed);
        Ok(())
    }
}
/// B5 — write 16×25 events while a projection runs, then wait for the read model.
///
/// Steps:
/// 1. create a scratch totals table and start a `ProjectionRunner` with one
///    `CountingHandler` subscription;
/// 2. append from 16 parallel tasks (25 round-trips each) and time the writes;
/// 3. poll the handler's counter until all events were applied, panicking after 30 s;
/// 4. cancel the runner, print the three result lines and drop the scratch table.
///
/// The runner is already running during step 2, so the "catchup" figure is only the time
/// still needed after the last write finished.
async fn bench_b5_projection_catchup(store: &EventStore) {
    let pool = store.pool().clone();
    let suffix = Uuid::new_v4().simple().to_string();
    let table = format!("bench_b5_totals_{suffix}");

    let create_sql =
        format!("CREATE TABLE {table} (stream_id TEXT PRIMARY KEY, total BIGINT NOT NULL)");
    sqlx::raw_sql(sqlx::AssertSqlSafe(create_sql))
        .execute(&pool)
        .await
        .unwrap();

    let applied = Arc::new(AtomicU64::new(0));
    let handler = CountingHandler {
        db: pool.clone(),
        table: table.clone(),
        applied: applied.clone(),
    };
    let runner = ProjectionRunner::builder(store.clone())
        .poll_interval(Duration::from_millis(10))
        .lease_ttl(Duration::from_secs(5))
        .lease_recheck_interval(Duration::from_millis(100))
        .subscribe(format!("bench-b5-{suffix}"), handler)
        .build();

    let cancel = CancellationToken::new();
    let runner_cancel = cancel.clone();
    let runner_handle = tokio::spawn(async move { runner.run(runner_cancel).await });

    let streams = 16u64;
    let events_per_stream: u64 = 25;
    let total = streams * events_per_stream;

    // Write phase.
    let append_start = Instant::now();
    let writers: Vec<_> = (0..streams)
        .map(|i| {
            let store = store.clone();
            let id = format!("bench-b5-{suffix}-{i}");
            tokio::spawn(async move {
                let mut samples = Vec::with_capacity(events_per_stream as usize);
                for _ in 0..events_per_stream {
                    let began = Instant::now();
                    let mut counter = store.load_or_default::<Counter>(&id).await.unwrap();
                    counter.record(CounterEvent::Incremented { by: 1 });
                    store.save(&mut counter).await.unwrap();
                    samples.push(began.elapsed());
                }
                samples
            })
        })
        .collect();
    let mut write_durations = Vec::with_capacity(total as usize);
    for writer in writers {
        write_durations.extend(writer.await.unwrap());
    }
    let append_done = append_start.elapsed();

    // Catch-up phase: poll until the handler has seen every event.
    let catchup_start = Instant::now();
    loop {
        if applied.load(Ordering::Relaxed) >= total {
            break;
        }
        if catchup_start.elapsed() > Duration::from_secs(30) {
            panic!(
                "projection did not catch up: {}/{total} after 30s",
                applied.load(Ordering::Relaxed)
            );
        }
        tokio::time::sleep(Duration::from_millis(5)).await;
    }
    let catchup = catchup_start.elapsed();

    cancel.cancel();
    // The runner's outcome is deliberately ignored; only its termination matters here.
    let _ = runner_handle.await;

    let write_label = format!("append {streams}×{events_per_stream}");
    report("B5/write", &write_label, total, append_done, &write_durations);
    let catchup_label = format!("projection catchup {total} events");
    report_total("B5/catchup", &catchup_label, catchup);
    report_total(
        "B5/total",
        "end-to-end (write → read model)",
        append_done + catchup,
    );

    let drop_sql = format!("DROP TABLE {table}");
    sqlx::raw_sql(sqlx::AssertSqlSafe(drop_sql))
        .execute(&pool)
        .await
        .unwrap();
}
/// B6 — raw `append` calls with `ExpectedVersion::Any`, skipping the aggregate load.
///
/// 32 tasks each append 100 single-event batches to their own stream. The report labels
/// this "no OCC".
async fn bench_b6_raw_append_any_version(store: &EventStore) {
    let streams = 32u64;
    let events_per_stream: u64 = 100;
    let total = streams * events_per_stream;
    let suffix = Uuid::new_v4().simple().to_string();

    let start = Instant::now();
    let workers: Vec<_> = (0..streams)
        .map(|i| {
            let store = store.clone();
            let stream_id = format!("counter-bench-b6-{suffix}-{i}");
            tokio::spawn(async move {
                let mut samples = Vec::with_capacity(events_per_stream as usize);
                for _ in 0..events_per_stream {
                    let began = Instant::now();
                    store
                        .append::<CounterEvent>(
                            &stream_id,
                            Counter::stream_category(),
                            ExpectedVersion::Any,
                            &[Event::new(CounterEvent::Incremented { by: 1 })],
                        )
                        .await
                        .unwrap();
                    samples.push(began.elapsed());
                }
                samples
            })
        })
        .collect();

    let mut durations = Vec::with_capacity(total as usize);
    for worker in workers {
        durations.extend(worker.await.unwrap());
    }
    let elapsed = start.elapsed();

    let label = format!("raw append (no OCC) {streams}×{events_per_stream}");
    report("B6", &label, total, elapsed, &durations);
}
/// B7 — the raw-append workload repeated at 32, 64, 128 and 256 parallel streams.
///
/// Every stream receives 25 single-event appends with `ExpectedVersion::Any`. Each
/// concurrency level uses fresh stream ids and prints its own result line.
async fn bench_b7_scale_out(store: &EventStore) {
    let events_per_stream: u64 = 25;

    for streams in [32u64, 64, 128, 256] {
        let total = streams * events_per_stream;
        let suffix = Uuid::new_v4().simple().to_string();

        let start = Instant::now();
        let workers: Vec<_> = (0..streams)
            .map(|i| {
                let store = store.clone();
                let stream_id = format!("counter-bench-b7-{suffix}-{i}");
                tokio::spawn(async move {
                    let mut samples = Vec::with_capacity(events_per_stream as usize);
                    for _ in 0..events_per_stream {
                        let began = Instant::now();
                        store
                            .append::<CounterEvent>(
                                &stream_id,
                                Counter::stream_category(),
                                ExpectedVersion::Any,
                                &[Event::new(CounterEvent::Incremented { by: 1 })],
                            )
                            .await
                            .unwrap();
                        samples.push(began.elapsed());
                    }
                    samples
                })
            })
            .collect();

        let mut durations = Vec::with_capacity(total as usize);
        for worker in workers {
            durations.extend(worker.await.unwrap());
        }
        let elapsed = start.elapsed();

        let tag = format!("B7/k={streams}");
        let label = format!("scale-out raw append × {events_per_stream}");
        report(&tag, &label, total, elapsed, &durations);
    }
}
/// B8 — batched raw appends: 32 streams × 5 calls, 10 events per call.
///
/// Building each batch is excluded from the timed region. Throughput is reported in events
/// (`total`), while the latency samples are per `append` call.
async fn bench_b8_batched_append(store: &EventStore) {
    let streams = 32u64;
    let batch_size = 10u64;
    let batches_per_stream = 5u64;
    let total = streams * batch_size * batches_per_stream;
    let suffix = Uuid::new_v4().simple().to_string();

    let start = Instant::now();
    let workers: Vec<_> = (0..streams)
        .map(|i| {
            let store = store.clone();
            let stream_id = format!("counter-bench-b8-{suffix}-{i}");
            tokio::spawn(async move {
                let mut samples = Vec::with_capacity(batches_per_stream as usize);
                for _ in 0..batches_per_stream {
                    // Built before the clock starts so only the append is measured.
                    let mut events: Vec<Event<CounterEvent>> =
                        Vec::with_capacity(batch_size as usize);
                    for _ in 0..batch_size {
                        events.push(Event::new(CounterEvent::Incremented { by: 1 }));
                    }

                    let began = Instant::now();
                    store
                        .append::<CounterEvent>(
                            &stream_id,
                            Counter::stream_category(),
                            ExpectedVersion::Any,
                            &events,
                        )
                        .await
                        .unwrap();
                    samples.push(began.elapsed());
                }
                samples
            })
        })
        .collect();

    let mut call_durations = Vec::with_capacity((streams * batches_per_stream) as usize);
    for worker in workers {
        call_durations.extend(worker.await.unwrap());
    }
    let elapsed = start.elapsed();

    let label =
        format!("batched append {streams}×{batches_per_stream} calls × {batch_size} ev/call");
    report("B8", &label, total, elapsed, &call_durations);
}
/// B9 — loads a 10 000-event stream through the plain `load` path.
///
/// The stream is seeded with a single raw `append` under `ExpectedVersion::NoStream`.
/// After 5 untimed loads, 50 loads are timed individually.
async fn bench_b9_deep_replay_no_snapshot(store: &EventStore) {
    let id = format!("bench-b9-{}", Uuid::new_v4().simple());
    let depth: u64 = 10_000;

    let mut events: Vec<Event<CounterEvent>> = Vec::with_capacity(depth as usize);
    for _ in 0..depth {
        events.push(Event::new(CounterEvent::Incremented { by: 1 }));
    }

    // The raw append uses the full stream id. This assumes `load::<Counter>(&id)` resolves
    // to "counter-{id}" — NOTE(review): confirm against mire's stream naming.
    let stream_id = format!("counter-{id}");
    store
        .append::<CounterEvent>(
            &stream_id,
            Counter::stream_category(),
            ExpectedVersion::NoStream,
            &events,
        )
        .await
        .unwrap();

    // Untimed warm-up loads.
    for _ in 0..5 {
        let _ = store.load::<Counter>(&id).await.unwrap();
    }

    let n: u64 = 50;
    let mut latencies = Vec::with_capacity(n as usize);
    let wall = Instant::now();
    for _ in 0..n {
        let began = Instant::now();
        let _ = store.load::<Counter>(&id).await.unwrap();
        latencies.push(began.elapsed());
    }
    let elapsed = wall.elapsed();

    let label = format!("deep replay no-snapshot (depth {depth}) × {n}");
    report("B9", &label, n, elapsed, &latencies);
}
/// B10 — loads a 10 000-event stream through the snapshot-aware path.
///
/// The stream is built in 100 batches of 100 events, each written with
/// `save_snapshotting`. After 5 untimed `load_snapshotted` calls, 50 are timed
/// individually.
async fn bench_b10_deep_replay_with_snapshot(store: &EventStore) {
    let stream = format!("bench-b10-{}", Uuid::new_v4().simple());
    let depth: u64 = 10_000;
    let batch_size = 100u64;
    let batches = depth / batch_size;

    let mut seed = store.load_or_default::<Counter>(&stream).await.unwrap();
    for _ in 0..batches {
        for _ in 0..batch_size {
            seed.record(CounterEvent::Incremented { by: 1 });
        }
        store.save_snapshotting(&mut seed).await.unwrap();
    }

    // Untimed warm-up loads.
    for _ in 0..5 {
        let _ = store.load_snapshotted::<Counter>(&stream).await.unwrap();
    }

    let n: u64 = 50;
    let mut latencies = Vec::with_capacity(n as usize);
    let wall = Instant::now();
    for _ in 0..n {
        let began = Instant::now();
        let _ = store.load_snapshotted::<Counter>(&stream).await.unwrap();
        latencies.push(began.elapsed());
    }
    let elapsed = wall.elapsed();

    let label = format!("deep replay w/ snapshot (depth {depth}) × {n}");
    report("B10", &label, n, elapsed, &latencies);
}
/// B11 — 16 subscriptions on one runner catch up on 200 pre-written events.
///
/// The events are written first, one per stream. Each subscription gets its own scratch
/// table and a `CountingHandler`, all sharing a single `applied` counter. The clock starts
/// once the runner task has been spawned and stops when the counter reaches
/// `n_subs * n_events`; the benchmark panics if that takes longer than 60 s. The scratch
/// tables are dropped at the end.
async fn bench_b11_many_subscriptions(store: &EventStore) {
    let pool = store.pool().clone();
    let suffix = Uuid::new_v4().simple().to_string();
    let n_subs = 16u64;
    let n_events: u64 = 200;

    // Seed: one event in each of `n_events` distinct streams.
    for i in 0..n_events {
        let stream = format!("b11-{suffix}-{i}");
        let mut counter = store.load_or_default::<Counter>(&stream).await.unwrap();
        counter.record(CounterEvent::Incremented { by: 1 });
        store.save(&mut counter).await.unwrap();
    }

    let mut tables: Vec<String> = Vec::with_capacity(n_subs as usize);
    for k in 0..n_subs {
        tables.push(format!("bench_b11_totals_{suffix}_{k}"));
    }
    for t in &tables {
        let create_sql =
            format!("CREATE TABLE {t} (stream_id TEXT PRIMARY KEY, total BIGINT NOT NULL)");
        sqlx::raw_sql(sqlx::AssertSqlSafe(create_sql))
            .execute(&pool)
            .await
            .unwrap();
    }

    let applied = Arc::new(AtomicU64::new(0));
    let mut builder = ProjectionRunner::builder(store.clone())
        .poll_interval(Duration::from_millis(20))
        .lease_ttl(Duration::from_secs(5))
        .lease_recheck_interval(Duration::from_millis(100));
    // `tables` holds exactly `n_subs` entries, so `k` runs over 0..n_subs.
    for (k, t) in tables.iter().enumerate() {
        let handler = CountingHandler {
            db: pool.clone(),
            table: t.clone(),
            applied: applied.clone(),
        };
        builder = builder.subscribe(format!("b11-{suffix}-sub-{k}"), handler);
    }
    let runner = builder.build();

    let cancel = CancellationToken::new();
    let runner_cancel = cancel.clone();
    let handle = tokio::spawn(async move { runner.run(runner_cancel).await });

    let target = n_subs * n_events;
    let start = Instant::now();
    loop {
        if applied.load(Ordering::Relaxed) >= target {
            break;
        }
        if start.elapsed() > Duration::from_secs(60) {
            panic!(
                "B11 did not converge: {}/{target} applied after 60s",
                applied.load(Ordering::Relaxed)
            );
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
    let elapsed = start.elapsed();

    cancel.cancel();
    // The runner's outcome is deliberately ignored; only its termination matters here.
    let _ = handle.await;

    let label = format!("catchup {n_subs} subs × {n_events} events = {target}");
    report_total("B11", &label, elapsed);
    let rate = target as f64 / elapsed.as_secs_f64();
    println!(
        "[B11/per-sub ] effective {:.0} handler-invocations/s",
        rate
    );

    for t in &tables {
        let drop_sql = format!("DROP TABLE {t}");
        sqlx::raw_sql(sqlx::AssertSqlSafe(drop_sql))
            .execute(&pool)
            .await
            .unwrap();
    }
}
/// B12 — 16 writers and 16 readers run concurrently against the same 16 streams.
///
/// Every stream is seeded with one event first, so the readers' `load` has something to
/// find. Writer `i` does 30 load/record/save round-trips on stream `i`; reader `i` does 30
/// loads of stream `i % writers`. All three result lines use the same wall-clock interval,
/// which ends when the last task has been joined.
async fn bench_b12_mixed_read_write(store: &EventStore) {
    let writers = 16u64;
    let readers = 16u64;
    let writes_per_writer: u64 = 30;
    let reads_per_reader: u64 = 30;
    let suffix = Uuid::new_v4().simple().to_string();

    // Seed each stream with a single event.
    for i in 0..writers {
        let stream = format!("b12-{suffix}-{i}");
        let mut counter = store.load_or_default::<Counter>(&stream).await.unwrap();
        counter.record(CounterEvent::Incremented { by: 1 });
        store.save(&mut counter).await.unwrap();
    }

    let start = Instant::now();

    let writer_handles: Vec<_> = (0..writers)
        .map(|i| {
            let store = store.clone();
            let id = format!("b12-{suffix}-{i}");
            tokio::spawn(async move {
                let mut samples = Vec::with_capacity(writes_per_writer as usize);
                for _ in 0..writes_per_writer {
                    let began = Instant::now();
                    let mut counter = store.load_or_default::<Counter>(&id).await.unwrap();
                    counter.record(CounterEvent::Incremented { by: 1 });
                    store.save(&mut counter).await.unwrap();
                    samples.push(began.elapsed());
                }
                samples
            })
        })
        .collect();

    let reader_handles: Vec<_> = (0..readers)
        .map(|i| {
            let store = store.clone();
            let id = format!("b12-{suffix}-{}", i % writers);
            tokio::spawn(async move {
                let mut samples = Vec::with_capacity(reads_per_reader as usize);
                for _ in 0..reads_per_reader {
                    let began = Instant::now();
                    let _ = store.load::<Counter>(&id).await.unwrap();
                    samples.push(began.elapsed());
                }
                samples
            })
        })
        .collect();

    let write_count = writers * writes_per_writer;
    let read_count = readers * reads_per_reader;

    // Writers are joined before readers, as in the original ordering.
    let mut write_durations: Vec<Duration> = Vec::with_capacity(write_count as usize);
    for handle in writer_handles {
        write_durations.extend(handle.await.unwrap());
    }
    let mut read_durations: Vec<Duration> = Vec::with_capacity(read_count as usize);
    for handle in reader_handles {
        read_durations.extend(handle.await.unwrap());
    }
    let elapsed = start.elapsed();

    let total_ops = writers * writes_per_writer + readers * reads_per_reader;
    let total_label = format!("{writers}w + {readers}r → {total_ops} ops");
    report_total("B12/total", &total_label, elapsed);

    let write_label = format!("write under read contention {writers}×{writes_per_writer}");
    report("B12/writes", &write_label, write_count, elapsed, &write_durations);

    let read_label = format!("read under write contention {readers}×{reads_per_reader}");
    report("B12/reads", &read_label, read_count, elapsed, &read_durations);
}