use std::{
alloc::{GlobalAlloc, Layout, System},
hint::black_box,
sync::atomic::{AtomicU64, Ordering},
time::Instant,
};
use datum::{Flow, Sink, Source};
/// Element count fed into the 10k-item scenarios.
const ITEMS_10K: u64 = 10_000;
/// Element count for the expansion scenarios; each item fans out into four values.
const ITEMS_2K: u64 = 2_000;
/// Segment length used for prefix sizes and split boundaries.
const SEGMENT: u64 = 32;
/// Number of items (and therefore distinct keys) in the `group_by` churn scenario.
const CHURN_KEYS_1K: u64 = 1_000;
/// Allocator that forwards every request to [`System`] while tallying bytes handed out.
///
/// The tally only grows: `dealloc` and in-place shrinking `realloc` leave it untouched, and a
/// `realloc` that moves the block counts the whole new size. [`ALLOCATED_BYTES`] is therefore
/// a cumulative "bytes allocated" figure, not a live-heap gauge.
struct CountingAllocator;

/// Running total of bytes obtained through [`CountingAllocator`].
static ALLOCATED_BYTES: AtomicU64 = AtomicU64::new(0);

/// Adds `bytes` to [`ALLOCATED_BYTES`].
///
/// `Relaxed` is enough: the counter is a standalone statistic that orders no other memory.
fn record_allocation(bytes: usize) {
    ALLOCATED_BYTES.fetch_add(bytes as u64, Ordering::Relaxed);
}

