use std::{
alloc::{GlobalAlloc, Layout, System},
hint::black_box,
sync::atomic::{AtomicU64, AtomicUsize, Ordering},
thread,
time::{Duration, Instant},
};
use datum::{
Flow, OverflowStrategy, RestartFlow, RestartSettings, RestartSource, RetryFlow, Sink, Source,
StreamError, Supervision, ThrottleMode,
};
/// Allocator that delegates every request to [`System`] while tallying how many bytes
/// were handed out.
struct CountingAllocator;

/// Running total of bytes counted by [`CountingAllocator`].
///
/// The total only ever grows: deallocations and shrinking reallocations are not subtracted.
static ALLOCATED_BYTES: AtomicU64 = AtomicU64::new(0);

impl CountingAllocator {
    /// Adds `bytes` to [`ALLOCATED_BYTES`].
    fn charge(bytes: usize) {
        ALLOCATED_BYTES.fetch_add(bytes as u64, Ordering::Relaxed);
    }
}

unsafe impl GlobalAlloc for CountingAllocator {
    /// Forwards to `System.alloc` and counts `layout.size()` bytes on success.
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller's `GlobalAlloc::alloc` contract is forwarded unchanged.
        let block = unsafe { System.alloc(layout) };
        if block.is_null() {
            return block;
        }
        Self::charge(layout.size());
        block
    }

    /// Forwards to `System.alloc_zeroed` and counts `layout.size()` bytes on success.
    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller's `GlobalAlloc::alloc_zeroed` contract is forwarded unchanged.
        let block = unsafe { System.alloc_zeroed(layout) };
        if block.is_null() {
            return block;
        }
        Self::charge(layout.size());
        block
    }

    /// Forwards to `System.dealloc`; freed bytes are not subtracted from the total.
    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        // SAFETY: the caller guarantees `ptr` was allocated by this allocator with `layout`,
        // and every allocation made here comes from `System`.
        unsafe { System.dealloc(ptr, layout) };
    }

    /// Forwards to `System.realloc` and counts the bytes gained by the resize.
    ///
    /// A block that moved is counted at its full `new_size`; a block that stayed in place is
    /// counted only for the growth beyond `layout.size()`. Failed or shrinking in-place
    /// resizes count nothing.
    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
        // SAFETY: the caller's `GlobalAlloc::realloc` contract is forwarded unchanged.
        let resized = unsafe { System.realloc(ptr, layout, new_size) };
        if resized.is_null() {
            return resized;
        }
        let gained = if resized != ptr {
            Some(new_size)
        } else {
            new_size
                .checked_sub(layout.size())
                .filter(|growth| *growth > 0)
        };
        if let Some(bytes) = gained {
            Self::charge(bytes);
        }
        resized
    }
}

