use bytes::Bytes;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use iroh_http_core::{
fetch, serve,
server::{respond, ServeOptions},
stream::make_body_channel,
IrohEndpoint, NetworkingOptions, NodeOptions, RequestPayload,
};
/// Node options for a loopback-only benchmark node: wide-area networking
/// disabled, bound to an ephemeral localhost port.
fn local_opts() -> NodeOptions {
    let networking = NetworkingOptions {
        disabled: true,
        bind_addrs: vec!["127.0.0.1:0".into()],
        ..Default::default()
    };
    NodeOptions {
        networking,
        ..Default::default()
    }
}
/// Bind two independent loopback endpoints, returned as (server, client).
async fn make_pair() -> (IrohEndpoint, IrohEndpoint) {
    let srv = IrohEndpoint::bind(local_opts()).await.unwrap();
    let cli = IrohEndpoint::bind(local_opts()).await.unwrap();
    (srv, cli)
}
/// Snapshot the endpoint's currently known direct socket addresses.
fn direct_addrs(endpoint: &IrohEndpoint) -> Vec<std::net::SocketAddr> {
    endpoint.raw().addr().ip_addrs().cloned().collect()
}
/// Install an echo-style handler on `server_ep`: for each request, drain the
/// request body to completion, reply 200 with no headers, then close the
/// (empty) response body.
fn start_echo_server(server_ep: IrohEndpoint) {
    let handler_ep = server_ep.clone();
    serve(
        server_ep,
        ServeOptions::default(),
        move |payload: RequestPayload| {
            let ep = handler_ep.clone();
            tokio::spawn(async move {
                // Consume the incoming body until the stream signals completion.
                loop {
                    let chunk = ep.handles().next_chunk(payload.req_body_handle).await.unwrap();
                    if chunk.is_none() {
                        break;
                    }
                }
                respond(ep.handles(), payload.req_handle, 200, vec![]).unwrap();
                ep.handles().finish_body(payload.res_body_handle).unwrap();
            });
        },
    );
}
fn bench_connection_establishment(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();
c.bench_function("connection_establishment", |b| {
b.to_async(&rt)
.iter_with_large_drop(|| async { make_pair().await });
});
}
/// Round-trip latency of a single GET request against the echo server.
fn bench_fetch_get_latency(c: &mut Criterion) {
    let rt = tokio::runtime::Runtime::new().unwrap();
    // One-time setup: bind both endpoints and capture the server's identity
    // and direct addresses before the benchmark loop starts.
    let (server_ep, client_ep, server_id, server_addrs) = rt.block_on(async {
        let (srv, cli) = make_pair().await;
        let id = srv.node_id().to_string();
        let addrs = direct_addrs(&srv);
        (srv, cli, id, addrs)
    });
    start_echo_server(server_ep);
    c.bench_function("fetch_get_latency", |b| {
        b.to_async(&rt).iter(|| async {
            let res = fetch(
                &client_ep,
                &server_id,
                "/bench",
                "GET",
                &[],
                None,
                None,
                None,
                Some(&server_addrs),
            )
            .await
            .unwrap();
            // Single read; the echo server closes the body right after responding.
            client_ep.handles().next_chunk(res.body_handle).await.unwrap();
        });
    });
}
/// Upload throughput: POST a single body chunk of `size` bytes to the echo
/// server and wait for its (empty) response.
fn bench_post_body_throughput(c: &mut Criterion) {
    let rt = tokio::runtime::Runtime::new().unwrap();
    // One-time setup shared by all parameterized cases.
    let (server_ep, client_ep, server_id, server_addrs) = rt.block_on(async {
        let (srv, cli) = make_pair().await;
        let id = srv.node_id().to_string();
        let addrs = direct_addrs(&srv);
        (srv, cli, id, addrs)
    });
    start_echo_server(server_ep);
    let mut group = c.benchmark_group("post_body_throughput_bytes");
    for &size in &[1_024usize, 64 * 1_024, 1_024 * 1_024] {
        group.throughput(Throughput::Bytes(size as u64));
        group.bench_with_input(BenchmarkId::from_parameter(size), &size, |b, &sz| {
            let payload = Bytes::from(vec![0x42u8; sz]);
            let client = client_ep.clone();
            let id = server_id.clone();
            let addrs = server_addrs.clone();
            b.to_async(&rt).iter(|| {
                // Per-iteration clones; `Bytes::clone` is a cheap refcount bump.
                let (payload, client, id, addrs) =
                    (payload.clone(), client.clone(), id.clone(), addrs.clone());
                async move {
                    // Feed the request body from a background task so that
                    // `fetch` and the body write proceed concurrently.
                    let (writer, reader) = client.handles().make_body_channel();
                    let write_handle = client.handles().insert_writer(writer).unwrap();
                    let sender = client.clone();
                    tokio::spawn(async move {
                        sender.handles().send_chunk(write_handle, payload).await.unwrap();
                        sender.handles().finish_body(write_handle).unwrap();
                    });
                    let res = fetch(
                        &client,
                        &id,
                        "/upload",
                        "POST",
                        &[],
                        Some(reader),
                        None,
                        None,
                        Some(&addrs),
                    )
                    .await
                    .unwrap();
                    // Single read; the echo server returns an empty body.
                    client.handles().next_chunk(res.body_handle).await.unwrap();
                }
            });
        });
    }
    group.finish();
}
/// Download throughput: the server streams back an n-byte zero-filled body
/// (n taken from the trailing URL segment) and the client drains it.
fn bench_response_body_streaming(c: &mut Criterion) {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let (server_ep, client_ep, server_id, server_addrs) = rt.block_on(async {
        let (srv, cli) = make_pair().await;
        let id = srv.node_id().to_string();
        let addrs = direct_addrs(&srv);
        (srv, cli, id, addrs)
    });
    // Handler: parse the requested byte count out of the URL, drain the
    // request body, respond 200, stream one chunk of that size, finish.
    let handler_ep = server_ep.clone();
    serve(
        server_ep,
        ServeOptions::default(),
        move |payload: RequestPayload| {
            let ep = handler_ep.clone();
            tokio::spawn(async move {
                // Trailing URL segment encodes the payload size; 0 on parse failure.
                let n: usize = payload
                    .url
                    .rsplit('/')
                    .next()
                    .and_then(|s| s.parse().ok())
                    .unwrap_or(0);
                loop {
                    let chunk = ep.handles().next_chunk(payload.req_body_handle).await.unwrap();
                    if chunk.is_none() {
                        break;
                    }
                }
                respond(ep.handles(), payload.req_handle, 200, vec![]).unwrap();
                if n > 0 {
                    ep.handles()
                        .send_chunk(payload.res_body_handle, Bytes::from(vec![0u8; n]))
                        .await
                        .unwrap();
                }
                ep.handles().finish_body(payload.res_body_handle).unwrap();
            });
        },
    );
    let mut group = c.benchmark_group("response_body_streaming_bytes");
    for &size in &[1_024usize, 64 * 1_024, 1_024 * 1_024] {
        group.throughput(Throughput::Bytes(size as u64));
        group.bench_with_input(BenchmarkId::from_parameter(size), &size, |b, &sz| {
            let client = client_ep.clone();
            let id = server_id.clone();
            let addrs = server_addrs.clone();
            b.to_async(&rt).iter(|| {
                let (client, id, addrs) = (client.clone(), id.clone(), addrs.clone());
                async move {
                    let url = format!("/bench/{sz}");
                    let res = fetch(
                        &client,
                        &id,
                        &url,
                        "GET",
                        &[],
                        None,
                        None,
                        None,
                        Some(&addrs),
                    )
                    .await
                    .unwrap();
                    // Drain the streamed response body to completion.
                    while client
                        .handles()
                        .next_chunk(res.body_handle)
                        .await
                        .unwrap()
                        .is_some()
                    {}
                }
            });
        });
    }
    group.finish();
}
/// Micro-benchmarks for handle-table operations on a single endpoint.
fn bench_handle_allocation(c: &mut Criterion) {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let ep = rt.block_on(IrohEndpoint::bind(local_opts())).unwrap();
    let mut group = c.benchmark_group("handle_ops");
    group.bench_function("alloc_body_writer", |b| {
        b.iter(|| {
            // Allocate then immediately cancel so the handle table does not
            // grow unboundedly across benchmark iterations.
            let (h, _reader) = ep.handles().alloc_body_writer().unwrap();
            ep.handles().cancel_reader(h);
        });
    });
    group.bench_function("make_body_channel", |b| {
        b.iter(|| {
            // Route the result through black_box: the original bound the pair
            // to `_writer`/`_reader` and returned `()`, leaving the optimizer
            // free (under inlining/LTO) to elide the construction entirely.
            std::hint::black_box(make_body_channel())
        });
    });
    group.finish();
}
// Register every benchmark entry point above with criterion's harness and
// generate the `main` function for the bench binary.
criterion_group!(
benches,
bench_connection_establishment,
bench_fetch_get_latency,
bench_post_body_throughput,
bench_response_body_streaming,
bench_handle_allocation,
);
criterion_main!(benches);