use std::{
alloc::{GlobalAlloc, Layout, System},
hint::black_box,
sync::{
OnceLock,
atomic::{AtomicU64, Ordering},
},
time::Instant,
};
use datum::{
AsyncBoundary, AsyncBoundaryExecutionConfig, Balance, BidiFlow, Broadcast, Buffer, Concat,
Flow, FusedExecutionConfig, GraphBlueprint, GraphDsl, GraphFlowShape, Identity, Interleave,
MapStage, Merge, MergeLatest, MergePreferred, MergePrioritized, MergeSequence, MergeSorted,
OverflowStrategy, Partition, Sink, Source, TakeWhile, Unzip, UnzipWith, Zip,
};
struct CountingAllocator;
static ALLOCATED_BYTES: AtomicU64 = AtomicU64::new(0);
unsafe impl GlobalAlloc for CountingAllocator {
unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
let ptr = unsafe { System.alloc(layout) };
if !ptr.is_null() {
ALLOCATED_BYTES.fetch_add(layout.size() as u64, Ordering::Relaxed);
}
ptr
}
unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
let ptr = unsafe { System.alloc_zeroed(layout) };
if !ptr.is_null() {
ALLOCATED_BYTES.fetch_add(layout.size() as u64, Ordering::Relaxed);
}
ptr
}
unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
unsafe { System.dealloc(ptr, layout) };
}
unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
let new_ptr = unsafe { System.realloc(ptr, layout, new_size) };
if !new_ptr.is_null() {
if new_ptr == ptr {
if new_size > layout.size() {
ALLOCATED_BYTES.fetch_add((new_size - layout.size()) as u64, Ordering::Relaxed);
}
} else {
ALLOCATED_BYTES.fetch_add(new_size as u64, Ordering::Relaxed);
}
}
new_ptr
}
}
#[global_allocator]
static GLOBAL: CountingAllocator = CountingAllocator;
fn process_cpu_ns() -> u128 {
let Ok(stat) = std::fs::read_to_string("/proc/self/stat") else {
return 0;
};
let Some(close) = stat.rfind(')') else {
return 0;
};
let fields: Vec<&str> = stat[close + 1..].split_whitespace().collect();
if fields.len() <= 12 {
return 0;
}
let utime: u64 = fields[11].parse().unwrap_or(0);
let stime: u64 = fields[12].parse().unwrap_or(0);
(utime as u128 + stime as u128) * 10_000_000
}
struct Scenario {
name: &'static str,
iterations: u64,
run: fn() -> u64,
}
type PairMergeLatestGraph = GraphBlueprint<GraphFlowShape<(u64, u64), Vec<u64>>>;
fn main() {
let scenarios = [
Scenario {
name: "fused_identity_10_100k",
iterations: 25,
run: fused_identity_10_100k,
},
Scenario {
name: "fused_map_10_100k",
iterations: 20,
run: fused_map_10_100k,
},
Scenario {
name: "fused_identity_10_100k_count_sink",
iterations: 100,
run: fused_identity_10_100k_count_sink,
},
Scenario {
name: "fused_map_10_100k_fold_sink",
iterations: 80,
run: fused_map_10_100k_fold_sink,
},
Scenario {
name: "fused_identity_10_100k_typed_count_sink",
iterations: 1_000,
run: fused_identity_10_100k_typed_count_sink,
},
Scenario {
name: "fused_map_10_100k_typed_fold_sink",
iterations: 1_000,
run: fused_map_10_100k_typed_fold_sink,
},
Scenario {
name: "fused_identity_10_100k_typed_count_sink_named_attr",
iterations: 1_000,
run: fused_identity_10_100k_typed_count_sink_named_attr,
},
Scenario {
name: "fused_identity_10_100k_typed_count_sink_buffer_attr",
iterations: 1_000,
run: fused_identity_10_100k_typed_count_sink_buffer_attr,
},
Scenario {
name: "async_boundary_1_10k_count_sink",
iterations: 100,
run: async_boundary_1_10k_count_sink,
},
Scenario {
name: "async_boundary_9_10k_count_sink",
iterations: 100,
run: async_boundary_9_10k_count_sink,
},
Scenario {
name: "graph_build_identity_1",
iterations: 200_000,
run: graph_build_identity_1,
},
Scenario {
name: "graph_build_identity_10",
iterations: 80_000,
run: graph_build_identity_10,
},
Scenario {
name: "graph_build_identity_100",
iterations: 10_000,
run: graph_build_identity_100,
},
Scenario {
name: "graph_build_junction_1",
iterations: 120_000,
run: graph_build_junction_1,
},
Scenario {
name: "graph_build_junction_10",
iterations: 30_000,
run: graph_build_junction_10,
},
Scenario {
name: "graph_build_junction_50",
iterations: 5_000,
run: graph_build_junction_50,
},
Scenario {
name: "junction_broadcast_zip_10k",
iterations: 500,
run: junction_broadcast_zip_10k,
},
Scenario {
name: "junction_balance_merge_10k",
iterations: 1_000,
run: junction_balance_merge_10k,
},
Scenario {
name: "junction_prioritized_merge_10k",
iterations: 1_000,
run: junction_prioritized_merge_10k,
},
Scenario {
name: "junction_merge_preferred_10k",
iterations: 1_000,
run: junction_merge_preferred_10k,
},
Scenario {
name: "junction_concat_10k",
iterations: 1_000,
run: junction_concat_10k,
},
Scenario {
name: "junction_interleave_10k",
iterations: 1_000,
run: junction_interleave_10k,
},
Scenario {
name: "junction_merge_sorted_10k",
iterations: 1_000,
run: junction_merge_sorted_10k,
},
Scenario {
name: "junction_merge_sequence_10k",
iterations: 500,
run: junction_merge_sequence_10k,
},
Scenario {
name: "junction_merge_latest_10k",
iterations: 1_000,
run: junction_merge_latest_10k,
},
Scenario {
name: "junction_partition_10k",
iterations: 1_000,
run: junction_partition_10k,
},
Scenario {
name: "junction_unzip_with_10k",
iterations: 1_000,
run: junction_unzip_with_10k,
},
Scenario {
name: "cycle_merge_preferred_feedback_10k",
iterations: 500,
run: cycle_merge_preferred_feedback_10k,
},
Scenario {
name: "custom_identity_stage_10k",
iterations: 1_000,
run: custom_identity_stage_10k,
},
Scenario {
name: "custom_map_stage_10k",
iterations: 1_000,
run: custom_map_stage_10k,
},
Scenario {
name: "bidi_join_10k",
iterations: 500,
run: bidi_join_10k,
},
Scenario {
name: "bidi_atop_10k",
iterations: 500,
run: bidi_atop_10k,
},
];
println!("scenario\titerations\tns_per_op\tallocated_bytes_per_op\tcpu_ns_per_op");
for scenario in scenarios {
for _ in 0..5 {
black_box((scenario.run)());
}
let cpu_start = process_cpu_ns();
ALLOCATED_BYTES.store(0, Ordering::Relaxed);
let started = Instant::now();
let mut checksum = 0_u64;
for _ in 0..scenario.iterations {
checksum = checksum.wrapping_add(black_box((scenario.run)()));
}
let elapsed = started.elapsed();
let allocated_bytes = ALLOCATED_BYTES.load(Ordering::Relaxed);
let cpu_delta = process_cpu_ns().saturating_sub(cpu_start);
black_box(checksum);
let ns_per_op = elapsed.as_nanos() as f64 / scenario.iterations as f64;
let allocated_bytes_per_op = allocated_bytes as f64 / scenario.iterations as f64;
let cpu_ns_per_op = cpu_delta as f64 / scenario.iterations as f64;
println!(
"{}\t{}\t{ns_per_op:.2}\t{allocated_bytes_per_op:.2}\t{cpu_ns_per_op:.2}",
scenario.name, scenario.iterations
);
}
}
fn identity_chain(stages: usize) -> GraphBlueprint<GraphFlowShape<u64, u64>> {
assert!(stages > 0);
GraphDsl::try_create(|builder| {
let first = builder.add(Identity::<u64>::new());
let inlet = first.inlet();
let mut outlet = first.outlet();
for _ in 1..stages {
let next = builder.add(Identity::<u64>::new());
builder.connect(outlet, next.inlet())?;
outlet = next.outlet();
}
Ok(GraphFlowShape::new(inlet, outlet))
})
.unwrap()
}
fn map_chain(stages: usize) -> GraphBlueprint<GraphFlowShape<u64, u64>> {
assert!(stages > 0);
GraphDsl::try_create(|builder| {
let first = builder.add(MapStage::new(|item: u64| item.wrapping_add(1)));
let inlet = first.inlet();
let mut outlet = first.outlet();
for _ in 1..stages {
let next = builder.add(MapStage::new(|item: u64| item.wrapping_add(1)));
builder.connect(outlet, next.inlet())?;
outlet = next.outlet();
}
Ok(GraphFlowShape::new(inlet, outlet))
})
.unwrap()
}
fn named_identity_chain(stages: usize) -> GraphBlueprint<GraphFlowShape<u64, u64>> {
identity_chain(stages).named("bench.identity")
}
fn buffered_identity_chain(stages: usize) -> GraphBlueprint<GraphFlowShape<u64, u64>> {
identity_chain(stages).add_attributes(datum::Attributes::input_buffer(16, 16))
}
fn identity_chain_with_async_boundaries(
stages: usize,
boundary_positions: &[usize],
) -> GraphBlueprint<GraphFlowShape<u64, u64>> {
assert!(stages > 0);
GraphDsl::try_create(|builder| {
let first = builder.add(Identity::<u64>::new());
let inlet = first.inlet();
let mut outlet = first.outlet();
for stage_index in 1..stages {
if boundary_positions.contains(&stage_index) {
let boundary = builder.add(AsyncBoundary::<u64>::new());
builder.connect(outlet, boundary.inlet())?;
outlet = boundary.outlet();
}
let next = builder.add(Identity::<u64>::new());
builder.connect(outlet, next.inlet())?;
outlet = next.outlet();
}
Ok(GraphFlowShape::new(inlet, outlet))
})
.unwrap()
}
fn junction_chain(pairs: usize) -> GraphBlueprint<GraphFlowShape<u64, u64>> {
assert!(pairs > 0);
GraphDsl::try_create(|builder| {
let first = builder.add(Identity::<u64>::new());
let inlet = first.inlet();
let mut outlet = first.outlet();
for _ in 0..pairs {
let balance = builder.add(Balance::<u64>::new(2));
let merge = builder.add(Merge::<u64>::new(2));
builder.connect(outlet, balance.inlet())?;
builder.connect(balance.outlet(0)?, merge.inlet(0)?)?;
builder.connect(balance.outlet(1)?, merge.inlet(1)?)?;
outlet = merge.outlet();
}
Ok(GraphFlowShape::new(inlet, outlet))
})
.unwrap()
}
fn broadcast_zip_graph() -> GraphBlueprint<GraphFlowShape<u64, (u64, u64)>> {
GraphDsl::try_create(|builder| {
let broadcast = builder.add(Broadcast::<u64>::new(2));
let zip = builder.add(Zip::<u64, u64>::new());
builder.connect(broadcast.outlet(0)?, zip.in0())?;
builder.connect(broadcast.outlet(1)?, zip.in1())?;
Ok(GraphFlowShape::new(broadcast.inlet(), zip.outlet()))
})
.unwrap()
}
fn balance_merge_graph() -> GraphBlueprint<GraphFlowShape<u64, u64>> {
GraphDsl::try_create(|builder| {
let balance = builder.add(Balance::<u64>::new(2));
let merge = builder.add(Merge::<u64>::new(2));
builder.connect(balance.outlet(0)?, merge.inlet(0)?)?;
builder.connect(balance.outlet(1)?, merge.inlet(1)?)?;
Ok(GraphFlowShape::new(balance.inlet(), merge.outlet()))
})
.unwrap()
}
fn identity_chain_10() -> &'static GraphBlueprint<GraphFlowShape<u64, u64>> {
static GRAPH: OnceLock<GraphBlueprint<GraphFlowShape<u64, u64>>> = OnceLock::new();
GRAPH.get_or_init(|| identity_chain(10))
}
fn map_chain_10() -> &'static GraphBlueprint<GraphFlowShape<u64, u64>> {
static GRAPH: OnceLock<GraphBlueprint<GraphFlowShape<u64, u64>>> = OnceLock::new();
GRAPH.get_or_init(|| map_chain(10))
}
fn named_identity_chain_10() -> &'static GraphBlueprint<GraphFlowShape<u64, u64>> {
static GRAPH: OnceLock<GraphBlueprint<GraphFlowShape<u64, u64>>> = OnceLock::new();
GRAPH.get_or_init(|| named_identity_chain(10))
}
fn buffered_identity_chain_10() -> &'static GraphBlueprint<GraphFlowShape<u64, u64>> {
static GRAPH: OnceLock<GraphBlueprint<GraphFlowShape<u64, u64>>> = OnceLock::new();
GRAPH.get_or_init(|| buffered_identity_chain(10))
}
fn async_boundary_1() -> &'static GraphBlueprint<GraphFlowShape<u64, u64>> {
static GRAPH: OnceLock<GraphBlueprint<GraphFlowShape<u64, u64>>> = OnceLock::new();
GRAPH.get_or_init(|| identity_chain_with_async_boundaries(10, &[5]))
}
fn async_boundary_9() -> &'static GraphBlueprint<GraphFlowShape<u64, u64>> {
static GRAPH: OnceLock<GraphBlueprint<GraphFlowShape<u64, u64>>> = OnceLock::new();
GRAPH.get_or_init(|| {
let boundary_positions = (1_usize..10).collect::<Vec<_>>();
identity_chain_with_async_boundaries(10, &boundary_positions)
})
}
fn broadcast_zip() -> &'static GraphBlueprint<GraphFlowShape<u64, (u64, u64)>> {
static GRAPH: OnceLock<GraphBlueprint<GraphFlowShape<u64, (u64, u64)>>> = OnceLock::new();
GRAPH.get_or_init(broadcast_zip_graph)
}
fn balance_merge() -> &'static GraphBlueprint<GraphFlowShape<u64, u64>> {
static GRAPH: OnceLock<GraphBlueprint<GraphFlowShape<u64, u64>>> = OnceLock::new();
GRAPH.get_or_init(balance_merge_graph)
}
fn prioritized_merge() -> &'static GraphBlueprint<datum::FanInShape<u64, u64>> {
static GRAPH: OnceLock<GraphBlueprint<datum::FanInShape<u64, u64>>> = OnceLock::new();
GRAPH.get_or_init(|| {
GraphDsl::create(|builder| builder.add(MergePrioritized::<u64>::new(vec![4, 1]))).unwrap()
})
}
fn merge_preferred() -> &'static GraphBlueprint<datum::MergePreferredShape<u64>> {
static GRAPH: OnceLock<GraphBlueprint<datum::MergePreferredShape<u64>>> = OnceLock::new();
GRAPH.get_or_init(|| {
GraphDsl::create(|builder| builder.add(MergePreferred::<u64>::new(2))).unwrap()
})
}
fn concat() -> &'static GraphBlueprint<datum::FanInShape<u64, u64>> {
static GRAPH: OnceLock<GraphBlueprint<datum::FanInShape<u64, u64>>> = OnceLock::new();
GRAPH.get_or_init(|| GraphDsl::create(|builder| builder.add(Concat::<u64>::new(2))).unwrap())
}
fn interleave() -> &'static GraphBlueprint<datum::FanInShape<u64, u64>> {
static GRAPH: OnceLock<GraphBlueprint<datum::FanInShape<u64, u64>>> = OnceLock::new();
GRAPH.get_or_init(|| {
GraphDsl::create(|builder| builder.add(Interleave::<u64>::new(2, 2))).unwrap()
})
}
fn merge_sorted() -> &'static GraphBlueprint<GraphFlowShape<(u64, u64), u64>> {
static GRAPH: OnceLock<GraphBlueprint<GraphFlowShape<(u64, u64), u64>>> = OnceLock::new();
GRAPH.get_or_init(|| {
GraphDsl::try_create(|builder| {
let unzip = builder.add(Unzip::<u64, u64>::new());
let merge = builder.add(MergeSorted::<u64>::new());
builder.connect(unzip.out0(), merge.inlet(0)?)?;
builder.connect(unzip.out1(), merge.inlet(1)?)?;
Ok(GraphFlowShape::new(unzip.inlet(), merge.outlet()))
})
.unwrap()
})
}
fn merge_sequence() -> &'static GraphBlueprint<GraphFlowShape<(u64, u64), u64>> {
static GRAPH: OnceLock<GraphBlueprint<GraphFlowShape<(u64, u64), u64>>> = OnceLock::new();
GRAPH.get_or_init(|| {
GraphDsl::try_create(|builder| {
let unzip = builder.add(Unzip::<u64, u64>::new());
let merge = builder.add(MergeSequence::<u64>::new(2, |item| *item));
builder.connect(unzip.out0(), merge.inlet(0)?)?;
builder.connect(unzip.out1(), merge.inlet(1)?)?;
Ok(GraphFlowShape::new(unzip.inlet(), merge.outlet()))
})
.unwrap()
})
}
fn merge_latest() -> &'static PairMergeLatestGraph {
static GRAPH: OnceLock<PairMergeLatestGraph> = OnceLock::new();
GRAPH.get_or_init(|| {
GraphDsl::try_create(|builder| {
let unzip = builder.add(Unzip::<u64, u64>::new());
let merge = builder.add(MergeLatest::<u64>::new(2, false));
builder.connect(unzip.out0(), merge.inlet(0)?)?;
builder.connect(unzip.out1(), merge.inlet(1)?)?;
Ok(GraphFlowShape::new(unzip.inlet(), merge.outlet()))
})
.unwrap()
})
}
fn partition() -> &'static GraphBlueprint<GraphFlowShape<u64, u64>> {
static GRAPH: OnceLock<GraphBlueprint<GraphFlowShape<u64, u64>>> = OnceLock::new();
GRAPH.get_or_init(|| {
GraphDsl::try_create(|builder| {
let partition = builder.add(Partition::<u64>::new(2, |item| (*item % 2) as usize));
let merge = builder.add(Merge::<u64>::new(2));
builder.connect(partition.outlet(0)?, merge.inlet(0)?)?;
builder.connect(partition.outlet(1)?, merge.inlet(1)?)?;
Ok(GraphFlowShape::new(partition.inlet(), merge.outlet()))
})
.unwrap()
})
}
fn unzip_with() -> &'static GraphBlueprint<GraphFlowShape<u64, (u64, u64)>> {
static GRAPH: OnceLock<GraphBlueprint<GraphFlowShape<u64, (u64, u64)>>> = OnceLock::new();
GRAPH.get_or_init(|| {
GraphDsl::try_create(|builder| {
let unzip = builder.add(UnzipWith::<u64, u64, u64>::new(|item| (item, item + 1)));
let zip = builder.add(Zip::<u64, u64>::new());
builder.connect(unzip.out0(), zip.in0())?;
builder.connect(unzip.out1(), zip.in1())?;
Ok(GraphFlowShape::new(unzip.inlet(), zip.outlet()))
})
.unwrap()
})
}
fn cyclic_merge_preferred() -> &'static GraphBlueprint<GraphFlowShape<u64, u64>> {
static GRAPH: OnceLock<GraphBlueprint<GraphFlowShape<u64, u64>>> = OnceLock::new();
GRAPH.get_or_init(|| {
GraphDsl::try_create(|builder| {
let merge = builder.add(MergePreferred::<u64>::new(1));
let broadcast = builder.add(Broadcast::<u64>::new(2));
let buffer = builder.add(Buffer::<u64>::new(16, OverflowStrategy::Backpressure));
let positive = builder.add(TakeWhile::<u64>::new(|item| *item > 0));
let decrement = builder.add(MapStage::new(|item: u64| item - 1));
builder.connect(merge.outlet(), broadcast.inlet())?;
builder.connect(broadcast.outlet(1)?, buffer.inlet())?;
builder.connect(buffer.outlet(), positive.inlet())?;
builder.connect(positive.outlet(), decrement.inlet())?;
builder.connect(decrement.outlet(), merge.preferred())?;
Ok(GraphFlowShape::new(
merge.secondary(0)?,
broadcast.outlet(0)?,
))
})
.unwrap()
})
}
fn fused_identity_10_100k() -> u64 {
identity_chain_10()
.run_with_input_report(
0_u64..100_000,
FusedExecutionConfig {
event_limit: 5_000_000,
},
)
.unwrap()
.output
.len() as u64
}
fn fused_map_10_100k() -> u64 {
map_chain_10()
.run_with_input_report(
0_u64..100_000,
FusedExecutionConfig {
event_limit: 5_000_000,
},
)
.unwrap()
.output
.last()
.copied()
.unwrap_or_default()
}
fn fused_identity_10_100k_count_sink() -> u64 {
identity_chain_10()
.run_count_with_input_report(
0_u64..100_000,
FusedExecutionConfig {
event_limit: 5_000_000,
},
)
.unwrap()
.result as u64
}
fn fused_map_10_100k_fold_sink() -> u64 {
map_chain_10()
.run_fold_with_input_report(
0_u64..100_000,
0_u64,
|acc, item| acc.wrapping_add(item),
FusedExecutionConfig {
event_limit: 5_000_000,
},
)
.unwrap()
.result
}
fn fused_identity_10_100k_typed_count_sink() -> u64 {
identity_chain_10()
.run_typed_linear_count_with_input_report(
0_u64..100_000,
FusedExecutionConfig {
event_limit: 5_000_000,
},
)
.unwrap()
.result as u64
}
fn fused_map_10_100k_typed_fold_sink() -> u64 {
map_chain_10()
.run_typed_linear_fold_with_input_report(
0_u64..100_000,
0_u64,
|acc, item| acc.wrapping_add(item),
FusedExecutionConfig {
event_limit: 5_000_000,
},
)
.unwrap()
.result
}
fn fused_identity_10_100k_typed_count_sink_named_attr() -> u64 {
named_identity_chain_10()
.run_typed_linear_count_with_input_report(
0_u64..100_000,
FusedExecutionConfig {
event_limit: 5_000_000,
},
)
.unwrap()
.result as u64
}
fn fused_identity_10_100k_typed_count_sink_buffer_attr() -> u64 {
buffered_identity_chain_10()
.run_typed_linear_count_with_input_report(
0_u64..100_000,
FusedExecutionConfig {
event_limit: 5_000_000,
},
)
.unwrap()
.result as u64
}
fn async_boundary_1_10k_count_sink() -> u64 {
let ractor_boundary_config = AsyncBoundaryExecutionConfig {
fused: FusedExecutionConfig {
event_limit: 5_000_000,
},
buffer_size: 16,
};
let report = async_boundary_1()
.run_async_boundary_count_with_input_report(0_u64..10_000, ractor_boundary_config)
.unwrap();
report.result as u64 + report.async_boundary_crossings as u64
}
fn async_boundary_9_10k_count_sink() -> u64 {
let ractor_boundary_config = AsyncBoundaryExecutionConfig {
fused: FusedExecutionConfig {
event_limit: 5_000_000,
},
buffer_size: 16,
};
let report = async_boundary_9()
.run_async_boundary_count_with_input_report(0_u64..10_000, ractor_boundary_config)
.unwrap();
report.result as u64 + report.async_boundary_crossings as u64
}
fn graph_build_identity_1() -> u64 {
identity_chain(1).stage_count() as u64
}
fn graph_build_identity_10() -> u64 {
identity_chain(10).stage_count() as u64
}
fn graph_build_identity_100() -> u64 {
identity_chain(100).stage_count() as u64
}
fn graph_build_junction_1() -> u64 {
junction_chain(1).stage_count() as u64
}
fn graph_build_junction_10() -> u64 {
junction_chain(10).stage_count() as u64
}
fn graph_build_junction_50() -> u64 {
junction_chain(50).stage_count() as u64
}
fn junction_broadcast_zip_10k() -> u64 {
broadcast_zip().run_with_input(0_u64..10_000).unwrap().len() as u64
}
fn junction_balance_merge_10k() -> u64 {
balance_merge().run_with_input(0_u64..10_000).unwrap().len() as u64
}
fn junction_prioritized_merge_10k() -> u64 {
let high = (0_u64..8_000).collect::<Vec<_>>();
let low = (10_000_u64..12_000).collect::<Vec<_>>();
prioritized_merge()
.run_fan_in(vec![high, low])
.unwrap()
.len() as u64
}
fn junction_merge_preferred_10k() -> u64 {
let preferred = (0_u64..8_000).collect::<Vec<_>>();
let secondary_one = (10_000_u64..11_000).collect::<Vec<_>>();
let secondary_two = (20_000_u64..21_000).collect::<Vec<_>>();
merge_preferred()
.run_merge_preferred(preferred, vec![secondary_one, secondary_two])
.unwrap()
.len() as u64
}
fn junction_concat_10k() -> u64 {
concat()
.run_concat(vec![
(0_u64..5_000).collect(),
(5_000_u64..10_000).collect(),
])
.unwrap()
.len() as u64
}
fn junction_interleave_10k() -> u64 {
interleave()
.run_interleave(
vec![(0_u64..5_000).collect(), (5_000_u64..10_000).collect()],
2,
false,
)
.unwrap()
.len() as u64
}
fn junction_merge_sorted_10k() -> u64 {
merge_sorted()
.run_with_input((0_u64..10_000).step_by(2).map(|item| (item, item + 1)))
.unwrap()
.len() as u64
}
fn junction_merge_sequence_10k() -> u64 {
merge_sequence()
.run_with_input((0_u64..20_000).step_by(2).map(|item| (item, item + 1)))
.unwrap()
.len() as u64
}
fn junction_merge_latest_10k() -> u64 {
merge_latest()
.run_with_input((0_u64..10_000).map(|item| (item, item + 10_000)))
.unwrap()
.len() as u64
}
fn junction_partition_10k() -> u64 {
partition().run_with_input(0_u64..10_000).unwrap().len() as u64
}
fn junction_unzip_with_10k() -> u64 {
unzip_with().run_with_input(0_u64..10_000).unwrap().len() as u64
}
fn cycle_merge_preferred_feedback_10k() -> u64 {
cyclic_merge_preferred()
.run_with_input_report(
[10_000_u64],
FusedExecutionConfig {
event_limit: 5_000_000,
},
)
.unwrap()
.output
.len() as u64
}
fn custom_identity_stage_10k() -> u64 {
static GRAPH: OnceLock<GraphBlueprint<GraphFlowShape<u64, u64>>> = OnceLock::new();
GRAPH
.get_or_init(|| identity_chain(1))
.run_with_input(0_u64..10_000)
.unwrap()
.len() as u64
}
fn custom_map_stage_10k() -> u64 {
static GRAPH: OnceLock<GraphBlueprint<GraphFlowShape<u64, u64>>> = OnceLock::new();
GRAPH
.get_or_init(|| map_chain(1))
.run_with_input(0_u64..10_000)
.unwrap()
.len() as u64
}
fn bidi_join_10k() -> u64 {
let flow = BidiFlow::from_flows(
Flow::identity().map(|item: u64| item + 1),
Flow::identity().map(|item: u64| item * 2),
)
.join(Flow::identity().map(|item: u64| item - 1));
Source::from_iter(0_u64..10_000)
.via(flow)
.run_with(Sink::fold(0_u64, |acc, item| acc.wrapping_add(item)))
.unwrap()
.wait()
.unwrap()
}
fn bidi_atop_10k() -> u64 {
let first = BidiFlow::from_flows(
Flow::identity().map(|item: u64| item + 1),
Flow::identity().map(|item: u64| item * 2),
);
let second = BidiFlow::from_flows(
Flow::identity().map(|item: u64| item * 3),
Flow::identity().map(|item: u64| item - 4),
);
Source::from_iter(0_u64..10_000)
.via(first.atop(second).join(Flow::identity()))
.run_with(Sink::fold(0_u64, |acc, item| acc.wrapping_add(item)))
.unwrap()
.wait()
.unwrap()
}