use criterion::{Criterion, criterion_group, criterion_main};
use datum::{
AsyncBoundary, AsyncBoundaryExecutionConfig, Balance, BidiFlow, Broadcast, Buffer, Concat,
Flow, FusedExecutionConfig, GraphBlueprint, GraphDsl, GraphFlowShape, GraphStageLogic,
Identity, Inlet, Interleave, MapStage, Merge, MergeLatest, MergePreferred, MergePrioritized,
MergeSequence, MergeSorted, Outlet, OverflowStrategy, Partition, Sink, Source, TakeWhile,
Unzip, UnzipWith, Zip,
};
use std::{
hint::black_box,
sync::atomic::{AtomicU64, Ordering},
};
/// Blueprint type of the `merge_latest` benchmark graph: a flow that accepts `(u64, u64)` pairs
/// and emits `Vec<u64>` items (the output type of the `MergeLatest` junction it wraps).
type PairMergeLatestGraph = GraphBlueprint<GraphFlowShape<(u64, u64), Vec<u64>>>;
/// Builds a linear flow of `stages` back-to-back `Identity<u64>` stages.
///
/// # Panics
///
/// Panics if `stages` is zero, or if the graph DSL rejects a connection.
fn identity_chain(stages: usize) -> GraphBlueprint<GraphFlowShape<u64, u64>> {
    assert!(stages > 0);
    GraphDsl::try_create(|builder| {
        let head = builder.add(Identity::<u64>::new());
        let entry = head.inlet();
        // Thread the running tail outlet through each additional stage.
        let exit = (1..stages).try_fold(head.outlet(), |upstream, _| {
            let stage = builder.add(Identity::<u64>::new());
            builder
                .connect(upstream, stage.inlet())
                .map(|_| stage.outlet())
        })?;
        Ok(GraphFlowShape::new(entry, exit))
    })
    .unwrap()
}
/// Builds a linear flow of `stages` `MapStage`s, each adding one (wrapping) to every `u64`.
///
/// # Panics
///
/// Panics if `stages` is zero, or if the graph DSL rejects a connection.
fn map_chain(stages: usize) -> GraphBlueprint<GraphFlowShape<u64, u64>> {
    fn bump(item: u64) -> u64 {
        item.wrapping_add(1)
    }

    assert!(stages > 0);
    GraphDsl::try_create(|builder| {
        let head = builder.add(MapStage::new(bump));
        let (entry, mut tail) = (head.inlet(), head.outlet());
        for _extra_stage in 0..stages - 1 {
            let stage = builder.add(MapStage::new(bump));
            builder.connect(tail, stage.inlet())?;
            tail = stage.outlet();
        }
        Ok(GraphFlowShape::new(entry, tail))
    })
    .unwrap()
}
/// Same as `identity_chain(stages)`, but with the blueprint named `"bench.identity"`.
///
/// Used to measure what carrying a name attribute costs on the typed count path.
fn named_identity_chain(stages: usize) -> GraphBlueprint<GraphFlowShape<u64, u64>> {
    let unnamed = identity_chain(stages);
    unnamed.named("bench.identity")
}
/// Same as `identity_chain(stages)`, with `Attributes::input_buffer(16, 16)` added to the blueprint.
///
/// NOTE(review): the two `16`s are presumably the initial and maximum input-buffer sizes —
/// confirm against `datum::Attributes::input_buffer`.
fn buffered_identity_chain(stages: usize) -> GraphBlueprint<GraphFlowShape<u64, u64>> {
    let buffer_attrs = datum::Attributes::input_buffer(16, 16);
    identity_chain(stages).add_attributes(buffer_attrs)
}
/// Builds a chain of `stages` `Identity<u64>` stages, inserting an `AsyncBoundary<u64>`
/// immediately before every stage whose zero-based index appears in `boundary_positions`.
///
/// The head stage (index `0`) never gets a boundary, and indices `>= stages` never match, so
/// such entries in `boundary_positions` are ignored.
///
/// # Panics
///
/// Panics if `stages` is zero, or if the graph DSL rejects a connection.
fn identity_chain_with_async_boundaries(
    stages: usize,
    boundary_positions: &[usize],
) -> GraphBlueprint<GraphFlowShape<u64, u64>> {
    assert!(stages > 0);
    GraphDsl::try_create(|builder| {
        let head = builder.add(Identity::<u64>::new());
        let entry = head.inlet();
        let mut tail = head.outlet();
        for index in 1..stages {
            let wants_boundary = boundary_positions.contains(&index);
            if wants_boundary {
                let boundary = builder.add(AsyncBoundary::<u64>::new());
                builder.connect(tail, boundary.inlet())?;
                tail = boundary.outlet();
            }
            let stage = builder.add(Identity::<u64>::new());
            builder.connect(tail, stage.inlet())?;
            tail = stage.outlet();
        }
        Ok(GraphFlowShape::new(entry, tail))
    })
    .unwrap()
}
/// Builds a flow that broadcasts each `u64` to two branches and zips them back into a pair.
///
/// # Panics
///
/// Panics if the graph DSL rejects a port lookup or connection.
fn broadcast_zip_graph() -> GraphBlueprint<GraphFlowShape<u64, (u64, u64)>> {
    GraphDsl::try_create(|builder| {
        let fan_out = builder.add(Broadcast::<u64>::new(2));
        let pair = builder.add(Zip::<u64, u64>::new());
        let left = fan_out.outlet(0)?;
        builder.connect(left, pair.in0())?;
        let right = fan_out.outlet(1)?;
        builder.connect(right, pair.in1())?;
        let shape = GraphFlowShape::new(fan_out.inlet(), pair.outlet());
        Ok(shape)
    })
    .unwrap()
}
/// Builds a flow that routes `u64`s through a two-way `Balance` and recombines both branches
/// with a two-input `Merge`.
///
/// # Panics
///
/// Panics if the graph DSL rejects a port lookup or connection.
fn balance_merge_graph() -> GraphBlueprint<GraphFlowShape<u64, u64>> {
    GraphDsl::try_create(|builder| {
        let split = builder.add(Balance::<u64>::new(2));
        let join = builder.add(Merge::<u64>::new(2));
        // Wire balance output `port` to merge input `port`.
        for port in 0..2 {
            builder.connect(split.outlet(port)?, join.inlet(port)?)?;
        }
        Ok(GraphFlowShape::new(split.inlet(), join.outlet()))
    })
    .unwrap()
}
/// Builds a flow with an `Identity<u64>` head followed by `pairs` Balance(2) → Merge(2)
/// diamonds in series. It is used by `graph_building` to time construction of junction-heavy
/// graphs.
///
/// # Panics
///
/// Panics if `pairs` is zero, or if the graph DSL rejects a port lookup or connection.
fn junction_chain(pairs: usize) -> GraphBlueprint<GraphFlowShape<u64, u64>> {
    assert!(pairs > 0);
    GraphDsl::try_create(|builder| {
        let head = builder.add(Identity::<u64>::new());
        let entry = head.inlet();
        let mut tail = head.outlet();
        for _diamond in 0..pairs {
            let fan_out = builder.add(Balance::<u64>::new(2));
            let fan_in = builder.add(Merge::<u64>::new(2));
            builder.connect(tail, fan_out.inlet())?;
            for port in 0..2 {
                builder.connect(fan_out.outlet(port)?, fan_in.inlet(port)?)?;
            }
            tail = fan_in.outlet();
        }
        Ok(GraphFlowShape::new(entry, tail))
    })
    .unwrap()
}
/// Builds a standalone two-input `MergePrioritized<u64>` junction with priorities `[4, 1]`.
fn prioritized_merge_graph() -> GraphBlueprint<datum::FanInShape<u64, u64>> {
    let created = GraphDsl::create(|builder| {
        let priorities = vec![4, 1];
        builder.add(MergePrioritized::<u64>::new(priorities))
    });
    created.unwrap()
}
/// Builds a standalone `MergePreferred<u64>` junction with two secondary inputs.
fn merge_preferred_graph() -> GraphBlueprint<datum::MergePreferredShape<u64>> {
    let created = GraphDsl::create(|builder| builder.add(MergePreferred::<u64>::new(2)));
    created.unwrap()
}
/// Builds a standalone two-input `Concat<u64>` junction.
fn concat_graph() -> GraphBlueprint<datum::FanInShape<u64, u64>> {
    let created = GraphDsl::create(|builder| builder.add(Concat::<u64>::new(2)));
    created.unwrap()
}
/// Builds a standalone `Interleave<u64>` junction created with `Interleave::new(2, 2)`.
///
/// NOTE(review): the arguments are presumably (input count, segment size) — confirm against
/// datum's `Interleave::new`.
fn interleave_graph() -> GraphBlueprint<datum::FanInShape<u64, u64>> {
    let created = GraphDsl::create(|builder| builder.add(Interleave::<u64>::new(2, 2)));
    created.unwrap()
}
/// Builds a flow that unzips `(u64, u64)` pairs and feeds both halves into a two-input
/// `MergeSorted<u64>`, producing a single `u64` stream.
///
/// # Panics
///
/// Panics if the graph DSL rejects a port lookup or connection.
fn merge_sorted_graph() -> GraphBlueprint<GraphFlowShape<(u64, u64), u64>> {
    GraphDsl::try_create(|builder| {
        let split = builder.add(Unzip::<u64, u64>::new());
        let sorted = builder.add(MergeSorted::<u64>::new());
        let (left, right) = (split.out0(), split.out1());
        builder.connect(left, sorted.inlet(0)?)?;
        builder.connect(right, sorted.inlet(1)?)?;
        Ok(GraphFlowShape::new(split.inlet(), sorted.outlet()))
    })
    .unwrap()
}
/// Builds a flow that unzips `(u64, u64)` pairs and merges both halves with a two-input
/// `MergeSequence<u64>`, using each element's own value as its sequence number.
///
/// # Panics
///
/// Panics if the graph DSL rejects a port lookup or connection.
fn merge_sequence_graph() -> GraphBlueprint<GraphFlowShape<(u64, u64), u64>> {
    GraphDsl::try_create(|builder| {
        let split = builder.add(Unzip::<u64, u64>::new());
        let sequencer = builder.add(MergeSequence::<u64>::new(2, |element| *element));
        let (left, right) = (split.out0(), split.out1());
        builder.connect(left, sequencer.inlet(0)?)?;
        builder.connect(right, sequencer.inlet(1)?)?;
        Ok(GraphFlowShape::new(split.inlet(), sequencer.outlet()))
    })
    .unwrap()
}
/// Builds a flow that unzips `(u64, u64)` pairs and feeds both halves into a two-input
/// `MergeLatest<u64>`, which emits `Vec<u64>` items.
///
/// NOTE(review): the `false` argument is presumably the "eager complete" flag — confirm
/// against datum's `MergeLatest::new`.
///
/// # Panics
///
/// Panics if the graph DSL rejects a port lookup or connection.
fn merge_latest_graph() -> PairMergeLatestGraph {
    GraphDsl::try_create(|builder| {
        let split = builder.add(Unzip::<u64, u64>::new());
        let latest = builder.add(MergeLatest::<u64>::new(2, false));
        let (left, right) = (split.out0(), split.out1());
        builder.connect(left, latest.inlet(0)?)?;
        builder.connect(right, latest.inlet(1)?)?;
        Ok(GraphFlowShape::new(split.inlet(), latest.outlet()))
    })
    .unwrap()
}
/// Builds a flow that partitions `u64`s by parity (even → output 0, odd → output 1) and merges
/// both partitions back into one stream.
///
/// # Panics
///
/// Panics if the graph DSL rejects a port lookup or connection.
fn partition_graph() -> GraphBlueprint<GraphFlowShape<u64, u64>> {
    GraphDsl::try_create(|builder| {
        let router = builder.add(Partition::<u64>::new(2, |item| (*item % 2) as usize));
        let rejoin = builder.add(Merge::<u64>::new(2));
        for port in 0..2 {
            builder.connect(router.outlet(port)?, rejoin.inlet(port)?)?;
        }
        Ok(GraphFlowShape::new(router.inlet(), rejoin.outlet()))
    })
    .unwrap()
}
/// Builds a flow that splits each `u64` into `(value, value + 1)` with `UnzipWith` and zips the
/// two branches back into a `(u64, u64)` pair.
///
/// # Panics
///
/// Panics if the graph DSL rejects a connection.
fn unzip_with_graph() -> GraphBlueprint<GraphFlowShape<u64, (u64, u64)>> {
    GraphDsl::try_create(|builder| {
        let fan_out = builder.add(UnzipWith::<u64, u64, u64>::new(|value| (value, value + 1)));
        let pair = builder.add(Zip::<u64, u64>::new());
        let (first_half, second_half) = (fan_out.out0(), fan_out.out1());
        builder.connect(first_half, pair.in0())?;
        builder.connect(second_half, pair.in1())?;
        Ok(GraphFlowShape::new(fan_out.inlet(), pair.outlet()))
    })
    .unwrap()
}
/// Builds a feedback cycle: external items enter through `MergePreferred`'s secondary input 0,
/// and decremented copies are fed back into its preferred input.
///
/// Wiring:
/// - merge → broadcast;
/// - broadcast outlet 0 is the flow's output;
/// - broadcast outlet 1 loops back through a 16-slot `Buffer` (`OverflowStrategy::Backpressure`)
///   → `TakeWhile(item > 0)` → `item - 1` → merge's preferred input.
///
/// Seeding with `n` therefore presumably emits `n, n - 1, ..., 0` before the loop stops.
/// Confirm this against datum's `TakeWhile`/`MergePreferred` completion semantics.
///
/// # Panics
///
/// Panics if the graph DSL rejects any port lookup or connection.
fn cyclic_merge_preferred_graph() -> GraphBlueprint<GraphFlowShape<u64, u64>> {
GraphDsl::try_create(|builder| {
// Stage insertion and connection order are deliberately left as-is; the cycle's scheduling
// may depend on it.
let merge = builder.add(MergePreferred::<u64>::new(1));
let broadcast = builder.add(Broadcast::<u64>::new(2));
// NOTE(review): the buffer presumably gives the cycle slack so it does not deadlock — confirm.
let buffer = builder.add(Buffer::<u64>::new(16, OverflowStrategy::Backpressure));
// Only items > 0 pass, so the `item - 1` below cannot underflow.
let positive = builder.add(TakeWhile::<u64>::new(|item| *item > 0));
let decrement = builder.add(MapStage::new(|item: u64| item - 1));
builder.connect(merge.outlet(), broadcast.inlet())?;
// Broadcast outlet 1 is the feedback branch; outlet 0 is exposed as the flow output below.
builder.connect(broadcast.outlet(1)?, buffer.inlet())?;
builder.connect(buffer.outlet(), positive.inlet())?;
builder.connect(positive.outlet(), decrement.inlet())?;
builder.connect(decrement.outlet(), merge.preferred())?;
// External input uses secondary input 0; the feedback path uses the preferred input.
Ok(GraphFlowShape::new(
merge.secondary(0)?,
broadcast.outlet(0)?,
))
})
.unwrap()
}
/// Runs 10 000 `u64`s through a `BidiFlow` joined with a decrementing flow, and returns the
/// wrapping sum of the results.
///
/// Per element: `+1` (top) → `-1` (joined flow) → `*2` (bottom).
///
/// # Panics
///
/// Panics if materialisation or the run fails.
fn bidi_join_10k() -> u64 {
    let increment = Flow::identity().map(|item: u64| item + 1);
    let double = Flow::identity().map(|item: u64| item * 2);
    let decrement = Flow::identity().map(|item: u64| item - 1);
    let looped = BidiFlow::from_flows(increment, double).join(decrement);
    let total = Sink::fold(0_u64, |acc: u64, item: u64| acc.wrapping_add(item));
    let handle = Source::from_iter(0_u64..10_000)
        .via(looped)
        .run_with(total)
        .unwrap();
    handle.wait().unwrap()
}
/// Runs 10 000 `u64`s through two stacked `BidiFlow`s (`first.atop(second)`) joined with an
/// identity flow, and returns the wrapping sum of the results.
///
/// With Akka-style `atop` wiring (top path first → second, bottom path second → first), each
/// element goes through `+1` → `*3` → identity → `-4` → `*2`. For input `0` the value reaching
/// `-4` is `3`, so plain `u64` arithmetic would underflow there and then overflow in the `*2`.
/// That panics whenever overflow checks are on (debug/test builds) and silently wraps
/// otherwise. All stage arithmetic is therefore explicitly wrapping. This matches the wrapping
/// `fold` in the sink and leaves the release-mode result unchanged.
///
/// # Panics
///
/// Panics if materialisation or the run fails.
fn bidi_atop_10k() -> u64 {
    let first = BidiFlow::from_flows(
        Flow::identity().map(|item: u64| item.wrapping_add(1)),
        Flow::identity().map(|item: u64| item.wrapping_mul(2)),
    );
    let second = BidiFlow::from_flows(
        Flow::identity().map(|item: u64| item.wrapping_mul(3)),
        // Element 0 arrives here as 3, so a checked `- 4` would underflow.
        Flow::identity().map(|item: u64| item.wrapping_sub(4)),
    );
    Source::from_iter(0_u64..10_000)
        .via(first.atop(second).join(Flow::identity()))
        .run_with(Sink::fold(0_u64, |acc, item| acc.wrapping_add(item)))
        .unwrap()
        .wait()
        .unwrap()
}
/// Benchmarks 10-stage linear chains run with `FusedExecutionConfig` over 100 000 `u64`s.
///
/// Covers:
/// - the collecting runner (`run_with_input_report`);
/// - the count and fold sink runners and their `run_typed_linear_*` counterparts;
/// - the typed count path on chains carrying a name attribute or an input-buffer attribute.
fn fused_hot_path(c: &mut Criterion) {
    let config = FusedExecutionConfig {
        event_limit: 5_000_000,
    };
    let identity = identity_chain(10);
    let map = map_chain(10);
    let named = named_identity_chain(10);
    let buffered = buffered_identity_chain(10);
    // Both closures capture nothing, so they are `Copy` and every benchmark below can share them.
    let input = || 0_u64..100_000;
    let wrapping_sum = |acc: u64, item: u64| acc.wrapping_add(item);

    let mut group = c.benchmark_group("fused_hot_path");

    group.bench_function("identity_10_100k", |b| {
        b.iter(|| {
            let report = identity.run_with_input_report(input(), config).unwrap();
            black_box(report.output.len())
        });
    });
    group.bench_function("map_10_100k", |b| {
        b.iter(|| {
            let report = map.run_with_input_report(input(), config).unwrap();
            black_box(report.output.last().copied())
        });
    });
    group.bench_function("identity_10_100k_count_sink", |b| {
        b.iter(|| {
            let report = identity
                .run_count_with_input_report(input(), config)
                .unwrap();
            black_box(report.result)
        });
    });
    group.bench_function("map_10_100k_fold_sink", |b| {
        b.iter(|| {
            let report = map
                .run_fold_with_input_report(input(), 0_u64, wrapping_sum, config)
                .unwrap();
            black_box(report.result)
        });
    });
    group.bench_function("identity_10_100k_typed_count_sink", |b| {
        b.iter(|| {
            let report = identity
                .run_typed_linear_count_with_input_report(input(), config)
                .unwrap();
            black_box(report.result)
        });
    });
    group.bench_function("map_10_100k_typed_fold_sink", |b| {
        b.iter(|| {
            let report = map
                .run_typed_linear_fold_with_input_report(input(), 0_u64, wrapping_sum, config)
                .unwrap();
            black_box(report.result)
        });
    });
    group.bench_function("identity_10_100k_typed_count_sink_named_attr", |b| {
        b.iter(|| {
            let report = named
                .run_typed_linear_count_with_input_report(input(), config)
                .unwrap();
            black_box(report.result)
        });
    });
    group.bench_function("identity_10_100k_typed_count_sink_buffer_attr", |b| {
        b.iter(|| {
            let report = buffered
                .run_typed_linear_count_with_input_report(input(), config)
                .unwrap();
            black_box(report.result)
        });
    });

    group.finish();
}
/// Benchmarks async-boundary cost in a 10-stage identity chain counting 10 000 `u64`s.
///
/// It compares one boundary (before stage 5) against nine (before every stage after the head).
/// Each iteration reports both the count and the number of boundary crossings.
fn async_boundary_overhead(c: &mut Criterion) {
    let boundary_config = AsyncBoundaryExecutionConfig {
        fused: FusedExecutionConfig {
            event_limit: 5_000_000,
        },
        buffer_size: 16,
    };
    let every_gap: Vec<usize> = (1..10).collect();
    let cases = [
        (
            "identity_10_10k_boundary_1_count_sink",
            identity_chain_with_async_boundaries(10, &[5]),
        ),
        (
            "identity_10_10k_boundary_9_count_sink",
            identity_chain_with_async_boundaries(10, &every_gap),
        ),
    ];

    let mut group = c.benchmark_group("async_boundaries");
    for (id, graph) in &cases {
        group.bench_function(*id, |b| {
            b.iter(|| {
                let report = graph
                    .run_async_boundary_count_with_input_report(0_u64..10_000, boundary_config)
                    .unwrap();
                black_box((report.result, report.async_boundary_crossings))
            });
        });
    }
    group.finish();
}
/// Benchmarks graph construction only (no execution).
///
/// Cases: identity chains of 1/10/100 stages, and junction chains of 1/10/50 Balance → Merge
/// diamonds.
fn graph_building(c: &mut Criterion) {
    let mut group = c.benchmark_group("graph_building");
    for stages in [1_usize, 10, 100] {
        let id = format!("identity_chain_{stages}");
        group.bench_function(id, |b| b.iter(|| black_box(identity_chain(stages))));
    }
    for pairs in [1_usize, 10, 50] {
        let id = format!("junction_chain_{pairs}");
        group.bench_function(id, |b| b.iter(|| black_box(junction_chain(pairs))));
    }
    group.finish();
}
/// Benchmarks end-to-end throughput of fan-out/fan-in junctions. Each case is sized at about
/// 10 000 elements (or pairs):
/// - Broadcast → Zip and Balance → Merge;
/// - MergePrioritized, MergePreferred, Concat and Interleave;
/// - MergeSorted, MergeSequence and MergeLatest;
/// - Partition → Merge and UnzipWith → Zip;
/// - a MergePreferred feedback cycle seeded with `10_000`.
///
/// Fan-in input vectors are cloned inside the timed closures, so the clone cost is part of
/// those measurements.
fn junction_throughput(c: &mut Criterion) {
    let broadcast_zip = broadcast_zip_graph();
    let balance_merge = balance_merge_graph();
    let prioritized = prioritized_merge_graph();
    let preferred = merge_preferred_graph();
    let concat = concat_graph();
    let interleave = interleave_graph();
    let merge_sorted = merge_sorted_graph();
    let merge_sequence = merge_sequence_graph();
    let merge_latest = merge_latest_graph();
    let partition = partition_graph();
    let unzip_with = unzip_with_graph();
    let cyclic_merge_preferred = cyclic_merge_preferred_graph();
    // Loop-invariant; built once instead of inside the timed closure.
    let cycle_config = FusedExecutionConfig {
        event_limit: 5_000_000,
    };
    let mut group = c.benchmark_group("junction_throughput");
    group.bench_function("broadcast_zip_10k", |b| {
        b.iter(|| {
            let result = broadcast_zip.run_with_input(0_u64..10_000).unwrap();
            black_box(result.len())
        });
    });
    group.bench_function("balance_merge_10k", |b| {
        b.iter(|| {
            let result = balance_merge.run_with_input(0_u64..10_000).unwrap();
            black_box(result.len())
        });
    });
    group.bench_function("prioritized_merge_10k", |b| {
        let high = (0_u64..8_000).collect::<Vec<_>>();
        let low = (10_000_u64..12_000).collect::<Vec<_>>();
        b.iter(|| {
            let result = prioritized
                .run_fan_in(vec![high.clone(), low.clone()])
                .unwrap();
            black_box(result.len())
        });
    });
    group.bench_function("merge_preferred_10k", |b| {
        let preferred_input = (0_u64..8_000).collect::<Vec<_>>();
        let secondary_one = (10_000_u64..11_000).collect::<Vec<_>>();
        let secondary_two = (20_000_u64..21_000).collect::<Vec<_>>();
        b.iter(|| {
            let result = preferred
                .run_merge_preferred(
                    preferred_input.clone(),
                    vec![secondary_one.clone(), secondary_two.clone()],
                )
                .unwrap();
            black_box(result.len())
        });
    });
    group.bench_function("concat_10k", |b| {
        let a = (0_u64..5_000).collect::<Vec<_>>();
        let b_vec = (5_000_u64..10_000).collect::<Vec<_>>();
        b.iter(|| {
            let result = concat.run_concat(vec![a.clone(), b_vec.clone()]).unwrap();
            black_box(result.len())
        });
    });
    group.bench_function("interleave_10k", |b| {
        let a = (0_u64..5_000).collect::<Vec<_>>();
        let b_vec = (5_000_u64..10_000).collect::<Vec<_>>();
        b.iter(|| {
            let result = interleave
                .run_interleave(vec![a.clone(), b_vec.clone()], 2, false)
                .unwrap();
            black_box(result.len())
        });
    });
    group.bench_function("merge_sorted_10k", |b| {
        b.iter(|| {
            let result = merge_sorted
                .run_with_input((0_u64..10_000).step_by(2).map(|item| (item, item + 1)))
                .unwrap();
            black_box(result.len())
        });
    });
    group.bench_function("merge_sequence_10k", |b| {
        b.iter(|| {
            // Same input as `merge_sorted_10k`: 5 000 pairs `(2k, 2k + 1)` cover sequence
            // numbers 0..10_000 contiguously, so the merge emits 10 000 elements as the
            // benchmark name states.
            let result = merge_sequence
                .run_with_input((0_u64..10_000).step_by(2).map(|item| (item, item + 1)))
                .unwrap();
            black_box(result.len())
        });
    });
    group.bench_function("merge_latest_10k", |b| {
        b.iter(|| {
            let result = merge_latest
                .run_with_input((0_u64..10_000).map(|item| (item, item + 10_000)))
                .unwrap();
            black_box(result.len())
        });
    });
    group.bench_function("partition_10k", |b| {
        b.iter(|| {
            let result = partition.run_with_input(0_u64..10_000).unwrap();
            black_box(result.len())
        });
    });
    group.bench_function("unzip_with_10k", |b| {
        b.iter(|| {
            let result = unzip_with.run_with_input(0_u64..10_000).unwrap();
            black_box(result.len())
        });
    });
    group.bench_function("cycle_merge_preferred_feedback_10k", |b| {
        b.iter(|| {
            let result = cyclic_merge_preferred
                .run_with_input_report([10_000_u64], cycle_config)
                .unwrap();
            black_box(result.output.len())
        });
    });
    group.finish();
}
/// Benchmarks 1 000 manual pull → offer → grab → request → push cycles on a standalone
/// `GraphStageLogic` built for a single-inlet, single-outlet shape.
///
/// Port handles are resolved once, outside the timed closure.
fn backpressure_latency(c: &mut Criterion) {
    let shape = GraphFlowShape::new(Inlet::<u64>::new("in"), Outlet::<u64>::new("out"));
    let (input_port, output_port) = (shape.inlet(), shape.outlet());
    c.bench_function("port_pull_push_cycle_1k", |bencher| {
        bencher.iter(|| {
            let mut logic = GraphStageLogic::new(&shape);
            for value in 0..1_000_u64 {
                logic.pull(&input_port).unwrap();
                logic.offer(&input_port, value).unwrap();
                let element = logic.grab(&input_port).unwrap();
                logic.request(&output_port).unwrap();
                logic.push(&output_port, element).unwrap();
                black_box(element);
            }
        });
    });
}
/// Benchmarks per-element overhead of single custom stages and of `GraphStageLogic` helpers.
///
/// Cases:
/// - 1-stage identity and map graphs over 10 000 `u64`s;
/// - 10 000 iterations each of `emit_multiple` and `read_n`;
/// - 10 000 async-callback invocations followed by one drain.
///
/// Port handles are resolved once, outside the timed closures (as in `backpressure_latency`),
/// so the manual-logic cases time only the stage-logic operations.
fn custom_stage_overhead(c: &mut Criterion) {
    let identity = identity_chain(1);
    let map = map_chain(1);
    let mut group = c.benchmark_group("custom_stage_overhead");
    group.bench_function("identity_stage_10k", |b| {
        b.iter(|| {
            let result = identity.run_with_input(0_u64..10_000).unwrap();
            black_box(result.len())
        });
    });
    group.bench_function("map_stage_10k", |b| {
        b.iter(|| {
            let result = map.run_with_input(0_u64..10_000).unwrap();
            black_box(result.len())
        });
    });
    group.bench_function("emit_multiple_10k", |b| {
        let shape = GraphFlowShape::new(Inlet::<u64>::new("in"), Outlet::<u64>::new("out"));
        let inlet = shape.inlet();
        let outlet = shape.outlet();
        b.iter(|| {
            let mut logic = GraphStageLogic::new(&shape);
            for item in 0_u64..10_000 {
                logic.pull(&inlet).unwrap();
                logic.offer(&inlet, item).unwrap();
                let grabbed = logic.grab(&inlet).unwrap();
                logic.request(&outlet).unwrap();
                logic
                    .emit_multiple(&outlet, std::iter::once(grabbed))
                    .unwrap();
            }
            black_box(logic.stage_error().is_none())
        });
    });
    group.bench_function("read_n_10k", |b| {
        let shape = GraphFlowShape::new(Inlet::<u64>::new("in"), Outlet::<u64>::new("out"));
        // Only the inlet is exercised here; the outlet handle was never used.
        let inlet = shape.inlet();
        b.iter(|| {
            let mut logic = GraphStageLogic::new(&shape);
            for item in 0_u64..10_000 {
                logic.pull(&inlet).unwrap();
                logic.offer(&inlet, item).unwrap();
                logic
                    .read_n(&inlet, 1, |_: Vec<u64>| {}, |_: Vec<u64>| {})
                    .unwrap();
            }
            black_box(logic.stage_error().is_none())
        });
    });
    group.bench_function("async_callback_ingress_10k", |b| {
        let shape = GraphFlowShape::new(Inlet::<u64>::new("in"), Outlet::<u64>::new("out"));
        b.iter(|| {
            let mut logic = GraphStageLogic::new(&shape);
            let counter = AtomicU64::new(0);
            let cb = logic.get_async_callback(move |_logic| {
                counter.fetch_add(1, Ordering::Relaxed);
            });
            for _ in 0..10_000 {
                cb.invoke_without_logic();
            }
            logic.drain_async_callbacks();
            black_box(logic.stage_error().is_none())
        });
    });
    group.finish();
}
/// Benchmarks the two 10 000-element `BidiFlow` pipelines: `bidi_join_10k` (single bidi +
/// `join`) and `bidi_atop_10k` (two bidis stacked with `atop`).
fn bidi_composition(c: &mut Criterion) {
    let mut group = c.benchmark_group("bidi_composition_10k");
    group.bench_function("join", |bencher| {
        bencher.iter(|| black_box(bidi_join_10k()));
    });
    group.bench_function("atop", |bencher| {
        bencher.iter(|| black_box(bidi_atop_10k()));
    });
    group.finish();
}
// Registers every benchmark function in this file under one Criterion group named `benches`.
// A function missing from this list is compiled but never run.
criterion_group!(
benches,
fused_hot_path,
async_boundary_overhead,
graph_building,
junction_throughput,
backpressure_latency,
custom_stage_overhead,
bidi_composition
);
// Generates the benchmark binary's `main`, which runs the `benches` group.
criterion_main!(benches);