// rustzmq2 0.1.0 — a native async Rust implementation of ZeroMQ.
// (Crate metadata header; see the crate-level documentation for details.)
//! Engine-v2 isolated throughput bench (Phase 5a).
//!
//! Measures the raw `PeerEngine` path — two engines wired over TCP
//! loopback, one pumping messages, the other draining them. There is
//! no socket layer, no fair-queue, no round-robin: just the engine's
//! outbound/inbound flume channels and the reader/writer tasks feeding
//! the framed codec.
//!
//! This is a **baseline** for Phase 5b/5c/5d — the socket-wired numbers
//! there should track this closely once the outer machinery drops out
//! of the hot path.

use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use std::time::Duration;
use tokio::runtime::{Builder, Runtime};

use rustzmq2::ZmqMessage;

/// Messages pumped through the pipe per `b.iter` sample; throughput is
/// reported over this whole batch.
const BATCH_SIZE: usize = 1024;
/// Payload sizes (in bytes) to benchmark: tiny, cache-line-scale, and
/// page-scale frames.
const MSG_SIZES: &[usize] = &[16, 256, 4096];

/// 2-worker tokio runtime. See `compare_libzmq.rs::build_rt` for rationale.
/// Constructs the 2-worker multi-thread tokio runtime shared by every
/// benchmark in this file. See `compare_libzmq.rs::build_rt` for the
/// rationale behind the worker count.
fn build_rt() -> Runtime {
    let builder_result = Builder::new_multi_thread()
        .worker_threads(2)
        .enable_all()
        .build();
    builder_result.expect("tokio runtime")
}

/// Benchmarks raw `PeerEngine` throughput: two engines wired over TCP
/// loopback, one sending a batch of `BATCH_SIZE` messages while the other
/// drains them concurrently. One criterion group entry per message size
/// in `MSG_SIZES`, with throughput reported in bytes per batch.
fn bench_engine_pipelined(c: &mut Criterion) {
    use rustzmq2::__bench::engine::tcp_engine_pair;

    let rt = build_rt();

    let mut group = c.benchmark_group("engine/pipelined_throughput");
    group.sample_size(10);
    group.measurement_time(Duration::from_secs(8));
    group.warm_up_time(Duration::from_secs(2));

    for &msg_size in MSG_SIZES {
        // Throughput denominator: one `b.iter` invocation moves
        // BATCH_SIZE messages of `msg_size` bytes each.
        let total_bytes = (BATCH_SIZE as u64) * (msg_size as u64);
        group.throughput(Throughput::Bytes(total_bytes));
        group.bench_with_input(
            BenchmarkId::from_parameter(msg_size),
            &msg_size,
            |b, &msg_size| {
                // Construct the engine pair ONCE per benchmark point —
                // criterion re-invokes `b.iter` for every sample, and we
                // don't want TCP connect/handshake cost in the hot path.
                let (tx, rx) = rt.block_on(async {
                    // HWM large enough that neither side blocks on the
                    // channel — we're isolating I/O + codec + task
                    // scheduling, not HWM behavior.
                    tcp_engine_pair(4096, 4096).await
                });

                let payload = vec![0xABu8; msg_size];

                b.iter(|| {
                    rt.block_on(async {
                        // Sender and receiver must run concurrently
                        // (`join!`): draining only after sending the whole
                        // batch could stall once internal buffers fill.
                        let send_fut = async {
                            for _ in 0..BATCH_SIZE {
                                tx.send(ZmqMessage::from(payload.clone()))
                                    .await
                                    .expect("engine send");
                            }
                        };
                        let recv_fut = async {
                            for _ in 0..BATCH_SIZE {
                                let got = rx.recv().await.expect("channel closed").expect("codec");
                                black_box(got);
                            }
                        };
                        futures::join!(send_fut, recv_fut);
                    });
                });

                // Measurement for this point is complete — tear the pair
                // down explicitly before the next `msg_size` builds a
                // fresh one. (NOTE: the engines already stayed alive
                // across all `b.iter` samples above because construction
                // happens once per bench point; the previous comment here
                // claimed the opposite of what these drops do.)
                drop(tx);
                drop(rx);
            },
        );
    }
    group.finish();
}

// Criterion entry points: register the benchmark function and generate
// the binary's `main`.
criterion_group!(benches, bench_engine_pipelined);
criterion_main!(benches);