#[global_allocator]
static GLOBAL: CountingAllocator = CountingAllocator;
/// Returns the CPU time charged to this process so far, in nanoseconds.
///
/// Reads `/proc/self/stat`, skips everything up to the last `)` (the end of the command-name
/// field) and sums the whitespace-separated fields at offsets 11 and 12 after it, which are
/// parsed as `utime` and `stime`. Each tick is scaled by a fixed 10 ms.
/// NOTE(review): this assumes a 100 Hz clock tick — confirm for the target system.
///
/// Returns `0` when the file cannot be read, contains no `)`, or has too few fields. A field
/// that does not parse as `u64` contributes `0`.
fn process_cpu_ns() -> u128 {
    let Ok(stat) = std::fs::read_to_string("/proc/self/stat") else {
        return 0;
    };
    let Some(comm_end) = stat.rfind(')') else {
        return 0;
    };
    let mut after_comm = stat[comm_end + 1..].split_whitespace().skip(11);
    let (Some(utime), Some(stime)) = (after_comm.next(), after_comm.next()) else {
        return 0;
    };
    let ticks = |field: &str| u128::from(field.parse::<u64>().unwrap_or(0));
    (ticks(utime) + ticks(stime)) * 10_000_000
}
/// One benchmark entry: a label, how many timed iterations to run, and the function to call.
struct Scenario {
    /// Label printed in the first column of the results table.
    name: &'static str,
    /// Number of timed calls to `run`; also the divisor for the per-operation figures.
    iterations: u64,
    /// Benchmark body; its `u64` result is folded into a checksum by the driver.
    run: fn() -> u64,
}
fn main() {
let scenarios = [
Scenario {
name: "empty_ignore",
iterations: 10_000,
run: empty_ignore,
},
Scenario {
name: "single_ignore",
iterations: 10_000,
run: single_ignore,
},
Scenario {
name: "repeat_take_1k_ignore",
iterations: 2_000,
run: repeat_take_1k_ignore,
},
Scenario {
name: "from_iter_collect_1k",
iterations: 2_000,
run: from_iter_collect_1k,
},
Scenario {
name: "map_chain_100k_1",
iterations: 100,
run: map_chain_100k_1,
},
Scenario {
name: "map_chain_100k_5",
iterations: 60,
run: map_chain_100k_5,
},
Scenario {
name: "map_chain_100k_10",
iterations: 30,
run: map_chain_100k_10,
},
Scenario {
name: "linear_async_boundary_100k",
iterations: 20,
run: linear_async_boundary_100k,
},
Scenario {
name: "grouped_scan_10k",
iterations: 400,
run: grouped_scan_10k,
},
Scenario {
name: "sliding_10k",
iterations: 400,
run: sliding_10k,
},
Scenario {
name: "scan_10k",
iterations: 400,
run: scan_10k,
},
Scenario {
name: "fold_10k",
iterations: 400,
run: fold_10k,
},
Scenario {
name: "fold_async_10k",
iterations: 200,
run: fold_async_10k,
},
Scenario {
name: "unfold_resource_10k",
iterations: 200,
run: unfold_resource_10k,
},
Scenario {
name: "filter_10k",
iterations: 400,
run: filter_10k,
},
Scenario {
name: "take_5k_of_10k",
iterations: 400,
run: take_5k_of_10k,
},
Scenario {
name: "drop_5k_of_10k",
iterations: 400,
run: drop_5k_of_10k,
},
Scenario {
name: "take_while_5k",
iterations: 400,
run: take_while_5k,
},
Scenario {
name: "limit_10k",
iterations: 400,
run: limit_10k,
},
Scenario {
name: "map_concat_10k",
iterations: 400,
run: map_concat_10k,
},
Scenario {
name: "zip_with_index_10k",
iterations: 400,
run: zip_with_index_10k,
},
Scenario {
name: "map_async_ordered_10k_p1",
iterations: 100,
run: map_async_ordered_10k_p1,
},
Scenario {
name: "map_async_ordered_10k_p4",
iterations: 100,
run: map_async_ordered_10k_p4,
},
Scenario {
name: "map_async_ordered_10k_p32",
iterations: 100,
run: map_async_ordered_10k_p32,
},
Scenario {
name: "map_async_unordered_10k_p1",
iterations: 100,
run: map_async_unordered_10k_p1,
},
Scenario {
name: "map_async_unordered_10k_p4",
iterations: 100,
run: map_async_unordered_10k_p4,
},
Scenario {
name: "map_async_unordered_10k_p32",
iterations: 100,
run: map_async_unordered_10k_p32,
},
Scenario {
name: "map_async_partitioned_10k_p1",
iterations: 100,
run: map_async_partitioned_10k_p1,
},
Scenario {
name: "map_async_partitioned_10k_p4",
iterations: 100,
run: map_async_partitioned_10k_p4,
},
Scenario {
name: "map_async_partitioned_10k_p32",
iterations: 100,
run: map_async_partitioned_10k_p32,
},
Scenario {
name: "error_recover_1k",
iterations: 1_000,
run: error_recover_1k,
},
Scenario {
name: "supervision_map_resume_100k_no_error",
iterations: 100,
run: supervision_map_resume_100k_no_error,
},
Scenario {
name: "restart_source_backoff_accuracy_3x2ms",
iterations: 20,
run: restart_source_backoff_accuracy_3x2ms,
},
Scenario {
name: "restart_source_completion_cap_3",
iterations: 1_000,
run: restart_source_completion_cap_3,
},
Scenario {
name: "restart_flow_drop_in_flight_1",
iterations: 1_000,
run: restart_flow_drop_in_flight_1,
},
Scenario {
name: "retry_flow_with_backoff",
iterations: 1_000,
run: retry_flow_with_backoff,
},
Scenario {
name: "cancelled_repeat",
iterations: 10_000,
run: cancelled_repeat,
},
Scenario {
name: "buffer_fast_producer_slow_consumer_backpressure",
iterations: 200,
run: buffer_fast_producer_slow_consumer_backpressure,
},
Scenario {
name: "conflate_fast_producer_slow_consumer",
iterations: 200,
run: conflate_fast_producer_slow_consumer,
},
Scenario {
name: "batch_fast_producer_slow_consumer",
iterations: 200,
run: batch_fast_producer_slow_consumer,
},
Scenario {
name: "batch_weighted_fast_producer_slow_consumer",
iterations: 200,
run: batch_weighted_fast_producer_slow_consumer,
},
Scenario {
name: "aggregate_with_boundary_fast_producer_slow_consumer",
iterations: 200,
run: aggregate_with_boundary_fast_producer_slow_consumer,
},
Scenario {
name: "detach_fast_producer_slow_consumer",
iterations: 200,
run: detach_fast_producer_slow_consumer,
},
Scenario {
name: "expand_slow_producer_fast_consumer",
iterations: 200,
run: expand_slow_producer_fast_consumer,
},
Scenario {
name: "extrapolate_slow_producer_fast_consumer",
iterations: 200,
run: extrapolate_slow_producer_fast_consumer,
},
Scenario {
name: "throttle_shaping_accuracy_24",
iterations: 20,
run: throttle_shaping_accuracy_24,
},
Scenario {
name: "completion_timeout_latency_25ms",
iterations: 20,
run: completion_timeout_latency_25ms,
},
Scenario {
name: "grouped_within_flush_timing",
iterations: 20,
run: grouped_within_flush_timing,
},
];
println!("scenario\titerations\tns_per_op\tallocated_bytes_per_op\tcpu_ns_per_op");
for scenario in scenarios {
for _ in 0..10 {
black_box((scenario.run)());
}
let cpu_start = process_cpu_ns();
ALLOCATED_BYTES.store(0, Ordering::Relaxed);
let started = Instant::now();
let mut checksum = 0_u64;
for _ in 0..scenario.iterations {
checksum = checksum.wrapping_add(black_box((scenario.run)()));
}
let elapsed = started.elapsed();
let allocated_bytes = ALLOCATED_BYTES.load(Ordering::Relaxed);
let cpu_delta = process_cpu_ns().saturating_sub(cpu_start);
black_box(checksum);
let ns_per_op = elapsed.as_nanos() as f64 / scenario.iterations as f64;
let allocated_bytes_per_op = allocated_bytes as f64 / scenario.iterations as f64;
let cpu_ns_per_op = cpu_delta as f64 / scenario.iterations as f64;
println!(
"{}\t{}\t{ns_per_op:.2}\t{allocated_bytes_per_op:.2}\t{cpu_ns_per_op:.2}",
scenario.name, scenario.iterations
);
}
}
/// Runs `Source::empty()` into `Sink::ignore()`, waits for it to finish, and returns `0`.
fn empty_ignore() -> u64 {
    let nothing = Source::<u64>::empty();
    let launched = nothing.run_with(Sink::ignore());
    launched.unwrap().wait().unwrap();
    0
}
/// Runs `Source::single(1)` into `Sink::ignore()`, waits for it to finish, and returns `1`.
fn single_ignore() -> u64 {
    let one = Source::single(1_u64);
    let launched = one.run_with(Sink::ignore());
    launched.unwrap().wait().unwrap();
    1
}
/// Runs `Source::repeat(1).take(1_000)` into `Sink::ignore()`, waits, and returns `1_000`.
fn repeat_take_1k_ignore() -> u64 {
    let ones = Source::repeat(1_u64).take(1_000);
    let launched = ones.run_with(Sink::ignore());
    launched.unwrap().wait().unwrap();
    1_000
}
/// Runs `0..1_000` into `Sink::collect()` and returns the length of the collected output.
fn from_iter_collect_1k() -> u64 {
    let numbers = Source::from_iter(0_u64..1_000);
    let collected = numbers.run_with(Sink::collect()).unwrap().wait().unwrap();
    collected.len() as u64
}
/// Stacks `stages` consecutive `map` steps (each a wrapping `+ 1`) on `0..100_000` and
/// returns the wrapping sum of the output.
fn map_chain(stages: usize) -> u64 {
    let pipeline = (0..stages).fold(Source::from_iter(0_u64..100_000), |upstream, _| {
        upstream.map(|value| value.wrapping_add(1))
    });
    let launched =
        pipeline.run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)));
    launched.unwrap().wait().unwrap()
}

