// rustzmq2 0.1.0
//
// A native async Rust implementation of ZeroMQ.
// Documentation
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use crypto_box::{aead::OsRng, SecretKey};
use rustzmq2::{prelude::*, RepSocket, ReqSocket, ZmqMessage};
use std::time::Duration;
use tokio::runtime::{Builder, Runtime};

/// Builds the multi-threaded tokio runtime shared by every benchmark in this
/// file: two worker threads with all runtime drivers enabled.
///
/// See `compare_libzmq.rs::build_rt` for the rationale behind the thread count.
///
/// # Panics
///
/// Panics with `"tokio runtime"` if the runtime cannot be built.
fn build_rt() -> Runtime {
    let mut builder = Builder::new_multi_thread();
    builder.worker_threads(2).enable_all();
    builder.build().expect("tokio runtime")
}

/// Generates a fresh keypair from the OS random number generator.
///
/// Returns `(public_key, secret_key)` as raw 32-byte arrays — note that the
/// public key comes first.
fn gen_keypair() -> ([u8; 32], [u8; 32]) {
    let secret = SecretKey::generate(&mut OsRng);
    let public_bytes = *secret.public_key().as_bytes();
    (public_bytes, secret.to_bytes())
}

/// Creates a connected REQ/REP pair secured with the CURVE mechanism.
///
/// A REP socket configured as the CURVE server is bound to `endpoint`, then a
/// REQ socket configured as a CURVE client (with the server's public key and
/// its own freshly generated keypair) connects to the address reported by the
/// bind call.
///
/// Returns `(req, rep)`.
///
/// # Panics
///
/// Panics if binding the REP socket or connecting the REQ socket fails.
async fn setup_curve(endpoint: &str) -> (ReqSocket, RepSocket) {
    // Server keys are generated first, then the client's.
    let (server_public, server_secret) = gen_keypair();
    let (client_public, client_secret) = gen_keypair();

    let mut server = RepSocket::builder()
        .curve_server(server_public, server_secret)
        .build();
    // Connect to what `bind` reports rather than to `endpoint` itself —
    // presumably so that a wildcard port in `endpoint` is resolved first.
    let resolved = server.bind(endpoint).await.expect("rep bind failed");
    let server_addr = resolved.to_string();

    let mut client = ReqSocket::builder()
        .curve_client(server_public, client_public, client_secret)
        .build();
    client
        .connect(&server_addr)
        .await
        .expect("req connect failed");

    // Fixed pause before handing the sockets out.
    // NOTE(review): presumably lets the security handshake finish — confirm.
    tokio::time::sleep(Duration::from_millis(100)).await;
    (client, server)
}

/// Creates a connected REQ/REP pair using the PLAIN mechanism.
///
/// A REP socket configured as the PLAIN server is bound to `endpoint`, then a
/// REQ socket configured as a PLAIN client with the credentials
/// `"user"` / `"pass"` connects to the address reported by the bind call.
///
/// Returns `(req, rep)`.
///
/// # Panics
///
/// Panics if binding the REP socket or connecting the REQ socket fails.
async fn setup_plain(endpoint: &str) -> (ReqSocket, RepSocket) {
    let mut server = RepSocket::builder().plain_server().build();
    // Connect to what `bind` reports rather than to `endpoint` itself —
    // presumably so that a wildcard port in `endpoint` is resolved first.
    let resolved = server.bind(endpoint).await.expect("rep bind failed");
    let server_addr = resolved.to_string();

    let mut client = ReqSocket::builder().plain_client("user", "pass").build();
    client
        .connect(&server_addr)
        .await
        .expect("req connect failed");

    // Fixed pause before handing the sockets out.
    // NOTE(review): presumably lets the security handshake finish — confirm.
    tokio::time::sleep(Duration::from_millis(50)).await;
    (client, server)
}

/// Payload sizes, in bytes, that each security benchmark group is run with.
const MSG_SIZES: &[usize] = &[16, 256, 4096, 65536];

/// Benchmarks a full REQ/REP round trip over TCP for each security mechanism.
///
/// Two Criterion groups are produced, in this order:
///
/// * `security/curve/req_rep/tcp`
/// * `security/plain/req_rep/tcp`
///
/// Each group runs one benchmark per entry in `MSG_SIZES`, reporting byte
/// throughput based on the payload size. Both groups share exactly the same
/// sampling settings and the same timed body, so their results are directly
/// comparable; only the socket setup differs.
///
/// A single timed iteration is: REQ sends the request, REP receives it, REP
/// sends the reply, REQ receives it. The payload clones needed to build each
/// outgoing message are part of the timed section.
///
/// # Panics
///
/// Panics if socket setup fails or if any send/recv in the round trip fails.
fn bench_security(c: &mut Criterion) {
    /// Which security mechanism the socket pair is set up with.
    #[derive(Clone, Copy)]
    enum Mechanism {
        Curve,
        Plain,
    }

    // Port 0 is passed through to the setup helpers unchanged.
    const ENDPOINT: &str = "tcp://127.0.0.1:0";

    // Group name + mechanism, in execution order.
    const GROUPS: [(&str, Mechanism); 2] = [
        ("security/curve/req_rep/tcp", Mechanism::Curve),
        ("security/plain/req_rep/tcp", Mechanism::Plain),
    ];

    let rt = build_rt();

    for &(group_name, mechanism) in GROUPS.iter() {
        let mut group = c.benchmark_group(group_name);
        group.sample_size(30);
        group.measurement_time(Duration::from_secs(10));
        group.warm_up_time(Duration::from_secs(2));

        for &msg_size in MSG_SIZES {
            group.throughput(Throughput::Bytes(msg_size as u64));
            group.bench_with_input(
                BenchmarkId::from_parameter(msg_size),
                &msg_size,
                |b, &msg_size| {
                    // Socket setup happens once per benchmark, outside the
                    // timed closure.
                    let (mut req, mut rep) = match mechanism {
                        Mechanism::Curve => rt.block_on(setup_curve(ENDPOINT)),
                        Mechanism::Plain => rt.block_on(setup_plain(ENDPOINT)),
                    };
                    let request: Vec<u8> = vec![0xCDu8; msg_size];
                    let reply: Vec<u8> = vec![0xEFu8; msg_size];

                    b.iter(|| {
                        rt.block_on(async {
                            req.send(ZmqMessage::from(request.clone()))
                                .await
                                .expect("req send");
                            let got_req = rep.recv().await.expect("rep recv");
                            black_box(got_req);
                            rep.send(ZmqMessage::from(reply.clone()))
                                .await
                                .expect("rep send");
                            let got_rep = req.recv().await.expect("req recv");
                            black_box(got_rep);
                        });
                    });
                },
            );
        }
        group.finish();
    }
}

criterion_group!(benches, bench_security);
criterion_main!(benches);