use criterion::{
async_executor::AsyncExecutor, criterion_group, criterion_main, measurement::WallTime, Bencher,
BenchmarkGroup, Criterion,
};
use dialectic::prelude::*;
use dialectic_null as null;
use dialectic_tokio_mpsc as mpsc;
use futures::Future;
use std::{convert::TryInto, fmt::Debug, marker, sync::Arc, time::Instant};
use tokio::runtime::Runtime;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
enum Primitive {
Send,
Recv,
Choose,
Offer,
Call,
Split,
}
#[Transmitter(Tx for ())]
async fn send<Tx, Rx>(
chan: Chan<Session! { loop { send () } }, Tx, Rx>,
) -> Chan<Session! { loop { send () } }, Tx, Rx>
where
Rx: Send,
Tx::Error: Debug,
{
chan.send(()).await.unwrap()
}
#[Receiver(Rx for ())]
async fn recv<Tx, Rx>(
chan: Chan<Session! { loop { recv () } }, Tx, Rx>,
) -> Chan<Session! { loop { recv () } }, Tx, Rx>
where
Tx: Send,
Rx::Error: Debug,
{
chan.recv().await.unwrap().1
}
#[Transmitter(Tx)]
async fn choose<Tx, Rx>(
chan: Chan<Session! { loop { choose { 0 => {} } } }, Tx, Rx>,
) -> Chan<Session! { loop { choose { 0 => {} } } }, Tx, Rx>
where
Rx: Send,
Tx::Error: Debug,
{
chan.choose::<0>().await.unwrap()
}
#[Receiver(Rx)]
async fn offer<Tx, Rx>(
chan: Chan<Session! { loop { offer { 0 => {} } } }, Tx, Rx>,
) -> Chan<Session! { loop { offer { 0 => {} } } }, Tx, Rx>
where
Tx: Send,
Rx::Error: Debug,
{
offer!(in chan {
0 => chan,
})
.unwrap()
}
async fn call<Tx, Rx>(
chan: Chan<Session! { loop { call {} } }, Tx, Rx>,
) -> Chan<Session! { loop { call {} } }, Tx, Rx>
where
Tx: Send,
Rx: Send,
{
chan.call(|_| async { Ok::<_, ()>(()) })
.await
.unwrap()
.1
.unwrap()
}
async fn split<Tx, Rx>(
chan: Chan<Session! { loop { split { -> {}, <- {} } } }, Tx, Rx>,
) -> Chan<Session! { loop { split { -> {} <- {} } } }, Tx, Rx>
where
Tx: Send,
Rx: Send,
{
chan.split(|_, _| async { Ok::<_, ()>(()) })
.await
.unwrap()
.1
.unwrap()
}
fn bench_chan_loop<S, Tx, Rx, F, Fut, N, H, A>(
b: &mut Bencher,
rt: Arc<A>,
channel: N,
primitive: Primitive,
f: F,
) where
F: Fn(Chan<S, Tx, Rx>) -> Fut,
Fut: Future<Output = Chan<S, Tx, Rx>>,
N: Fn(Primitive, u64) -> (Tx, Rx, H),
S: Session,
Tx: marker::Send + 'static,
Rx: marker::Send + 'static,
A: AsyncExecutor,
{
b.iter_custom(|iters| {
let (tx, rx, drop_after_bench) = channel(primitive, iters);
let mut chan = S::wrap(tx, rx);
let elapsed = rt.block_on(async {
let start = Instant::now();
for _ in 0..iters {
chan = f(chan).await;
}
start.elapsed()
});
drop(drop_after_bench);
elapsed
});
}
fn bench_chan_loop_group<S, Tx, Rx, Fut, H, A>(
g: &mut BenchmarkGroup<WallTime>,
rt: Arc<A>,
channel: fn(Primitive, u64) -> (Tx, Rx, H),
name: &str,
primitive: Primitive,
f: fn(Chan<S, Tx, Rx>) -> Fut,
) where
Fut: Future<Output = Chan<S, Tx, Rx>>,
S: Session,
Tx: marker::Send + 'static,
Rx: marker::Send + 'static,
A: AsyncExecutor,
{
g.bench_function(name, move |b| {
bench_chan_loop(b, rt.clone(), channel, primitive, f)
});
}
#[Transmitter(Tx for ())]
#[Receiver(Rx for ())]
fn bench_all_on<Tx, Rx, H, A>(
c: &mut Criterion,
rt_name: &str,
rt: Arc<A>,
backend_name: &str,
channel: fn(Primitive, u64) -> (Tx, Rx, H),
) where
Tx::Error: Debug,
Rx::Error: Debug,
A: AsyncExecutor,
{
use Primitive::*;
let group_name = format!("{}/{}", rt_name, backend_name);
let mut g = c.benchmark_group(&group_name);
bench_chan_loop_group(&mut g, rt.clone(), channel, "send", Send, send);
bench_chan_loop_group(&mut g, rt.clone(), channel, "recv", Recv, recv);
bench_chan_loop_group(&mut g, rt.clone(), channel, "choose", Choose, choose);
bench_chan_loop_group(&mut g, rt.clone(), channel, "offer", Offer, offer);
bench_chan_loop_group(&mut g, rt.clone(), channel, "call", Call, call);
bench_chan_loop_group(&mut g, rt, channel, "split", Split, split);
g.finish();
}
fn bench_tokio_null(c: &mut Criterion) {
bench_all_on(
c,
"tokio",
Arc::new(Runtime::new().unwrap()),
"null",
|_primitive, _iters| (null::Sender::default(), null::Receiver::default(), ()),
)
}
fn bench_tokio_mpsc(c: &mut Criterion) {
use Primitive::*;
bench_all_on(
c,
"tokio",
Arc::new(Runtime::new().unwrap()),
"mpsc",
|primitive, iters| {
let (tx0, rx0) = mpsc::unbounded_channel();
let (tx1, rx1) = mpsc::unbounded_channel();
match primitive {
Send | Choose | Call | Split => {}
Recv => {
for _ in 0..iters {
tx1.0.send(Box::new(())).unwrap();
}
}
Offer => {
for _ in 0..iters {
let zero_choice: Choice<1> = 0u8.try_into().unwrap();
tx1.0.send(Box::new(zero_choice)).unwrap();
}
}
};
(tx0, rx1, (tx1, rx0))
},
)
}
criterion_group!(benches, bench_tokio_null, bench_tokio_mpsc);
criterion_main!(benches);