/// [`map_chain`] with a single `map` stage.
fn map_chain_100k_1() -> u64 {
    map_chain(1)
}

/// [`map_chain`] with five `map` stages.
fn map_chain_100k_5() -> u64 {
    map_chain(5)
}

/// [`map_chain`] with ten `map` stages.
fn map_chain_100k_10() -> u64 {
    map_chain(10)
}
/// Sums `0..100_000` after two wrapping `+ 1` maps separated by
/// `async_boundary_with_buffer(16)`.
fn linear_async_boundary_100k() -> u64 {
    let before_boundary = Source::from_iter(0_u64..100_000).map(|value| value.wrapping_add(1));
    let after_boundary = before_boundary
        .async_boundary_with_buffer(16)
        .map(|value| value.wrapping_add(1));
    let launched =
        after_boundary.run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)));
    launched.unwrap().wait().unwrap()
}
/// Sends `0..10_000` through `grouped(32)`, sums each group, applies a wrapping-add `scan`,
/// and returns the value produced by `Sink::last()`.
fn grouped_scan_10k() -> u64 {
    let group_sums = Source::from_iter(0_u64..10_000)
        .grouped(32)
        .map(|chunk| chunk.into_iter().sum::<u64>());
    let running_totals = group_sums.scan(0_u64, |total, value| total.wrapping_add(value));
    let launched = running_totals.run_with(Sink::last());
    launched.unwrap().wait().unwrap()
}
/// Sends `0..10_000` through `sliding(32, 16)` and returns the wrapping sum of the window
/// lengths.
fn sliding_10k() -> u64 {
    let windows = Source::from_iter(0_u64..10_000).sliding(32, 16);
    let launched = windows.run_with(Sink::fold(0_u64, |total, window: Vec<u64>| {
        total.wrapping_add(window.len() as u64)
    }));
    launched.unwrap().wait().unwrap()
}
/// Applies a wrapping-add `scan` to `0..10_000` and returns the value produced by
/// `Sink::last()`.
fn scan_10k() -> u64 {
    let running_totals =
        Source::from_iter(0_u64..10_000).scan(0_u64, |total, value| total.wrapping_add(value));
    let launched = running_totals.run_with(Sink::last());
    launched.unwrap().wait().unwrap()
}
/// Applies a wrapping-add `fold` stage to `0..10_000` and returns the value produced by
/// `Sink::head()`.
fn fold_10k() -> u64 {
    let folded =
        Source::from_iter(0_u64..10_000).fold(0_u64, |total, value| total.wrapping_add(value));
    let launched = folded.run_with(Sink::head());
    launched.unwrap().wait().unwrap()
}
/// Applies a `fold_async` stage (wrapping add inside an `async` block) to `0..10_000` and
/// returns the value produced by `Sink::head()`.
fn fold_async_10k() -> u64 {
    let folded = Source::from_iter(0_u64..10_000)
        .fold_async(0_u64, |total, value| async move { Ok(total.wrapping_add(value)) });
    let launched = folded.run_with(Sink::head());
    launched.unwrap().wait().unwrap()
}
/// Builds a source with `Source::unfold_resource` whose state is a counter starting at `0`,
/// yielding the counter and incrementing it until it reaches `10_000`, and returns the
/// wrapping sum of the output.
fn unfold_resource_10k() -> u64 {
    let counter = Source::unfold_resource(
        || Ok(0_u64),
        |cursor| {
            if *cursor != 10_000 {
                let current = *cursor;
                *cursor += 1;
                Ok(Some(current))
            } else {
                Ok(None)
            }
        },
        |_| Ok(()),
    );
    let launched = counter.run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)));
    launched.unwrap().wait().unwrap()
}
/// Sends `0..10_000` through `filter` with an is-even predicate and returns the wrapping sum
/// of the output.
fn filter_10k() -> u64 {
    let evens = Source::from_iter(0_u64..10_000).filter(|value| value % 2 == 0);
    let launched = evens.run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)));
    launched.unwrap().wait().unwrap()
}
/// Sends `0..10_000` through `take(5_000)` and returns the wrapping sum of the output.
fn take_5k_of_10k() -> u64 {
    let leading = Source::from_iter(0_u64..10_000).take(5_000);
    let launched = leading.run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)));
    launched.unwrap().wait().unwrap()
}
/// Sends `0..10_000` through `drop(5_000)` and returns the wrapping sum of the output.
fn drop_5k_of_10k() -> u64 {
    let trailing = Source::from_iter(0_u64..10_000).drop(5_000);
    let launched = trailing.run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)));
    launched.unwrap().wait().unwrap()
}
/// Sends `0..10_000` through `take_while` with a `< 5_000` predicate and returns the wrapping
/// sum of the output.
fn take_while_5k() -> u64 {
    let below_limit = Source::from_iter(0_u64..10_000).take_while(|value| *value < 5_000);
    let launched =
        below_limit.run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)));
    launched.unwrap().wait().unwrap()
}
/// Sends `0..10_000` through `limit(10_000)` and returns the wrapping sum of the output.
fn limit_10k() -> u64 {
    let limited = Source::from_iter(0_u64..10_000).limit(10_000);
    let launched = limited.run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)));
    launched.unwrap().wait().unwrap()
}
/// Sends `0..10_000` through `map_concat`, mapping each value to the pair
/// `[value, value + 1]`, and returns the wrapping sum of the output.
fn map_concat_10k() -> u64 {
    let expanded =
        Source::from_iter(0_u64..10_000).map_concat(|value| [value, value.wrapping_add(1)]);
    let launched = expanded.run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)));
    launched.unwrap().wait().unwrap()
}
/// Sends `0..10_000` through `zip_with_index` and returns the second component of the pair
/// produced by `Sink::last()`.
fn zip_with_index_10k() -> u64 {
    let indexed = Source::from_iter(0_u64..10_000).zip_with_index();
    let last_pair = indexed.run_with(Sink::last()).unwrap().wait().unwrap();
    last_pair.1
}
/// Sends `0..10_000` through `map_async` (wrapping `+ 1` inside an `async` block) with the
/// given `parallelism` and returns the wrapping sum of the output.
fn map_async_ordered(parallelism: usize) -> u64 {
    let mapped = Source::from_iter(0_u64..10_000)
        .map_async(parallelism, |value| async move { Ok(value.wrapping_add(1)) });
    let launched = mapped.run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)));
    launched.unwrap().wait().unwrap()
}

