use criterion::{Criterion, criterion_group, criterion_main};
use datum::{Keep, Sink, Source, StreamCompletion};
use std::{
hint::black_box,
sync::atomic::{AtomicU64, Ordering},
};
/// Calls `wait()` on `completion` and returns the value it yields.
///
/// # Panics
///
/// Panics if `wait()` does not produce a success value, because the outcome
/// is unwrapped unconditionally.
fn wait<T>(completion: StreamCompletion<T>) -> T {
    let outcome = completion.wait();
    outcome.unwrap()
}
/// Builds a source over `0..len` and appends `stages` consecutive `map`
/// operators, each of which adds one (wrapping) to every item.
///
/// With `stages == 0` the bare source is returned unchanged.
fn source_chain(stages: usize, len: u64) -> Source<u64> {
    let base = Source::from_fn_iter(move || 0..len);
    // Thread the source through one `map` per requested stage.
    (0..stages).fold(base, |chain, _| chain.map(|value| value.wrapping_add(1)))
}
/// Benchmarks assembling a graph (a mapped source chain attached to an
/// ignoring sink) for several chain lengths. The graph is only built, never
/// run.
fn graph_construction(c: &mut Criterion) {
    // Number of `map` stages used by each benchmark case.
    const STAGE_COUNTS: [usize; 4] = [1, 10, 100, 1_000];

    let mut group = c.benchmark_group("graph_construction");
    for stages in STAGE_COUNTS {
        let label = format!("{stages}_stages");
        group.bench_function(label, |bencher| {
            // `black_box` keeps the constructed graph observable to the optimizer.
            bencher.iter(|| black_box(source_chain(stages, 1).to(Sink::ignore())));
        });
    }
    group.finish();
}
/// Benchmarks calling `run()` on a graph that was built once outside the
/// timed loop, for several operator counts.
fn materialization_latency(c: &mut Criterion) {
    // Number of `map` operators used by each benchmark case.
    const OPERATOR_COUNTS: [usize; 3] = [1, 10, 100];

    let mut group = c.benchmark_group("materialization_latency");
    for stages in OPERATOR_COUNTS {
        // Construction happens here so only `run()` is inside the timed closure.
        let blueprint = source_chain(stages, 1).to(Sink::ignore());
        let label = format!("{stages}_operators");
        group.bench_function(label, |bencher| {
            bencher.iter(|| {
                let handle = blueprint.run().unwrap();
                black_box(handle)
            });
        });
    }
    group.finish();
}
fn materialized_value_overhead(c: &mut Criterion) {
let mut group = c.benchmark_group("materialized_value_overhead");
group.bench_function("keep_left", |b| {
b.iter(|| {
let materialized = Source::single(1_u64)
.map_materialized_value(|_| 1_u64)
.to_mat(Sink::ignore().map_materialized_value(|_| 2_u64), Keep::left)
.run()
.unwrap();
black_box(materialized)
});
});
group.bench_function("keep_right", |b| {
b.iter(|| {
let materialized = Source::single(1_u64)
.map_materialized_value(|_| 1_u64)
.to_mat(
Sink::ignore().map_materialized_value(|_| 2_u64),
Keep::right,
)
.run()
.unwrap();
black_box(materialized)
});
});
group.bench_function("keep_both", |b| {
b.iter(|| {
let materialized = Source::single(1_u64)
.map_materialized_value(|_| 1_u64)
.to_mat(Sink::ignore().map_materialized_value(|_| 2_u64), Keep::both)
.run()
.unwrap();
black_box(materialized)
});
});
group.bench_function("keep_none", |b| {
b.iter(|| {
let materialized = Source::single(1_u64)
.map_materialized_value(|_| 1_u64)
.to_mat(Sink::ignore().map_materialized_value(|_| 2_u64), Keep::none)
.run()
.unwrap();
black_box(materialized)
});
});
group.bench_function("custom", |b| {
b.iter(|| {
let materialized = Source::single(1_u64)
.map_materialized_value(|_| 1_u64)
.to_mat(
Sink::ignore().map_materialized_value(|_| 2_u64),
|left, right| left + right,
)
.run()
.unwrap();
black_box(materialized)
});
});
group.finish();
}
/// Benchmarks pulling the first element through a queue sink, comparing a
/// directly constructed single-element source with the same source wrapped
/// in `Source::lazy_source`.
fn lazy_first_demand_overhead(c: &mut Criterion) {
    let mut group = c.benchmark_group("lazy_first_demand_overhead");

    group.bench_function("eager_source_first_demand", |bencher| {
        bencher.iter(|| {
            let eager = Source::single(1_u64).map_materialized_value(|_| 7_u64);
            let (mat_value, queue) = eager.to_mat(Sink::queue(), Keep::both).run().unwrap();
            let first = queue.pull().unwrap().unwrap_or_default();
            black_box(first.wrapping_add(mat_value))
        });
    });

    group.bench_function("lazy_source_first_demand", |bencher| {
        bencher.iter(|| {
            let lazy = Source::<u64>::lazy_source(|| {
                Source::single(1_u64).map_materialized_value(|_| 7_u64)
            });
            let (completion, queue) = lazy.to_mat(Sink::queue(), Keep::both).run().unwrap();
            // Pull before waiting: the element is requested first, and only
            // then is the materialized completion resolved.
            let first = queue.pull().unwrap().unwrap_or_default();
            let mat_value = wait(completion);
            black_box(first.wrapping_add(mat_value))
        });
    });

    group.finish();
}
/// Benchmarks running a 1,000-element source to completion against four
/// terminal sinks: `ignore`, `head`, `fold`, and `foreach`.
fn sink_terminal_costs(c: &mut Criterion) {
    // Exclusive upper bound of the range fed into every sink.
    const ITEMS: u64 = 1_000;

    let mut group = c.benchmark_group("sink_terminal_costs");

    group.bench_function("ignore", |bencher| {
        bencher.iter(|| {
            black_box(wait(
                Source::from_iter(0_u64..ITEMS)
                    .run_with(Sink::ignore())
                    .unwrap(),
            ))
        });
    });

    group.bench_function("head", |bencher| {
        bencher.iter(|| {
            black_box(wait(
                Source::from_iter(0_u64..ITEMS)
                    .run_with(Sink::head())
                    .unwrap(),
            ))
        });
    });

    group.bench_function("fold", |bencher| {
        bencher.iter(|| {
            let summing = Sink::fold(0_u64, |total, value| total.wrapping_add(value));
            black_box(wait(
                Source::from_iter(0_u64..ITEMS).run_with(summing).unwrap(),
            ))
        });
    });

    group.bench_function("foreach", |bencher| {
        bencher.iter(|| {
            // A fresh counter per iteration, moved into the sink callback.
            let total = AtomicU64::new(0);
            let completion = Source::from_iter(0_u64..ITEMS)
                .run_with(Sink::foreach(move |value| {
                    black_box(total.fetch_add(value, Ordering::Relaxed));
                }))
                .unwrap();
            black_box(wait(completion))
        });
    });

    group.finish();
}
/// Benchmarks running one pre-built graph (128 mapped elements folded into a
/// wrapping sum) to completion on every iteration.
fn repeated_materialization(c: &mut Criterion) {
    // Built once; each timed iteration only runs it and waits for the result.
    let pipeline = Source::from_iter(0_u64..128)
        .map(|value| value.wrapping_add(1))
        .to_mat(
            Sink::fold(0_u64, |total, value| total.wrapping_add(value)),
            Keep::right,
        );

    c.bench_function("repeated_materialization", |bencher| {
        bencher.iter(|| {
            let completion = pipeline.run().unwrap();
            black_box(wait(completion))
        });
    });
}
// Collect every benchmark function in this file into the `benches` group and
// hand that group to Criterion's generated entry point.
criterion_group!(
benches,
graph_construction,
materialization_latency,
materialized_value_overhead,
lazy_first_demand_overhead,
sink_terminal_costs,
repeated_materialization
);
criterion_main!(benches);