use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::Duration;
use tokio::runtime::{Builder, Runtime};
use rustzmq2::{prelude::*, DealerSocket, PubSocket, RouterSocket, SubSocket, ZmqMessage};
/// Construct the multi-threaded tokio runtime shared by the zmqrs benchmarks.
///
/// Two worker threads keep scheduler overhead low while still letting the
/// publisher and subscriber tasks run concurrently.
fn build_rt() -> Runtime {
    let mut builder = Builder::new_multi_thread();
    builder.worker_threads(2).enable_all();
    builder.build().expect("tokio runtime")
}
// Messages published (and, per subscriber, received) in one measured iteration.
const BATCH_SIZE: usize = 1024;
// Payload sizes in bytes exercised by every throughput benchmark.
const PIPELINE_SIZES: &[usize] = &[256, 4096];
// Fan-out widths (number of subscribers) for the PUB/SUB benchmarks.
const SUB_COUNTS: &[usize] = &[1, 8, 64];
/// Process-wide monotonic counter making IPC socket paths unique.
static IPC_SEQ: AtomicU64 = AtomicU64::new(0);

/// Build a unique `ipc://` endpoint under `/tmp` for the given benchmark tag.
///
/// Uniqueness comes from the process id plus a process-wide sequence number,
/// so concurrent benchmark cases never collide on the same socket file.
fn ipc_path(tag: &str) -> String {
    let seq = IPC_SEQ.fetch_add(1, Ordering::Relaxed);
    format!(
        "ipc:///tmp/zmq-tput-{tag}-{pid}-{seq}.sock",
        pid = std::process::id()
    )
}
/// Criterion driver for the zmqrs PUB/SUB fan-out throughput benchmarks.
///
/// Sweeps transport x subscriber-count x message-size; each
/// (transport, n_subs) pair gets its own benchmark group so results are
/// reported per configuration.
fn bench_zmqrs_pub_pipelined(c: &mut Criterion) {
    let rt = build_rt();
    for transport in ["tcp", "ipc", "inproc"] {
        for &n_subs in SUB_COUNTS {
            let group_name = format!(
                "zmqrs/throughput/pub_fanout/{}/subs={}",
                transport, n_subs
            );
            let mut group = c.benchmark_group(group_name);
            group
                .sample_size(10)
                .measurement_time(Duration::from_secs(10))
                .warm_up_time(Duration::from_secs(2));
            for &msg_size in PIPELINE_SIZES {
                // Throughput is accounted as bytes delivered across all subscribers.
                let bytes_per_iter = (BATCH_SIZE as u64) * (msg_size as u64) * (n_subs as u64);
                group.throughput(Throughput::Bytes(bytes_per_iter));
                let id = BenchmarkId::from_parameter(msg_size);
                group.bench_with_input(id, &msg_size, |b, &msg_size| {
                    bench_zmqrs_pub_pipelined_one(b, &rt, n_subs, msg_size, transport)
                });
            }
            group.finish();
        }
    }
}
/// Measure zmqrs PUB→SUB fan-out throughput for one
/// (n_subs, msg_size, transport) combination.
///
/// Per measured iteration the publisher sends BATCH_SIZE messages and each
/// subscriber task tries to receive all of them; PUB/SUB is lossy under load,
/// so receivers bail out on a 200ms timeout instead of hanging the iteration.
fn bench_zmqrs_pub_pipelined_one(
b: &mut criterion::Bencher<'_>,
rt: &Runtime,
n_subs: usize,
msg_size: usize,
transport: &str,
) {
// Collision-free endpoint for this case; tcp uses an OS-assigned port.
let endpoint = match transport {
"tcp" => "tcp://127.0.0.1:0".to_string(),
"ipc" => ipc_path(&format!("zmqrs-pub-{}-{}", n_subs, msg_size)),
"inproc" => format!(
"inproc://bench-tput-zmqrs-pub-{}-{}-{}",
std::process::id(),
n_subs,
msg_size
),
_ => unreachable!(),
};
// Socket setup and the subscription handshake happen once, outside the
// measured loop.
let (mut pub_sock, mut subs) = rt.block_on(async {
let mut p = PubSocket::builder().send_hwm(BATCH_SIZE * 4).build();
// Bind first, then connect subscribers to the *resolved* endpoint
// (important when tcp port 0 was requested).
let bound = p.bind(&endpoint).await.expect("bind").to_string();
let mut subs: Vec<SubSocket> = Vec::with_capacity(n_subs);
for _ in 0..n_subs {
let mut s = SubSocket::builder().receive_hwm(BATCH_SIZE * 4).build();
s.connect(bound.as_str()).await.expect("sub connect");
s.subscribe("").await.expect("subscribe");
subs.push(s);
}
// Subscription handshake: a PUB socket drops messages until a SUB's
// subscription has propagated, so keep broadcasting a marker byte until
// every subscriber has received at least one message.
let handshake_byte = ZmqMessage::from(vec![0xFFu8; 1]);
let deadline = std::time::Instant::now() + Duration::from_secs(10);
let mut remaining_subs: Vec<SubSocket> = subs;
let mut ready: Vec<SubSocket> = Vec::with_capacity(n_subs);
while !remaining_subs.is_empty() {
if std::time::Instant::now() > deadline {
panic!("zmqrs pub→sub subscription handshake timed out after 10s");
}
p.send(handshake_byte.clone()).await.expect("pub handshake");
let mut still_waiting = Vec::with_capacity(remaining_subs.len());
for mut s in remaining_subs.drain(..) {
// Short timeout: a sub that got the marker is ready; one that did not
// goes back into the retry set for the next broadcast.
match tokio::time::timeout(Duration::from_millis(5), s.recv()).await {
Ok(Ok(_)) => ready.push(s),
Ok(Err(_)) => panic!("sub recv errored during handshake"),
Err(_) => still_waiting.push(s),
}
}
remaining_subs = still_waiting;
}
// Drain any stray handshake bytes so the measured phase starts clean.
for s in &mut ready {
loop {
match tokio::time::timeout(Duration::from_millis(50), s.recv()).await {
Ok(Ok(_)) => {}
Ok(Err(e)) => panic!("sub drain error: {:?}", e),
Err(_) => break,
}
}
}
(p, ready)
});
let payload = vec![0xABu8; msg_size];
// Measured loop: spawn one receiving task per subscriber, publish the whole
// batch, then await the tasks to reclaim the sockets for the next iteration.
b.iter(|| {
rt.block_on(async {
let sub_handles: Vec<_> = subs
.drain(..)
.map(|mut s| {
tokio::spawn(async move {
for _ in 0..BATCH_SIZE {
// 200ms timeout: give up on the rest of the batch (PUB/SUB may
// drop under load) rather than deadlocking the iteration.
match tokio::time::timeout(Duration::from_millis(200), s.recv()).await {
Ok(Ok(m)) => {
black_box(m);
}
Ok(Err(e)) => panic!("sub recv error: {e:?}"),
Err(_) => break,
}
}
// Hand the socket back so the next iteration can reuse it.
s
})
})
.collect();
for _ in 0..BATCH_SIZE {
pub_sock
.send(ZmqMessage::from(payload.clone()))
.await
.expect("pub send");
}
for h in sub_handles {
subs.push(h.await.expect("sub task"));
}
});
});
// Explicitly release the sockets before the next benchmark case runs.
drop(pub_sock);
drop(subs);
}
/// Criterion driver for the zmqrs DEALER/ROUTER round-trip throughput
/// benchmarks, one group per transport.
fn bench_zmqrs_dealer_router_pipelined(c: &mut Criterion) {
    let rt = build_rt();
    for transport in ["tcp", "ipc", "inproc"] {
        let mut group =
            c.benchmark_group(format!("zmqrs/throughput/dealer_router/{}", transport));
        group
            .sample_size(10)
            .measurement_time(Duration::from_secs(10))
            .warm_up_time(Duration::from_secs(2));
        for &msg_size in PIPELINE_SIZES {
            // One iteration moves BATCH_SIZE payloads through the echo path.
            group.throughput(Throughput::Bytes((BATCH_SIZE as u64) * (msg_size as u64)));
            let id = BenchmarkId::from_parameter(msg_size);
            group.bench_with_input(id, &msg_size, |b, &msg_size| {
                bench_zmqrs_dealer_router_one(b, &rt, msg_size, transport)
            });
        }
        group.finish();
    }
}
/// Measure zmqrs DEALER↔ROUTER echo throughput for one (msg_size, transport).
///
/// Each measured iteration spawns a router echo task, pipelines BATCH_SIZE
/// dealer sends, then collects all echoes before reclaiming both sockets.
fn bench_zmqrs_dealer_router_one(
b: &mut criterion::Bencher<'_>,
rt: &Runtime,
msg_size: usize,
transport: &str,
) {
// Collision-free endpoint for this case; tcp uses an OS-assigned port.
let endpoint = match transport {
"tcp" => "tcp://127.0.0.1:0".to_string(),
"ipc" => ipc_path(&format!("zmqrs-dr-{}", msg_size)),
"inproc" => format!(
"inproc://bench-tput-zmqrs-dr-{}-{}",
std::process::id(),
msg_size
),
_ => unreachable!(),
};
// One-time socket setup, outside the measured loop.
let (dealer, router) = rt.block_on(async {
let mut r = RouterSocket::builder()
.send_hwm(BATCH_SIZE * 4)
.receive_hwm(BATCH_SIZE * 4)
.build();
// Bind first and connect to the *resolved* endpoint (tcp port 0).
let bound = r.bind(&endpoint).await.expect("router bind").to_string();
let mut d = DealerSocket::builder()
.send_hwm(BATCH_SIZE * 4)
.receive_hwm(BATCH_SIZE * 4)
.build();
d.connect(bound.as_str()).await.expect("dealer connect");
// Brief pause so the connection is established before measuring.
tokio::time::sleep(Duration::from_millis(50)).await;
(d, r)
});
// The router must move into a spawned ('static) task each iteration, so both
// sockets are parked in Options and taken/replaced around every iteration.
let mut dealer = Some(dealer);
let mut router = Some(router);
let payload = vec![0xCDu8; msg_size];
b.iter(|| {
let mut owned_router = router.take().unwrap();
let mut owned_dealer = dealer.take().unwrap();
rt.block_on(async {
// Echo task: bounce exactly BATCH_SIZE messages back to the dealer,
// then return the router socket for reuse.
let router_task = tokio::spawn(async move {
for _ in 0..BATCH_SIZE {
let m = owned_router.recv().await.expect("router recv");
owned_router.send(m).await.expect("router send");
}
owned_router
});
// Pipeline the whole batch out before reading any replies.
for _ in 0..BATCH_SIZE {
owned_dealer
.send(ZmqMessage::from(payload.clone()))
.await
.expect("dealer send");
}
for _ in 0..BATCH_SIZE {
let got = owned_dealer.recv().await.expect("dealer recv");
black_box(got);
}
// Park both sockets again for the next iteration.
let owned_router = router_task.await.expect("router task");
router.replace(owned_router);
dealer.replace(owned_dealer);
});
});
// Explicitly release the sockets at the end of this benchmark case.
drop(dealer);
drop(router);
}
/// Criterion driver for the libzmq PUB/SUB fan-out baseline benchmarks.
///
/// Mirrors `bench_zmqrs_pub_pipelined`, but only tcp and ipc are exercised
/// for the libzmq baseline.
fn bench_libzmq_pub_pipelined(c: &mut Criterion) {
    for transport in ["tcp", "ipc"] {
        for &n_subs in SUB_COUNTS {
            let group_name = format!(
                "libzmq/throughput/pub_fanout/{}/subs={}",
                transport, n_subs
            );
            let mut group = c.benchmark_group(group_name);
            group
                .sample_size(10)
                .measurement_time(Duration::from_secs(10))
                .warm_up_time(Duration::from_secs(2));
            for &msg_size in PIPELINE_SIZES {
                // Bytes delivered across all subscribers per iteration.
                let bytes_per_iter = (BATCH_SIZE as u64) * (msg_size as u64) * (n_subs as u64);
                group.throughput(Throughput::Bytes(bytes_per_iter));
                let id = BenchmarkId::from_parameter(msg_size);
                group.bench_with_input(id, &msg_size, |b, &msg_size| {
                    bench_libzmq_pub_pipelined_one(b, n_subs, msg_size, transport)
                });
            }
            group.finish();
        }
    }
}
/// Measure libzmq (via the `zmq2` crate) PUB→SUB fan-out throughput for one
/// (n_subs, msg_size, transport) combination, as a baseline for zmqrs.
///
/// Each subscriber runs on its own OS thread and is driven over mpsc
/// channels with a small command protocol:
///   `None`    = block until one handshake message arrives,
///   `Some(0)` = drain pending messages until quiet,
///   `Some(n)` = receive up to n messages for a measured batch.
/// The thread acks every completed command on `rx_done`.
fn bench_libzmq_pub_pipelined_one(
b: &mut criterion::Bencher<'_>,
n_subs: usize,
msg_size: usize,
transport: &str,
) {
// Unique endpoint per case; tcp binds an OS-assigned port.
let endpoint = match transport {
"tcp" => "tcp://127.0.0.1:0".to_string(),
"ipc" => format!(
"ipc:///tmp/libzmq-tput-pub-{}-{}-{}-{}.sock",
std::process::id(),
IPC_SEQ.fetch_add(1, Ordering::Relaxed),
n_subs,
msg_size
),
// NOTE(review): the libzmq driver above only sweeps tcp/ipc, so this
// inproc arm is currently unreachable from this file — confirm before
// relying on it.
"inproc" => format!(
"inproc://libzmq-tput-pub-{}-{}-{}",
std::process::id(),
n_subs,
msg_size
),
_ => unreachable!(),
};
let ctx = zmq2::Context::new();
let pub_sock = ctx.socket(zmq2::PUB).expect("pub socket");
// Generous high-water mark so the publisher does not drop within a batch.
let hwm = (BATCH_SIZE * 16) as i32;
pub_sock.set_sndhwm(hwm).expect("pub send_hwm");
pub_sock.bind(&endpoint).expect("bind");
// Resolve the actual endpoint (needed when tcp port 0 was requested).
let bound = pub_sock
.get_last_endpoint()
.expect("last_endpoint")
.unwrap();
// Per-subscriber control: `tx_drive` sends commands to the thread,
// `rx_done` receives its completion acks. The join handle is kept only to
// tie the thread's lifetime to this struct.
struct SubHandle {
tx_drive: mpsc::Sender<Option<usize>>,
rx_done: mpsc::Receiver<()>,
_thread: thread::JoinHandle<()>,
}
let mut subs: Vec<SubHandle> = Vec::with_capacity(n_subs);
for _ in 0..n_subs {
let ctx2 = ctx.clone();
let bound2 = bound.clone();
let (tx_drive, rx_drive) = mpsc::channel::<Option<usize>>();
let (tx_done, rx_done) = mpsc::channel::<()>();
let t = thread::spawn(move || {
let sub = ctx2.socket(zmq2::SUB).expect("sub socket");
sub.set_rcvhwm((BATCH_SIZE * 16) as i32)
.expect("sub receive_hwm");
sub.connect(&bound2).expect("sub connect");
// Empty prefix = subscribe to everything.
sub.set_subscribe(b"").expect("subscribe");
// Signal that the socket is connected and subscribed.
tx_done.send(()).ok();
// Command loop; exits when the benchmark side drops `tx_drive`.
while let Ok(cmd) = rx_drive.recv() {
match cmd {
// Handshake: block until one message arrives.
None => {
let _ = sub.recv_bytes(0).expect("sub handshake recv");
}
// Drain: receive until the socket has been quiet for 100ms.
Some(0) => {
sub.set_rcvtimeo(100)
.expect("set receive_timeout for drain");
loop {
match sub.recv_bytes(0) {
Ok(_) => {}
Err(zmq2::Error::EAGAIN) => break,
Err(e) => panic!("sub drain: {:?}", e),
}
}
// -1 restores blocking receives.
sub.set_rcvtimeo(-1).expect("clear receive_timeout");
}
// Measured batch: receive up to n messages, tolerating drops
// (PUB/SUB is lossy) via a 200ms receive timeout.
Some(n) => {
sub.set_rcvtimeo(200)
.expect("set receive_timeout for batch recv");
let mut received = 0usize;
for _ in 0..n {
match sub.recv_bytes(0) {
Ok(_) => received += 1,
Err(zmq2::Error::EAGAIN) => break,
Err(e) => panic!("sub recv failed: {e:?}"),
}
}
sub.set_rcvtimeo(-1).expect("clear receive_timeout");
// The receive count is deliberately unused; throughput is
// accounted from the publisher side.
let _ = received; }
}
// Ack the completed command; stop if the main side went away.
if tx_done.send(()).is_err() {
break;
}
}
});
// Wait for the initial "connected + subscribed" signal.
rx_done.recv().expect("sub ready");
subs.push(SubHandle {
tx_drive,
rx_done,
_thread: t,
});
}
// Handshake phase 1: keep broadcasting a marker byte until every subscriber
// thread reports that its subscription is live (it received one message).
for s in &subs {
s.tx_drive
.send(None)
.expect("drive sub (handshake phase 1)");
}
let handshake_timeout = std::time::Instant::now() + Duration::from_secs(30);
let mut pending_ack = vec![true; n_subs];
let mut remaining = n_subs;
while remaining > 0 {
if std::time::Instant::now() > handshake_timeout {
panic!("libzmq pub→sub subscription handshake phase 1 timed out after 30s");
}
pub_sock
.send(&[0xFFu8][..], 0)
.expect("pub send (handshake)");
thread::sleep(Duration::from_millis(5));
for (i, s) in subs.iter().enumerate() {
if !pending_ack[i] {
continue;
}
// Non-blocking poll so unacked subscribers keep getting marker bytes.
match s.rx_done.try_recv() {
Ok(()) => {
pending_ack[i] = false;
remaining -= 1;
}
Err(mpsc::TryRecvError::Empty) => {}
Err(mpsc::TryRecvError::Disconnected) => {
panic!("sub thread exited during handshake phase 1")
}
}
}
}
// Handshake phase 2: drain stray marker bytes so measurement starts clean.
for s in &subs {
s.tx_drive
.send(Some(0))
.expect("drive sub (handshake phase 2: drain)");
}
let drain_timeout = std::time::Instant::now() + Duration::from_secs(10);
for s in &subs {
let mut acked = false;
while !acked {
if std::time::Instant::now() > drain_timeout {
panic!("libzmq pub→sub handshake phase 2 timed out after 10s");
}
match s.rx_done.recv_timeout(Duration::from_millis(250)) {
Ok(()) => acked = true,
Err(mpsc::RecvTimeoutError::Timeout) => {}
Err(mpsc::RecvTimeoutError::Disconnected) => {
panic!("sub thread exited during handshake phase 2")
}
}
}
}
let payload: Vec<u8> = vec![0xABu8; msg_size];
// Measured loop: arm every subscriber for BATCH_SIZE receives, publish the
// batch, then wait for all subscriber acks.
b.iter(|| {
for s in &subs {
s.tx_drive.send(Some(BATCH_SIZE)).expect("drive sub");
}
for _ in 0..BATCH_SIZE {
pub_sock.send(&payload[..], 0).expect("pub send");
}
for s in &subs {
s.rx_done.recv().expect("sub done");
}
});
// Dropping the channels makes each subscriber thread's command loop exit;
// the join handles are dropped too, detaching the threads to clean up on
// their own.
for s in subs {
drop(s.tx_drive);
drop(s.rx_done);
}
}
/// Criterion driver for the libzmq DEALER/ROUTER baseline benchmarks, one
/// group per transport.
fn bench_libzmq_dealer_router_pipelined(c: &mut Criterion) {
    for transport in ["tcp", "ipc", "inproc"] {
        let mut group =
            c.benchmark_group(format!("libzmq/throughput/dealer_router/{}", transport));
        group
            .sample_size(10)
            .measurement_time(Duration::from_secs(10))
            .warm_up_time(Duration::from_secs(2));
        for &msg_size in PIPELINE_SIZES {
            // One iteration moves BATCH_SIZE payloads through the echo path.
            group.throughput(Throughput::Bytes((BATCH_SIZE as u64) * (msg_size as u64)));
            let id = BenchmarkId::from_parameter(msg_size);
            group.bench_with_input(id, &msg_size, |b, &msg_size| {
                bench_libzmq_dealer_router_one(b, msg_size, transport)
            });
        }
        group.finish();
    }
}
/// Measure libzmq DEALER↔ROUTER echo throughput for one (msg_size, transport).
///
/// A dedicated thread runs the ROUTER echo loop; the benchmark thread drives
/// the DEALER, pipelining BATCH_SIZE sends before collecting the echoes.
fn bench_libzmq_dealer_router_one(
b: &mut criterion::Bencher<'_>,
msg_size: usize,
transport: &str,
) {
// Unique endpoint per case; tcp binds an OS-assigned port.
let endpoint = match transport {
"tcp" => "tcp://127.0.0.1:0".to_string(),
"ipc" => format!(
"ipc:///tmp/libzmq-tput-dr-{}-{}-{}.sock",
std::process::id(),
IPC_SEQ.fetch_add(1, Ordering::Relaxed),
msg_size
),
"inproc" => format!(
"inproc://libzmq-tput-dr-{}-{}",
std::process::id(),
msg_size
),
_ => unreachable!(),
};
let ctx = zmq2::Context::new();
let router_sock = ctx.socket(zmq2::ROUTER).expect("router socket");
let hwm = (BATCH_SIZE * 4) as i32;
router_sock.set_sndhwm(hwm).expect("router send_hwm");
router_sock.set_rcvhwm(hwm).expect("router receive_hwm");
router_sock.bind(&endpoint).expect("router bind");
// Resolve the actual endpoint (needed when tcp port 0 was requested).
let bound = router_sock
.get_last_endpoint()
.expect("last_endpoint")
.unwrap();
// 100ms receive timeout lets the echo loop poll the stop flag periodically
// instead of blocking forever.
router_sock.set_rcvtimeo(100).expect("set receive_timeout");
let stop = Arc::new(AtomicBool::new(false));
let stop_t = stop.clone();
// Echo loop: bounce every multipart message (routing frames included)
// straight back to its sender until flagged to stop or an error occurs.
let router_thread = thread::spawn(move || loop {
match router_sock.recv_multipart(0) {
Ok(parts) => {
if router_sock.send_multipart(&parts, 0).is_err() {
break;
}
}
// Timeout expired: check for shutdown, otherwise keep polling.
Err(zmq2::Error::EAGAIN) => {
if stop_t.load(Ordering::Relaxed) {
break;
}
}
Err(_) => break,
}
});
let dealer_sock = ctx.socket(zmq2::DEALER).expect("dealer socket");
dealer_sock.set_sndhwm(hwm).expect("dealer send_hwm");
dealer_sock.set_rcvhwm(hwm).expect("dealer receive_hwm");
dealer_sock.connect(&bound).expect("dealer connect");
// Brief pause so the connection is established before measuring.
thread::sleep(Duration::from_millis(50));
let payload: Vec<u8> = vec![0xCDu8; msg_size];
// Measured loop: pipeline the whole batch out, then read all echoes back.
b.iter(|| {
for _ in 0..BATCH_SIZE {
dealer_sock.send(&payload[..], 0).expect("dealer send");
}
for _ in 0..BATCH_SIZE {
let got = dealer_sock.recv_bytes(0).expect("dealer recv");
black_box(got);
}
});
// Orderly shutdown: flag the echo loop, close the dealer, then join the
// router thread (it exits on its next receive timeout at the latest).
stop.store(true, Ordering::Relaxed);
drop(dealer_sock);
router_thread.join().ok();
}
// Register all throughput benchmarks with criterion's generated main().
criterion_group!(
benches,
bench_zmqrs_pub_pipelined,
bench_zmqrs_dealer_router_pipelined,
bench_libzmq_pub_pipelined,
bench_libzmq_dealer_router_pipelined,
);
criterion_main!(benches);