use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use crypto_box::{aead::OsRng, SecretKey};
use rustzmq2::{prelude::*, RepSocket, ReqSocket, ZmqMessage};
use std::time::Duration;
use tokio::runtime::{Builder, Runtime};
/// Builds the multi-threaded Tokio runtime used to drive the benchmarks.
///
/// The runtime is configured with two worker threads and all Tokio drivers
/// enabled.
///
/// # Panics
///
/// Panics with `"tokio runtime"` if the runtime cannot be constructed.
fn build_rt() -> Runtime {
    let mut builder = Builder::new_multi_thread();
    builder.worker_threads(2);
    builder.enable_all();
    builder.build().expect("tokio runtime")
}
/// Generates a fresh `crypto_box` keypair from `OsRng`.
///
/// Returns the raw key bytes as `(public_key, secret_key)` — note the public
/// key comes first.
fn gen_keypair() -> ([u8; 32], [u8; 32]) {
    let secret = SecretKey::generate(&mut OsRng);
    let public_bytes = *secret.public_key().as_bytes();
    (public_bytes, secret.to_bytes())
}
/// Creates a connected CURVE-secured REQ/REP socket pair.
///
/// A REP socket configured as the CURVE server is bound to `endpoint`; a REQ
/// socket configured as a CURVE client (holding the server's public key and
/// its own keypair) then connects to the address the bind call returned.
///
/// Returns `(req, rep)`.
///
/// # Panics
///
/// Panics if binding or connecting fails.
async fn setup_curve(endpoint: &str) -> (ReqSocket, RepSocket) {
    // Server keys are generated before client keys, each as (public, secret).
    let (srv_public, srv_secret) = gen_keypair();
    let (cli_public, cli_secret) = gen_keypair();

    let mut server = RepSocket::builder()
        .curve_server(srv_public, srv_secret)
        .build();
    // Connect to the address reported by `bind` rather than to `endpoint`
    // itself, so callers may pass a wildcard port.
    let bound_addr = server
        .bind(endpoint)
        .await
        .expect("rep bind failed")
        .to_string();

    let mut client = ReqSocket::builder()
        .curve_client(srv_public, cli_public, cli_secret)
        .build();
    client
        .connect(&bound_addr)
        .await
        .expect("req connect failed");

    // NOTE(review): fixed settle delay, presumably to let the connection and
    // security handshake complete before the first message — confirm.
    tokio::time::sleep(Duration::from_millis(100)).await;

    (client, server)
}
/// Creates a connected PLAIN-authenticated REQ/REP socket pair.
///
/// A REP socket configured as the PLAIN server is bound to `endpoint`; a REQ
/// socket configured as a PLAIN client with the credentials `"user"` /
/// `"pass"` then connects to the address the bind call returned.
///
/// Returns `(req, rep)`.
///
/// # Panics
///
/// Panics if binding or connecting fails.
async fn setup_plain(endpoint: &str) -> (ReqSocket, RepSocket) {
    let mut server = RepSocket::builder().plain_server().build();
    // Connect to the address reported by `bind` rather than to `endpoint`
    // itself, so callers may pass a wildcard port.
    let bound_addr = server
        .bind(endpoint)
        .await
        .expect("rep bind failed")
        .to_string();

    let mut client = ReqSocket::builder().plain_client("user", "pass").build();
    client
        .connect(&bound_addr)
        .await
        .expect("req connect failed");

    // NOTE(review): fixed settle delay, presumably to let the connection and
    // authentication handshake complete before the first message — confirm.
    tokio::time::sleep(Duration::from_millis(50)).await;

    (client, server)
}
/// Payload sizes, in bytes, that each benchmark group iterates over
/// (16 B, 256 B, 4 KiB and 64 KiB).
const MSG_SIZES: &[usize] = &[1 << 4, 1 << 8, 1 << 12, 1 << 16];
fn bench_security(c: &mut Criterion) {
let rt = build_rt();
{
let mut group = c.benchmark_group("security/curve/req_rep/tcp");
group.sample_size(30);
group.measurement_time(Duration::from_secs(10));
group.warm_up_time(Duration::from_secs(2));
for &msg_size in MSG_SIZES {
group.throughput(Throughput::Bytes(msg_size as u64));
group.bench_with_input(
BenchmarkId::from_parameter(msg_size),
&msg_size,
|b, &msg_size| {
let (mut req, mut rep) = rt.block_on(setup_curve("tcp://127.0.0.1:0"));
let request: Vec<u8> = vec![0xCDu8; msg_size];
let reply: Vec<u8> = vec![0xEFu8; msg_size];
b.iter(|| {
rt.block_on(async {
req.send(ZmqMessage::from(request.clone()))
.await
.expect("req send");
let got_req = rep.recv().await.expect("rep recv");
black_box(got_req);
rep.send(ZmqMessage::from(reply.clone()))
.await
.expect("rep send");
let got_rep = req.recv().await.expect("req recv");
black_box(got_rep);
});
});
},
);
}
group.finish();
}
{
let mut group = c.benchmark_group("security/plain/req_rep/tcp");
group.sample_size(30);
group.measurement_time(Duration::from_secs(10));
group.warm_up_time(Duration::from_secs(2));
for &msg_size in MSG_SIZES {
group.throughput(Throughput::Bytes(msg_size as u64));
group.bench_with_input(
BenchmarkId::from_parameter(msg_size),
&msg_size,
|b, &msg_size| {
let (mut req, mut rep) = rt.block_on(setup_plain("tcp://127.0.0.1:0"));
let request: Vec<u8> = vec![0xCDu8; msg_size];
let reply: Vec<u8> = vec![0xEFu8; msg_size];
b.iter(|| {
rt.block_on(async {
req.send(ZmqMessage::from(request.clone()))
.await
.expect("req send");
let got_req = rep.recv().await.expect("rep recv");
black_box(got_req);
rep.send(ZmqMessage::from(reply.clone()))
.await
.expect("rep send");
let got_rep = req.recv().await.expect("req recv");
black_box(got_rep);
});
});
},
);
}
group.finish();
}
}
// Register `bench_security` as the only target of the `benches` group, then
// hand that group to `criterion_main!` as the benchmark entry point.
criterion_group!(benches, bench_security);
criterion_main!(benches);