/// [`map_async_ordered`] with parallelism 1.
fn map_async_ordered_10k_p1() -> u64 {
    map_async_ordered(1)
}

/// [`map_async_ordered`] with parallelism 4.
fn map_async_ordered_10k_p4() -> u64 {
    map_async_ordered(4)
}

/// [`map_async_ordered`] with parallelism 32.
fn map_async_ordered_10k_p32() -> u64 {
    map_async_ordered(32)
}
/// Sends `0..10_000` through `map_async_unordered` (wrapping `+ 1` inside an `async` block)
/// with the given `parallelism` and returns the wrapping sum of the output.
fn map_async_unordered(parallelism: usize) -> u64 {
    let mapped = Source::from_iter(0_u64..10_000)
        .map_async_unordered(parallelism, |value| async move { Ok(value.wrapping_add(1)) });
    let launched = mapped.run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)));
    launched.unwrap().wait().unwrap()
}

/// [`map_async_unordered`] with parallelism 1.
fn map_async_unordered_10k_p1() -> u64 {
    map_async_unordered(1)
}

/// [`map_async_unordered`] with parallelism 4.
fn map_async_unordered_10k_p4() -> u64 {
    map_async_unordered(4)
}

/// [`map_async_unordered`] with parallelism 32.
fn map_async_unordered_10k_p32() -> u64 {
    map_async_unordered(32)
}
/// Sends `0..10_000` through `map_async_partitioned` with the given `parallelism`, a second
/// argument of `1`, a `value % 16` partitioner, and a wrapping `+ 1` async mapper, then
/// returns the wrapping sum of the output.
fn map_async_partitioned(parallelism: usize) -> u64 {
    let mapped = Source::from_iter(0_u64..10_000).map_async_partitioned(
        parallelism,
        1,
        |value| value % 16,
        |value| async move { Ok(value.wrapping_add(1)) },
    );
    let launched = mapped.run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)));
    launched.unwrap().wait().unwrap()
}

