use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
use datum::{
Flow, OverflowStrategy, RestartFlow, RestartSettings, RestartSource, RetryFlow, Sink, Source,
StreamCompletion, StreamError, Supervision,
};
use std::{
hint::black_box,
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
thread,
time::Duration,
};
/// Calls `wait()` on `completion` and returns the unwrapped value.
///
/// # Panics
///
/// Panics through `unwrap` when `completion.wait()` does not yield a value.
fn wait<T>(completion: StreamCompletion<T>) -> T {
    let outcome = completion.wait();
    outcome.unwrap()
}
/// Benchmarks building and running short pipelines end to end.
///
/// Registered under the `source_materialization` group:
/// - `empty_ignore`: `Source::<u64>::empty()` run with `Sink::ignore()`
/// - `single_ignore`: `Source::single(1)` run with `Sink::ignore()`
/// - `repeat_take_1k_ignore`: `Source::repeat(1)` limited by `take(1_000)`
/// - `from_iter_collect_1k`: `0..1_000` run with `Sink::collect()`, reporting the length
///
/// Every iteration calls `wait` on the completion before passing the result to `black_box`.
fn source_materialization(c: &mut Criterion) {
    let mut suite = c.benchmark_group("source_materialization");

    suite.bench_function("empty_ignore", |bencher| {
        bencher.iter(|| {
            let completion = Source::<u64>::empty().run_with(Sink::ignore()).unwrap();
            black_box(wait(completion))
        });
    });

    suite.bench_function("single_ignore", |bencher| {
        bencher.iter(|| {
            let completion = Source::single(1_u64).run_with(Sink::ignore()).unwrap();
            black_box(wait(completion))
        });
    });

    suite.bench_function("repeat_take_1k_ignore", |bencher| {
        bencher.iter(|| {
            let completion = Source::repeat(1_u64)
                .take(1_000)
                .run_with(Sink::ignore())
                .unwrap();
            black_box(wait(completion))
        });
    });

    suite.bench_function("from_iter_collect_1k", |bencher| {
        bencher.iter(|| {
            let completion = Source::from_iter(0_u64..1_000)
                .run_with(Sink::collect())
                .unwrap();
            let collected = wait(completion);
            black_box(collected.len())
        });
    });

    suite.finish();
}
/// Benchmarks chains of one, five and ten `map` stages over `0..100_000`.
///
/// Every stage applies `wrapping_add(1)`; the pipeline ends in a wrapping-sum `Sink::fold`
/// whose result is waited on and passed to `black_box`. Benchmarks are registered under the
/// `linear_map_chain_100k` group.
fn linear_map_chains(c: &mut Criterion) {
    let mut suite = c.benchmark_group("linear_map_chain_100k");

    suite.bench_function("one_map", |bencher| {
        bencher.iter(|| {
            let completion = Source::from_iter(0_u64..100_000)
                .map(|value| value.wrapping_add(1))
                .run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)))
                .unwrap();
            black_box(wait(completion))
        });
    });

    suite.bench_function("five_maps", |bencher| {
        bencher.iter(|| {
            let completion = Source::from_iter(0_u64..100_000)
                .map(|value| value.wrapping_add(1))
                .map(|value| value.wrapping_add(1))
                .map(|value| value.wrapping_add(1))
                .map(|value| value.wrapping_add(1))
                .map(|value| value.wrapping_add(1))
                .run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)))
                .unwrap();
            black_box(wait(completion))
        });
    });

    suite.bench_function("ten_maps", |bencher| {
        bencher.iter(|| {
            let completion = Source::from_iter(0_u64..100_000)
                .map(|value| value.wrapping_add(1))
                .map(|value| value.wrapping_add(1))
                .map(|value| value.wrapping_add(1))
                .map(|value| value.wrapping_add(1))
                .map(|value| value.wrapping_add(1))
                .map(|value| value.wrapping_add(1))
                .map(|value| value.wrapping_add(1))
                .map(|value| value.wrapping_add(1))
                .map(|value| value.wrapping_add(1))
                .map(|value| value.wrapping_add(1))
                .run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)))
                .unwrap();
            black_box(wait(completion))
        });
    });

    suite.finish();
}
/// Benchmarks a two-`map` pipeline over `0..100_000` with and without an async boundary.
///
/// Registered under the `linear_async_boundary_100k` group:
/// - `fused_two_maps`: both maps chained directly on the source
/// - `source_async_boundary`: `async_boundary_with_buffer(16)` between the two maps on the source
/// - `flow_async_boundary`: the same stages built as a `Flow` and attached with `via`
fn linear_async_boundaries(c: &mut Criterion) {
    let mut suite = c.benchmark_group("linear_async_boundary_100k");

    suite.bench_function("fused_two_maps", |bencher| {
        bencher.iter(|| {
            let completion = Source::from_iter(0_u64..100_000)
                .map(|value| value.wrapping_add(1))
                .map(|value| value.wrapping_add(1))
                .run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)))
                .unwrap();
            black_box(wait(completion))
        });
    });

    suite.bench_function("source_async_boundary", |bencher| {
        bencher.iter(|| {
            let completion = Source::from_iter(0_u64..100_000)
                .map(|value| value.wrapping_add(1))
                .async_boundary_with_buffer(16)
                .map(|value| value.wrapping_add(1))
                .run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)))
                .unwrap();
            black_box(wait(completion))
        });
    });

    suite.bench_function("flow_async_boundary", |bencher| {
        bencher.iter(|| {
            // The flow is built before the source, as part of the measured iteration.
            let stages = Flow::identity()
                .map(|value: u64| value.wrapping_add(1))
                .async_boundary_with_buffer(16)
                .map(|value| value.wrapping_add(1));
            let completion = Source::from_iter(0_u64..100_000)
                .via(stages)
                .run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)))
                .unwrap();
            black_box(wait(completion))
        });
    });

    suite.finish();
}
/// Benchmarks windowing and aggregation operators over `0..10_000`.
///
/// Registered under the `windowing_aggregation_10k` group:
/// - `grouped_scan`: `grouped(32)`, per-chunk sum, then a running wrapping sum via `scan`
/// - `sliding`: `sliding(32, 16)` with the window lengths added up in a fold sink
/// - `scan`: running wrapping sum, read with `Sink::last()`
/// - `fold`: wrapping sum via the `fold` operator, read with `Sink::head()`
fn grouped_and_scan(c: &mut Criterion) {
    let mut suite = c.benchmark_group("windowing_aggregation_10k");

    suite.bench_function("grouped_scan", |bencher| {
        bencher.iter(|| {
            let completion = Source::from_iter(0_u64..10_000)
                .grouped(32)
                .map(|chunk| chunk.into_iter().sum::<u64>())
                .scan(0_u64, |running, chunk_sum| running.wrapping_add(chunk_sum))
                .run_with(Sink::last())
                .unwrap();
            black_box(wait(completion))
        });
    });

    suite.bench_function("sliding", |bencher| {
        bencher.iter(|| {
            let completion = Source::from_iter(0_u64..10_000)
                .sliding(32, 16)
                .run_with(Sink::fold(0_usize, |seen, window: Vec<u64>| {
                    seen + window.len()
                }))
                .unwrap();
            black_box(wait(completion))
        });
    });

    suite.bench_function("scan", |bencher| {
        bencher.iter(|| {
            let completion = Source::from_iter(0_u64..10_000)
                .scan(0_u64, |running, value| running.wrapping_add(value))
                .run_with(Sink::last())
                .unwrap();
            black_box(wait(completion))
        });
    });

    suite.bench_function("fold", |bencher| {
        bencher.iter(|| {
            let completion = Source::from_iter(0_u64..10_000)
                .fold(0_u64, |running, value| running.wrapping_add(value))
                .run_with(Sink::head())
                .unwrap();
            black_box(wait(completion))
        });
    });

    suite.finish();
}
/// Benchmarks `zip_with_index` over `0..10_000`, reading the final element with `Sink::last()`.
///
/// The single benchmark is registered under the `zip_family_10k` group.
fn zip_family(c: &mut Criterion) {
    let mut suite = c.benchmark_group("zip_family_10k");

    suite.bench_function("zip_with_index", |bencher| {
        bencher.iter(|| {
            let completion = Source::from_iter(0_u64..10_000)
                .zip_with_index()
                .run_with(Sink::last())
                .unwrap();
            black_box(wait(completion))
        });
    });

    suite.finish();
}
/// Benchmarks `Source::unfold_resource` and `fold_async` over 10 000 elements.
///
/// Registered under the `wp6b_futures_resource_10k` group:
/// - `unfold_resource`: a `u64` counter resource that emits `0..10_000` and then `None`
/// - `fold_async`: a wrapping sum whose step returns an `async` block, read with `Sink::head()`
fn futures_resource_surface(c: &mut Criterion) {
    let mut suite = c.benchmark_group("wp6b_futures_resource_10k");

    suite.bench_function("unfold_resource", |bencher| {
        bencher.iter(|| {
            let completion = Source::unfold_resource(
                || Ok(0_u64),
                |cursor| {
                    let current = *cursor;
                    if current == 10_000 {
                        Ok(None)
                    } else {
                        *cursor = current + 1;
                        Ok(Some(current))
                    }
                },
                |_| Ok(()),
            )
            .run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)))
            .unwrap();
            black_box(wait(completion))
        });
    });

    suite.bench_function("fold_async", |bencher| {
        bencher.iter(|| {
            let completion = Source::from_iter(0_u64..10_000)
                .fold_async(0_u64, |total, value| async move { Ok(total.wrapping_add(value)) })
                .run_with(Sink::head())
                .unwrap();
            black_box(wait(completion))
        });
    });

    suite.finish();
}
/// Benchmarks a map / filter / map_concat pipeline written two ways over `0..10_000`.
///
/// Registered under the `context_with_context_10k` group:
/// - `source_with_context_map_filter_map_concat_10k`: uses `as_source_with_context` and
///   converts back with `as_source` before folding
/// - `source_pair_map_filter_map_concat_10k`: carries the second value by hand in a tuple
///
/// Both fold the first `u64` of every emitted element into a wrapping sum.
fn context_with_context_vs_pairs(c: &mut Criterion) {
    let mut suite = c.benchmark_group("context_with_context_10k");

    suite.bench_function("source_with_context_map_filter_map_concat_10k", |bencher| {
        bencher.iter(|| {
            let completion = Source::from_iter(0_u64..10_000)
                .as_source_with_context(|value| value + 1)
                .map(|value| value + 1)
                .filter(|value| value % 2 == 0)
                .map_concat(|value| vec![(value, value + 1), (value + 1, value + 1)])
                .as_source()
                .run_with(Sink::fold(
                    0_u64,
                    |total, ((first, _), _): ((u64, u64), u64)| total.wrapping_add(first),
                ))
                .unwrap();
            black_box(wait(completion))
        });
    });

    suite.bench_function("source_pair_map_filter_map_concat_10k", |bencher| {
        bencher.iter(|| {
            let completion = Source::from_iter(0_u64..10_000)
                .map(|value| (value, value + 1))
                .map(|(value, tag)| (value + 1, tag))
                .filter(|(value, _)| value % 2 == 0)
                .map_concat(|(value, tag)| vec![(value, tag), (value + 1, tag)])
                .run_with(Sink::fold(
                    0_u64,
                    |total, (first, _): (u64, u64)| total.wrapping_add(first),
                ))
                .unwrap();
            black_box(wait(completion))
        });
    });

    suite.finish();
}
/// Runs a fixed 20 000-round integer mixing loop seeded with `item + 1` (wrapping).
///
/// Each round multiplies by `1_664_525`, adds `1_013_904_223` (both wrapping) and rotates left
/// by 7 bits. Every intermediate state is passed to `black_box`. Returns the final state.
fn cpu_bound_work(item: u64) -> u64 {
    (0..20_000).fold(item.wrapping_add(1), |state, _| {
        let mixed = state
            .wrapping_mul(1_664_525)
            .wrapping_add(1_013_904_223)
            .rotate_left(7);
        black_box(mixed);
        mixed
    })
}
/// Benchmarks the async mapping operators, a cancelled sink and error recovery.
///
/// Registers, in order:
/// - group `async_10k`: `map_async` ("ordered"), `map_async_unordered` ("unordered") and
///   `map_async_partitioned` ("partitioned") over `0..10_000`, each at parallelism 1, 4 and 32
/// - group `async_cpu_bound_512`: `map_async` over `0..512` where every future yields once via
///   `tokio::task::yield_now()` and then runs `cpu_bound_work`, at parallelism 1, 4 and 32
/// - `cancelled_repeat`: `Source::repeat` run with `Sink::cancelled()`; the materialized value
///   is passed to `black_box` without calling `wait`
/// - `error_recover_1k`: a failed source followed by `recover_with_retries` onto `0..1_000`
///   and a `map_error` stage
fn async_and_error_ops(c: &mut Criterion) {
    let mut group = c.benchmark_group("async_10k");
    for parallelism in [1_usize, 4, 32] {
        // `bench_with_input` takes the input by reference and passes it to the closure by
        // reference; the `&parallelism` pattern copies the `usize` back out.
        group.bench_with_input(
            BenchmarkId::new("ordered", parallelism),
            &parallelism,
            |b, &parallelism| {
                b.iter(|| {
                    let sum = Source::from_iter(0_u64..10_000)
                        .map_async(parallelism, |item| async move { Ok(item.wrapping_add(1)) })
                        .run_with(Sink::fold(0_u64, |acc, item| acc.wrapping_add(item)))
                        .unwrap();
                    let sum = wait(sum);
                    black_box(sum)
                });
            },
        );
        group.bench_with_input(
            BenchmarkId::new("unordered", parallelism),
            &parallelism,
            |b, &parallelism| {
                b.iter(|| {
                    let sum = Source::from_iter(0_u64..10_000)
                        .map_async_unordered(
                            parallelism,
                            |item| async move { Ok(item.wrapping_add(1)) },
                        )
                        .run_with(Sink::fold(0_u64, |acc, item| acc.wrapping_add(item)))
                        .unwrap();
                    let sum = wait(sum);
                    black_box(sum)
                });
            },
        );
        group.bench_with_input(
            BenchmarkId::new("partitioned", parallelism),
            &parallelism,
            |b, &parallelism| {
                b.iter(|| {
                    let sum = Source::from_iter(0_u64..10_000)
                        .map_async_partitioned(
                            parallelism,
                            1,
                            |item| item % 16,
                            |item| async move { Ok(item.wrapping_add(1)) },
                        )
                        .run_with(Sink::fold(0_u64, |acc, item| acc.wrapping_add(item)))
                        .unwrap();
                    let sum = wait(sum);
                    black_box(sum)
                });
            },
        );
    }
    group.finish();

    let mut cpu_group = c.benchmark_group("async_cpu_bound_512");
    for parallelism in [1_usize, 4, 32] {
        cpu_group.bench_with_input(
            BenchmarkId::new("ordered", parallelism),
            &parallelism,
            |b, &parallelism| {
                b.iter(|| {
                    let sum = Source::from_iter(0_u64..512)
                        .map_async(parallelism, |item| async move {
                            tokio::task::yield_now().await;
                            Ok(cpu_bound_work(item))
                        })
                        .run_with(Sink::fold(0_u64, |acc, item| acc.wrapping_add(item)))
                        .unwrap();
                    let sum = wait(sum);
                    black_box(sum)
                });
            },
        );
    }
    cpu_group.finish();

    // The two benchmarks below are registered directly on `c`, outside any group.
    c.bench_function("cancelled_repeat", |b| {
        b.iter(|| {
            let materialized = Source::repeat(1_u64)
                .run_with(Sink::cancelled())
                .expect("cancelled sink materializes");
            black_box(materialized)
        });
    });
    c.bench_function("error_recover_1k", |b| {
        b.iter(|| {
            let values = Source::<u64>::failed(StreamError::Failed("bench".into()))
                .recover_with_retries(1, |_| Some(Source::from_iter(0_u64..1_000)))
                .map_error(|error| StreamError::Failed(format!("mapped: {error}")))
                .run_with(Sink::fold(0_u64, |acc, item| acc.wrapping_add(item)))
                .unwrap();
            let values = wait(values);
            black_box(values)
        });
    });
}
/// Benchmarks `buffer(16, strategy)` for each listed `OverflowStrategy` variant.
///
/// Every iteration sends `0..256` through the buffer into a `Sink::foreach` that sleeps 10 µs
/// per element. For `OverflowStrategy::Fail` the result of `completion.wait()` is not unwrapped;
/// only `is_err()` is recorded. For every other strategy the completion goes through `wait`
/// (which unwraps) and `true` is recorded. Benchmarks are registered under the `buffers_rate`
/// group with the strategy's `Debug` output as the parameter.
fn buffers_rate(c: &mut Criterion) {
    let mut suite = c.benchmark_group("buffers_rate");

    let strategies = [
        OverflowStrategy::DropHead,
        OverflowStrategy::DropTail,
        OverflowStrategy::DropBuffer,
        OverflowStrategy::DropNew,
        OverflowStrategy::Backpressure,
        OverflowStrategy::Fail,
    ];

    for strategy in strategies {
        let id = BenchmarkId::new(
            "buffer_fast_producer_slow_consumer",
            format!("{strategy:?}"),
        );
        suite.bench_with_input(id, &strategy, |bencher, &strategy| {
            bencher.iter(|| {
                // The pipeline is the same for every strategy; only the completion handling
                // differs.
                let completion = Source::from_iter(0_u64..256)
                    .buffer(16, strategy)
                    .run_with(Sink::foreach(|_| thread::sleep(Duration::from_micros(10))))
                    .unwrap();
                if matches!(strategy, OverflowStrategy::Fail) {
                    black_box(completion.wait().is_err())
                } else {
                    let _ = wait(completion);
                    black_box(true)
                }
            });
        });
    }

    suite.finish();
}
/// Benchmarks rate-related operators over `0..256` with a stage that sleeps 10 µs per element.
///
/// Registered under the `rate_ops` group. In the `*_fast_producer_slow_consumer` benchmarks
/// (`conflate`, `batch`, `batch_weighted`, `aggregate_with_boundary`, `detach`) the sleeping
/// `map` follows the operator. In the `*_slow_producer_fast_consumer` benchmarks (`expand`,
/// `extrapolate`) the sleeping `map` precedes the operator and the output is capped with
/// `take(512)`. Every pipeline ends in a wrapping-sum fold sink.
fn rate_ops(c: &mut Criterion) {
    let mut suite = c.benchmark_group("rate_ops");

    suite.bench_function("conflate_fast_producer_slow_consumer", |bencher| {
        bencher.iter(|| {
            let pipeline = Source::from_iter(0_u64..256)
                .conflate(|lhs, rhs| lhs.wrapping_add(rhs))
                .map(|value| {
                    thread::sleep(Duration::from_micros(10));
                    value
                })
                .run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)))
                .unwrap();
            let outcome = wait(pipeline);
            black_box(outcome)
        });
    });

    suite.bench_function("batch_fast_producer_slow_consumer", |bencher| {
        bencher.iter(|| {
            let pipeline = Source::from_iter(0_u64..256)
                .batch(16, |value| value, |lhs, rhs| lhs.wrapping_add(rhs))
                .map(|value| {
                    thread::sleep(Duration::from_micros(10));
                    value
                })
                .run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)))
                .unwrap();
            let outcome = wait(pipeline);
            black_box(outcome)
        });
    });

    suite.bench_function("batch_weighted_fast_producer_slow_consumer", |bencher| {
        bencher.iter(|| {
            let pipeline = Source::from_iter(0_u64..256)
                .batch_weighted(
                    64,
                    |_| 4,
                    |value| value,
                    |lhs, rhs| lhs.wrapping_add(rhs),
                )
                .map(|value| {
                    thread::sleep(Duration::from_micros(10));
                    value
                })
                .run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)))
                .unwrap();
            let outcome = wait(pipeline);
            black_box(outcome)
        });
    });

    suite.bench_function("aggregate_with_boundary_fast_producer_slow_consumer", |bencher| {
        bencher.iter(|| {
            let pipeline = Source::from_iter(0_u64..256)
                .aggregate_with_boundary(
                    || (0_u64, 0_usize),
                    |(sum, count), value| {
                        // The second tuple element is true once 16 elements were aggregated.
                        let running = sum.wrapping_add(value);
                        let seen = count + 1;
                        ((running, seen), seen >= 16)
                    },
                    |(sum, _)| sum,
                    None,
                )
                .map(|value| {
                    thread::sleep(Duration::from_micros(10));
                    value
                })
                .run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)))
                .unwrap();
            let outcome = wait(pipeline);
            black_box(outcome)
        });
    });

    suite.bench_function("detach_fast_producer_slow_consumer", |bencher| {
        bencher.iter(|| {
            let pipeline = Source::from_iter(0_u64..256)
                .detach()
                .map(|value| {
                    thread::sleep(Duration::from_micros(10));
                    value
                })
                .run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)))
                .unwrap();
            let outcome = wait(pipeline);
            black_box(outcome)
        });
    });

    suite.bench_function("expand_slow_producer_fast_consumer", |bencher| {
        bencher.iter(|| {
            let pipeline = Source::from_iter(0_u64..256)
                .map(|value| {
                    thread::sleep(Duration::from_micros(10));
                    value
                })
                .expand(std::iter::repeat)
                .take(512)
                .run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)))
                .unwrap();
            let outcome = wait(pipeline);
            black_box(outcome)
        });
    });

    suite.bench_function("extrapolate_slow_producer_fast_consumer", |bencher| {
        bencher.iter(|| {
            let pipeline = Source::from_iter(0_u64..256)
                .map(|value| {
                    thread::sleep(Duration::from_micros(10));
                    value
                })
                .extrapolate(std::iter::repeat, None)
                .take(512)
                .run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)))
                .unwrap();
            let outcome = wait(pipeline);
            black_box(outcome)
        });
    });

    suite.finish();
}
/// Benchmarks supervision, restart and retry wrappers.
///
/// Registered under the `supervision_restart` group:
/// - `supervision_map_resume_100k_no_error`: `map_result_with_supervision` with
///   `Supervision::resuming_decider()` over `0..100_000`, where the mapper always returns `Ok`
/// - `restart_source_backoff_accuracy_3x2ms`: `RestartSource::on_failures_with_backoff` whose
///   factory returns a failed source for its first three calls and `Source::single(1)` after
///   that, with 2 ms min and max backoff
/// - `restart_source_completion_cap_3`: `RestartSource::with_backoff` with zero backoff and
///   `with_max_restarts(3, 1 s)`, counting emitted elements
/// - `restart_flow_drop_in_flight_1`: `RestartFlow::on_failures_with_backoff` around a flow that
///   fails on the element `3`, counting emitted elements of `1..=5`
/// - `retry_flow_with_backoff`: `RetryFlow::with_backoff` around a halving flow over `[5, 1]`
fn supervision_restart(c: &mut Criterion) {
    let mut suite = c.benchmark_group("supervision_restart");

    suite.bench_function("supervision_map_resume_100k_no_error", |bencher| {
        bencher.iter(|| {
            let pipeline = Source::from_iter(0_u64..100_000)
                .map_result_with_supervision(
                    |value| Ok(value.wrapping_add(1)),
                    Supervision::resuming_decider(),
                )
                .run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)))
                .unwrap();
            let outcome = wait(pipeline);
            black_box(outcome)
        });
    });

    suite.bench_function("restart_source_backoff_accuracy_3x2ms", |bencher| {
        bencher.iter(|| {
            let attempts = Arc::new(AtomicUsize::new(0));
            let settings =
                RestartSettings::new(Duration::from_millis(2), Duration::from_millis(2), 0.0)
                    .with_max_restarts(3, Duration::from_secs(1));
            // The factory owns its own handle to the shared attempt counter.
            let counter = Arc::clone(&attempts);
            let pipeline = RestartSource::on_failures_with_backoff(settings, move || {
                let previous_calls = counter.fetch_add(1, Ordering::SeqCst);
                if previous_calls >= 3 {
                    Source::single(1_u64)
                } else {
                    Source::failed(StreamError::Failed("restart".into()))
                }
            })
            .run_with(Sink::fold(0_u64, |total, value| total + value))
            .unwrap();
            let outcome = wait(pipeline);
            black_box(outcome)
        });
    });

    suite.bench_function("restart_source_completion_cap_3", |bencher| {
        bencher.iter(|| {
            let attempts = Arc::new(AtomicUsize::new(0));
            let settings = RestartSettings::new(Duration::ZERO, Duration::ZERO, 0.0)
                .with_max_restarts(3, Duration::from_secs(1));
            let counter = Arc::clone(&attempts);
            let pipeline = RestartSource::with_backoff(settings, move || {
                Source::single(counter.fetch_add(1, Ordering::SeqCst) as u64)
            })
            .run_with(Sink::fold(0_u64, |seen, _| seen + 1))
            .unwrap();
            let outcome = wait(pipeline);
            black_box(outcome)
        });
    });

    suite.bench_function("restart_flow_drop_in_flight_1", |bencher| {
        bencher.iter(|| {
            let settings = RestartSettings::new(Duration::ZERO, Duration::ZERO, 0.0)
                .with_max_restarts(1, Duration::from_secs(1));
            let pipeline = Source::from_iter(1_u64..=5)
                .via(RestartFlow::on_failures_with_backoff(settings, || {
                    Flow::identity().map_result(|value| {
                        if value != 3 {
                            Ok(value)
                        } else {
                            Err(StreamError::Failed("drop".into()))
                        }
                    })
                }))
                .run_with(Sink::fold(0_u64, |seen, _| seen + 1))
                .unwrap();
            let outcome = wait(pipeline);
            black_box(outcome)
        });
    });

    suite.bench_function("retry_flow_with_backoff", |bencher| {
        bencher.iter(|| {
            let pipeline = Source::from_iter([5_u64, 1_u64])
                .via(RetryFlow::with_backoff(
                    Duration::ZERO,
                    Duration::ZERO,
                    0.0,
                    3,
                    Flow::identity().map(|value| value / 2),
                    |_, output| {
                        let halved = *output;
                        (halved > 0).then_some(halved)
                    },
                ))
                .run_with(Sink::fold(0_u64, |total, value| total + value))
                .unwrap();
            let outcome = wait(pipeline);
            black_box(outcome)
        });
    });

    suite.finish();
}
// Bundles the benchmark functions listed below into a Criterion group named `benches`.
// NOTE(review): the list order differs from definition order in one place —
// `context_with_context_vs_pairs` is listed before `grouped_and_scan`. Presumably the list
// order is the run order; confirm before reordering.
criterion_group!(
benches,
source_materialization,
linear_map_chains,
linear_async_boundaries,
context_with_context_vs_pairs,
grouped_and_scan,
zip_family,
futures_resource_surface,
async_and_error_ops,
buffers_rate,
rate_ops,
supervision_restart
);
// Expands to the benchmark binary's entry point, which runs the `benches` group.
criterion_main!(benches);