use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use std::time::Duration;
use tokio::runtime::{Builder, Runtime};
use rustzmq2::ZmqMessage;
/// Number of messages pushed (and drained) per timed benchmark iteration.
const BATCH_SIZE: usize = 1024;
/// Payload sizes in bytes exercised by the throughput benchmark.
const MSG_SIZES: &[usize] = &[16, 256, 4096];
/// Builds the multi-threaded Tokio runtime shared by the benchmarks:
/// two worker threads with all drivers (I/O + time) enabled.
fn build_rt() -> Runtime {
    let mut rt_builder = Builder::new_multi_thread();
    rt_builder.worker_threads(2).enable_all();
    rt_builder.build().expect("tokio runtime")
}
/// Measures end-to-end pipelined throughput of a paired TCP engine:
/// for each payload size in `MSG_SIZES`, a sender pushes `BATCH_SIZE`
/// messages while a receiver concurrently drains them. Throughput is
/// reported in bytes per timed iteration (one full batch).
fn bench_engine_pipelined(c: &mut Criterion) {
    // Bench-only constructor exposed by the crate (`__bench` module).
    use rustzmq2::__bench::engine::tcp_engine_pair;
    let rt = build_rt();
    let mut group = c.benchmark_group("engine/pipelined_throughput");
    // Few samples with a long measurement window: each sample already moves
    // BATCH_SIZE messages, so per-sample variance is modest.
    group.sample_size(10);
    group.measurement_time(Duration::from_secs(8));
    group.warm_up_time(Duration::from_secs(2));
    for &msg_size in MSG_SIZES {
        // Bytes moved per iteration — drives criterion's throughput reporting.
        let total_bytes = (BATCH_SIZE as u64) * (msg_size as u64);
        group.throughput(Throughput::Bytes(total_bytes));
        group.bench_with_input(
            BenchmarkId::from_parameter(msg_size),
            &msg_size,
            |b, &msg_size| {
                // One engine pair per payload size, reused across all iterations
                // of this benchmark — connection setup is therefore not timed.
                // NOTE(review): the two 4096 arguments are presumably buffer or
                // queue capacities — confirm against tcp_engine_pair's signature.
                let (tx, rx) = rt.block_on(async {
                    tcp_engine_pair(4096, 4096).await
                });
                let payload = vec![0xABu8; msg_size];
                b.iter(|| {
                    rt.block_on(async {
                        // Run send and receive concurrently so the pipeline stays
                        // full; a sequential send-all-then-recv-all would stall
                        // once intermediate buffers fill.
                        let send_fut = async {
                            for _ in 0..BATCH_SIZE {
                                // The per-message clone is inside the timed region;
                                // it is the cost of producing an owned message.
                                tx.send(ZmqMessage::from(payload.clone()))
                                    .await
                                    .expect("engine send");
                            }
                        };
                        let recv_fut = async {
                            for _ in 0..BATCH_SIZE {
                                // NOTE(review): `rx` is not declared `mut`, so
                                // recv() must take `&self` for this to compile —
                                // confirm the channel type's API.
                                let got = rx.recv().await.expect("channel closed").expect("codec");
                                // Keep the received message observable so the
                                // optimizer cannot elide the receive path.
                                black_box(got);
                            }
                        };
                        futures::join!(send_fut, recv_fut);
                    });
                });
                // Explicit teardown before the next payload size (redundant with
                // end-of-scope drop, but documents intent).
                drop(tx);
                drop(rx);
            },
        );
    }
    group.finish();
}
// Register the benchmark function and generate criterion's `main` entry point.
criterion_group!(benches, bench_engine_pipelined);
criterion_main!(benches);