unsafe impl GlobalAlloc for CountingAllocator {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller upholds `GlobalAlloc::alloc`'s contract for `layout`, which is
        // exactly the contract `System.alloc` requires.
        let block = unsafe { System.alloc(layout) };
        if !block.is_null() {
            record_allocation(layout.size());
        }
        block
    }

    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
        // SAFETY: same contract as `alloc`, forwarded unchanged to `System`.
        let block = unsafe { System.alloc_zeroed(layout) };
        if !block.is_null() {
            record_allocation(layout.size());
        }
        block
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        // SAFETY: `ptr`/`layout` describe a live block returned by this allocator. Every
        // such block came from `System`, so `System` may free it.
        unsafe { System.dealloc(ptr, layout) };
    }

    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
        // SAFETY: the caller guarantees `ptr` came from this allocator with `layout` and that
        // `new_size` is valid for `layout.align()`. The block originated in `System`, so
        // forwarding the same arguments satisfies `System.realloc`.
        let resized = unsafe { System.realloc(ptr, layout, new_size) };
        if resized.is_null() {
            // Failure leaves the original block intact; nothing new was handed out.
            return resized;
        }
        let fresh_bytes = if resized == ptr {
            // Resized in place: only growth beyond the old size is newly allocated memory.
            new_size.saturating_sub(layout.size())
        } else {
            // Moved: the entire new block counts as a fresh allocation.
            new_size
        };
        record_allocation(fresh_bytes);
        resized
    }
}
/// Installs `CountingAllocator` as the process-wide allocator.
///
/// Every heap allocation in this binary, on any thread, is routed through it, so
/// `ALLOCATED_BYTES` covers the whole process rather than only the calling thread.
#[global_allocator]
static GLOBAL: CountingAllocator = CountingAllocator;
/// Returns this process's consumed CPU time (user + system) in nanoseconds.
///
/// The value is read from `/proc/self/stat`. It is `0` when that file is unavailable
/// (e.g. not on Linux) or has too few fields.
///
/// The kernel reports `utime`/`stime` in clock ticks. The conversion assumes the common
/// `sysconf(_SC_CLK_TCK)` value of 100, so results are quantized to 10 ms steps.
fn process_cpu_ns() -> u128 {
    /// Nanoseconds per clock tick at the assumed 100 Hz tick rate.
    const NANOS_PER_TICK: u128 = 10_000_000;
    /// Zero-based index of `utime` among the fields that follow `comm`'s closing paren.
    /// `utime` is field 14 and `stime` (field 15) directly follows it.
    const UTIME_INDEX: usize = 11;

    let Ok(stat) = std::fs::read_to_string("/proc/self/stat") else {
        return 0;
    };
    // `comm` may itself contain spaces or ')', so split on the LAST closing paren.
    let Some((_, after_comm)) = stat.rsplit_once(')') else {
        return 0;
    };
    let mut fields = after_comm.split_whitespace().skip(UTIME_INDEX);
    let (Some(utime), Some(stime)) = (fields.next(), fields.next()) else {
        return 0;
    };
    // An unparsable tick count contributes zero rather than aborting the measurement.
    let ticks = |field: &str| u128::from(field.parse::<u64>().unwrap_or(0));
    (ticks(utime) + ticks(stime)) * NANOS_PER_TICK
}
/// One named benchmark case executed by `main`.
struct Scenario {
    /// Printed in the first output column and compared against the CLI scenario filter.
    name: &'static str,
    /// Measured-run count used when no iteration override is supplied.
    iterations: u64,
    /// Workload under test. Its `u64` result is folded into a `black_box`ed checksum.
    run: fn() -> u64,
}
fn main() {
let mut args = std::env::args().skip(1);
let scenario_filter = args.next();
let iterations_override = args.next().and_then(|value| value.parse::<u64>().ok());
let scenarios = [
Scenario {
name: "prefix_and_tail_rebuild_10k",
iterations: 50,
run: prefix_and_tail_rebuild_10k,
},
Scenario {
name: "flat_map_prefix_dispatch_10k",
iterations: 100,
run: flat_map_prefix_dispatch_10k,
},
Scenario {
name: "split_when_rebuild_10k",
iterations: 10,
run: split_when_rebuild_10k,
},
Scenario {
name: "split_after_rebuild_10k",
iterations: 10,
run: split_after_rebuild_10k,
},
Scenario {
name: "flat_map_concat_expand_2k_x4",
iterations: 10,
run: flat_map_concat_expand_2k_x4,
},
Scenario {
name: "flat_map_merge_expand_2k_x4_b8",
iterations: 30,
run: flat_map_merge_expand_2k_x4_b8,
},
Scenario {
name: "group_by_single_key_10k",
iterations: 10,
run: group_by_single_key_10k,
},
Scenario {
name: "group_by_closed_key_churn_1k",
iterations: 10,
run: group_by_closed_key_churn_1k,
},
];
println!("scenario\titerations\tns_per_op\tallocated_bytes_per_op\tcpu_ns_per_op");
for scenario in scenarios {
if scenario_filter
.as_ref()
.is_some_and(|filter| scenario.name != filter)
{
continue;
}
let iterations = iterations_override.unwrap_or(scenario.iterations);
for _ in 0..3 {
black_box((scenario.run)());
}
let cpu_start = process_cpu_ns();
ALLOCATED_BYTES.store(0, Ordering::Relaxed);
let started = Instant::now();
let mut checksum = 0_u64;
for _ in 0..iterations {
checksum = checksum.wrapping_add(black_box((scenario.run)()));
}
let elapsed = started.elapsed();
let allocated_bytes = ALLOCATED_BYTES.load(Ordering::Relaxed);
let cpu_delta = process_cpu_ns().saturating_sub(cpu_start);
black_box(checksum);
println!(
"{}\t{}\t{:.2}\t{:.2}\t{:.2}",
scenario.name,
iterations,
elapsed.as_nanos() as f64 / iterations as f64,
allocated_bytes as f64 / iterations as f64,
cpu_delta as f64 / iterations as f64,
);
}
}
/// Peels the first `SEGMENT` items off a 10k-item stream with `prefix_and_tail`.
///
/// It then concatenates that prefix with the remaining tail via `flat_map_concat` and
/// returns the wrapping sum of everything emitted.
fn prefix_and_tail_rebuild_10k() -> u64 {
    let rebuilt = Source::from_iter(0_u64..ITEMS_10K)
        .prefix_and_tail(SEGMENT as usize)
        .flat_map_concat(|(head, rest)| Source::from_iter(head).concat(rest));
    rebuilt
        .run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)))
        .unwrap()
        .wait()
        .unwrap()
}
/// Builds a `Flow` from the first `SEGMENT` items of a 10k-item stream via `flat_map_prefix`.
///
/// The flow adds the prefix's sum to every item it processes. The function returns the
/// wrapping sum of the output.
fn flat_map_prefix_dispatch_10k() -> u64 {
    let dispatched = Source::from_iter(0_u64..ITEMS_10K).flat_map_prefix(
        SEGMENT as usize,
        |head| {
            let offset: u64 = head.iter().sum();
            Flow::identity().map(move |value: u64| value.wrapping_add(offset))
        },
    );
    dispatched
        .run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)))
        .unwrap()
        .wait()
        .unwrap()
}
/// Splits a 10k-item stream with `split_when`, firing on every non-zero multiple of `SEGMENT`.
///
/// The substreams are pulled from a queue one at a time. Each one is run to completion
/// before the next is pulled, and the function returns the wrapping sum of all their sums.
fn split_when_rebuild_10k() -> u64 {
    let substreams = Source::from_iter(0_u64..ITEMS_10K)
        .split_when(|value| *value != 0 && *value % SEGMENT == 0)
        .run_with(Sink::queue())
        .unwrap();
    std::iter::from_fn(|| substreams.pull().unwrap())
        .map(|substream| {
            substream
                .run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)))
                .unwrap()
                .wait()
                .unwrap()
        })
        .fold(0_u64, u64::wrapping_add)
}
/// Splits a 10k-item stream with `split_after`, firing on every item whose successor is a
/// multiple of `SEGMENT`.
///
/// The substreams are pulled from a queue one at a time. Each one is run to completion
/// before the next is pulled, and the function returns the wrapping sum of all their sums.
fn split_after_rebuild_10k() -> u64 {
    let substreams = Source::from_iter(0_u64..ITEMS_10K)
        .split_after(|value| (*value + 1) % SEGMENT == 0)
        .run_with(Sink::queue())
        .unwrap();
    std::iter::from_fn(|| substreams.pull().unwrap())
        .map(|substream| {
            substream
                .run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)))
                .unwrap()
                .wait()
                .unwrap()
        })
        .fold(0_u64, u64::wrapping_add)
}
/// Expands each of `ITEMS_2K` items into the four values `item..item + 4` via
/// `flat_map_concat` and returns their wrapping sum.
fn flat_map_concat_expand_2k_x4() -> u64 {
    let expanded = Source::from_iter(0_u64..ITEMS_2K).flat_map_concat(|base| {
        // Still a `[u64; 4]`, exactly like the original array literal.
        let burst = [0_u64, 1, 2, 3].map(|offset| base + offset);
        Source::from_iter(burst)
    });
    expanded
        .run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)))
        .unwrap()
        .wait()
        .unwrap()
}
/// Expands each of `ITEMS_2K` items into the four values `item..item + 4` via
/// `flat_map_merge` (first argument `8`, the `_b8` in the name).
///
/// Returns the wrapping sum of the merged output. That sum does not depend on the order in
/// which merged items arrive.
fn flat_map_merge_expand_2k_x4_b8() -> u64 {
    let merged = Source::from_iter(0_u64..ITEMS_2K).flat_map_merge(8, |base| {
        // Still a `[u64; 4]`, exactly like the original array literal.
        let burst = [0_u64, 1, 2, 3].map(|offset| base + offset);
        Source::from_iter(burst)
    });
    merged
        .run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)))
        .unwrap()
        .wait()
        .unwrap()
}
/// Routes all `ITEMS_10K` items to the single `group_by` key `0_u8` and sums that substream.
///
/// After the substream finishes, it asserts that the outer stream yields no further
/// substream.
fn group_by_single_key_10k() -> u64 {
    let single_key = Source::from_iter(0_u64..ITEMS_10K).group_by(1, |_| 0_u8, false);
    let outer = single_key.run_with(Sink::queue()).unwrap();
    let only_group = outer.pull().unwrap().unwrap();
    let total = only_group
        .run_with(Sink::fold(0_u64, |acc, value| acc.wrapping_add(value)))
        .unwrap()
        .wait()
        .unwrap();
    // Every item maps to the same key, so no second substream may appear.
    assert!(outer.pull().unwrap().is_none());
    total
}
/// Gives each of `CHURN_KEYS_1K` items its own `group_by` key and keeps one element per
/// substream through `flat_map_merge(256, ..take(1))`.
///
/// Returns how many elements reach the sink.
fn group_by_closed_key_churn_1k() -> u64 {
    let first_per_key = Source::from_iter(0_u64..CHURN_KEYS_1K)
        .group_by((CHURN_KEYS_1K * 2) as usize, |key| *key, false)
        .flat_map_merge(256, |group| group.take(1));
    first_per_key
        .run_with(Sink::fold(0_u64, |seen, _| seen + 1))
        .unwrap()
        .wait()
        .unwrap()
}