/// [`map_async_partitioned`] with parallelism 1.
fn map_async_partitioned_10k_p1() -> u64 {
    map_async_partitioned(1)
}

/// [`map_async_partitioned`] with parallelism 4.
fn map_async_partitioned_10k_p4() -> u64 {
    map_async_partitioned(4)
}

/// [`map_async_partitioned`] with parallelism 32.
fn map_async_partitioned_10k_p32() -> u64 {
    map_async_partitioned(32)
}
fn error_recover_1k() -> u64 {
Source::<u64>::failed(StreamError::Failed("bench".into()))
.recover_with_retries(1, |_| Some(Source::from_iter(0_u64..1_000)))
.map_error(|error| StreamError::Failed(format!("mapped: {error}")))
.run_with(Sink::fold(0_u64, |acc, item| acc.wrapping_add(item)))
.unwrap()
.wait()
.unwrap()
}
/// Sends `0..100_000` through `map_result_with_supervision` with an always-`Ok` wrapping
/// `+ 1` mapper and `Supervision::resuming_decider()`, and returns the wrapping sum of the
/// output.
fn supervision_map_resume_100k_no_error() -> u64 {
    let supervised = Source::from_iter(0_u64..100_000).map_result_with_supervision(
        |value| Ok(value.wrapping_add(1)),
        Supervision::resuming_decider(),
    );
    let launched =
        supervised.run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)));
    launched.unwrap().wait().unwrap()
}
/// Runs `RestartSource::on_failures_with_backoff` with 2 ms min/max backoff and
/// `with_max_restarts(3, 1 s)`. The factory returns a failed source on its first three calls
/// and `Source::single(1)` afterwards. Returns the sum of the output.
fn restart_source_backoff_accuracy_3x2ms() -> u64 {
    let attempts = std::sync::Arc::new(AtomicUsize::new(0));
    let settings = RestartSettings::new(Duration::from_millis(2), Duration::from_millis(2), 0.0)
        .with_max_restarts(3, Duration::from_secs(1));
    let shared_attempts = std::sync::Arc::clone(&attempts);
    let restarting = RestartSource::on_failures_with_backoff(settings, move || {
        let attempt = shared_attempts.fetch_add(1, Ordering::SeqCst);
        if attempt >= 3 {
            Source::single(1_u64)
        } else {
            Source::failed(StreamError::Failed("restart".into()))
        }
    });
    let launched = restarting.run_with(Sink::fold(0_u64, |total, value| total + value));
    launched.unwrap().wait().unwrap()
}
/// Runs `RestartSource::with_backoff` with zero backoff and `with_max_restarts(3, 1 s)`. The
/// factory emits the current attempt number as a single-element source. Returns how many
/// elements reached the sink.
fn restart_source_completion_cap_3() -> u64 {
    let attempts = std::sync::Arc::new(AtomicUsize::new(0));
    let settings = RestartSettings::new(Duration::ZERO, Duration::ZERO, 0.0)
        .with_max_restarts(3, Duration::from_secs(1));
    let shared_attempts = std::sync::Arc::clone(&attempts);
    let restarting = RestartSource::with_backoff(settings, move || {
        let attempt = shared_attempts.fetch_add(1, Ordering::SeqCst) as u64;
        Source::single(attempt)
    });
    let launched = restarting.run_with(Sink::fold(0_u64, |seen, _| seen + 1));
    launched.unwrap().wait().unwrap()
}
/// Sends `1..=5` through `RestartFlow::on_failures_with_backoff` (zero backoff,
/// `with_max_restarts(1, 1 s)`) wrapping a flow that fails on the value `3`, and returns how
/// many elements reached the sink.
fn restart_flow_drop_in_flight_1() -> u64 {
    let settings = RestartSettings::new(Duration::ZERO, Duration::ZERO, 0.0)
        .with_max_restarts(1, Duration::from_secs(1));
    let inputs = Source::from_iter(1_u64..=5);
    let guarded = inputs.via(RestartFlow::on_failures_with_backoff(settings, || {
        Flow::identity().map_result(|value| {
            if value == 3 {
                Err(StreamError::Failed("drop".into()))
            } else {
                Ok(value)
            }
        })
    }));
    let launched = guarded.run_with(Sink::fold(0_u64, |seen, _| seen + 1));
    launched.unwrap().wait().unwrap()
}
/// Sends `[5, 1]` through `RetryFlow::with_backoff` (zero backoff, limit argument `3`)
/// around a halving flow. The decision closure returns the output again while it is
/// positive. Returns the sum of what reached the sink.
fn retry_flow_with_backoff() -> u64 {
    let inputs = Source::from_iter([5_u64, 1_u64]);
    let retried = inputs.via(RetryFlow::with_backoff(
        Duration::ZERO,
        Duration::ZERO,
        0.0,
        3,
        Flow::identity().map(|value| value / 2),
        |_, halved| (*halved > 0).then_some(*halved),
    ));
    let launched = retried.run_with(Sink::fold(0_u64, |total, value| total + value));
    launched.unwrap().wait().unwrap()
}
/// Runs `Source::repeat(1)` into `Sink::cancelled()` without waiting on the result, and
/// returns `0`.
fn cancelled_repeat() -> u64 {
    let endless = Source::repeat(1_u64);
    endless.run_with(Sink::cancelled()).unwrap();
    0
}
/// Sends `0..256` through `buffer(16, OverflowStrategy::Backpressure)` followed by a `map`
/// that sleeps 10 µs per element, and returns the wrapping sum of the output.
fn buffer_fast_producer_slow_consumer_backpressure() -> u64 {
    let buffered = Source::from_iter(0_u64..256).buffer(16, OverflowStrategy::Backpressure);
    let slowed = buffered.map(|value| {
        thread::sleep(Duration::from_micros(10));
        value
    });
    let launched = slowed.run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)));
    launched.unwrap().wait().unwrap()
}
/// Sends `0..256` through `conflate` (wrapping add) followed by a `map` that sleeps 10 µs
/// per element, and returns the wrapping sum of the output.
fn conflate_fast_producer_slow_consumer() -> u64 {
    let conflated =
        Source::from_iter(0_u64..256).conflate(|merged, incoming| merged.wrapping_add(incoming));
    let slowed = conflated.map(|value| {
        thread::sleep(Duration::from_micros(10));
        value
    });
    let launched = slowed.run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)));
    launched.unwrap().wait().unwrap()
}
/// Sends `0..256` through `batch(16, ..)` (identity seed, wrapping-add aggregate) followed by
/// a `map` that sleeps 10 µs per element, and returns the wrapping sum of the output.
fn batch_fast_producer_slow_consumer() -> u64 {
    let batched = Source::from_iter(0_u64..256).batch(
        16,
        |first| first,
        |merged, incoming| merged.wrapping_add(incoming),
    );
    let slowed = batched.map(|value| {
        thread::sleep(Duration::from_micros(10));
        value
    });
    let launched = slowed.run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)));
    launched.unwrap().wait().unwrap()
}
/// Sends `0..256` through `batch_weighted(64, ..)` (constant cost `4`, identity seed,
/// wrapping-add aggregate) followed by a `map` that sleeps 10 µs per element, and returns the
/// wrapping sum of the output.
fn batch_weighted_fast_producer_slow_consumer() -> u64 {
    let batched = Source::from_iter(0_u64..256).batch_weighted(
        64,
        |_| 4,
        |first| first,
        |merged, incoming| merged.wrapping_add(incoming),
    );
    let slowed = batched.map(|value| {
        thread::sleep(Duration::from_micros(10));
        value
    });
    let launched = slowed.run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)));
    launched.unwrap().wait().unwrap()
}
/// Sends `0..256` through `aggregate_with_boundary`, accumulating a `(sum, count)` pair and
/// flagging a boundary once the count reaches 16, then through a `map` that sleeps 10 µs per
/// element. Returns the wrapping sum of the emitted sums.
fn aggregate_with_boundary_fast_producer_slow_consumer() -> u64 {
    let aggregated = Source::from_iter(0_u64..256).aggregate_with_boundary(
        || (0_u64, 0_usize),
        |(sum, count), value| {
            let seen = count + 1;
            ((sum.wrapping_add(value), seen), seen >= 16)
        },
        |(sum, _)| sum,
        None,
    );
    let slowed = aggregated.map(|value| {
        thread::sleep(Duration::from_micros(10));
        value
    });
    let launched = slowed.run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)));
    launched.unwrap().wait().unwrap()
}
/// Sends `0..256` through `detach()` followed by a `map` that sleeps 10 µs per element, and
/// returns the wrapping sum of the output.
fn detach_fast_producer_slow_consumer() -> u64 {
    let detached = Source::from_iter(0_u64..256).detach();
    let slowed = detached.map(|value| {
        thread::sleep(Duration::from_micros(10));
        value
    });
    let launched = slowed.run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)));
    launched.unwrap().wait().unwrap()
}
/// Sends `0..256` through a `map` that sleeps 10 µs per element, then
/// `expand(std::iter::repeat)` and `take(512)`, and returns the wrapping sum of the output.
fn expand_slow_producer_fast_consumer() -> u64 {
    let slow_producer = Source::from_iter(0_u64..256).map(|value| {
        thread::sleep(Duration::from_micros(10));
        value
    });
    let expanded = slow_producer.expand(std::iter::repeat).take(512);
    let launched = expanded.run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)));
    launched.unwrap().wait().unwrap()
}
/// Sends `0..256` through a `map` that sleeps 10 µs per element, then
/// `extrapolate(std::iter::repeat, None)` and `take(512)`, and returns the wrapping sum of
/// the output.
fn extrapolate_slow_producer_fast_consumer() -> u64 {
    let slow_producer = Source::from_iter(0_u64..256).map(|value| {
        thread::sleep(Duration::from_micros(10));
        value
    });
    let extrapolated = slow_producer
        .extrapolate(std::iter::repeat, None)
        .take(512);
    let launched =
        extrapolated.run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)));
    launched.unwrap().wait().unwrap()
}
/// Sends `0..24` through `throttle(1, 2 ms, 0, ThrottleMode::Shaping)` and returns the
/// wrapping sum of the output.
fn throttle_shaping_accuracy_24() -> u64 {
    let throttled = Source::from_iter(0_u64..24).throttle(
        1,
        Duration::from_millis(2),
        0,
        ThrottleMode::Shaping,
    );
    let launched = throttled.run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)));
    launched.unwrap().wait().unwrap()
}
/// Runs `Source::never()` with `completion_timeout(25 ms)` into `Sink::collect()` and returns
/// `1` when waiting yields `Err(StreamError::Failed(_))`.
///
/// # Panics
///
/// Panics if the stream fails to start or if waiting yields anything other than
/// `Err(StreamError::Failed(_))`.
fn completion_timeout_latency_25ms() -> u64 {
    let never_ending = Source::<u64>::never().completion_timeout(Duration::from_millis(25));
    let launched = never_ending.run_with(Sink::collect());
    match launched.unwrap().wait() {
        Err(StreamError::Failed(_)) => 1,
        other => panic!("expected completion timeout failure, got {other:?}"),
    }
}
/// Concatenates `Source::single(1)` with a second single-element source delayed by 25 ms,
/// sends the result through `grouped_within(16, 15 ms)`, and returns how many groups were
/// collected.
fn grouped_within_flush_timing() -> u64 {
    let delayed_second = Source::single(2_u64).initial_delay(Duration::from_millis(25));
    let grouped = Source::single(1_u64)
        .concat(delayed_second)
        .grouped_within(16, Duration::from_millis(15));
    let groups = grouped.run_with(Sink::collect()).unwrap().wait().unwrap();
    groups.len() as u64
}