use std::{
alloc::{GlobalAlloc, Layout, System},
env,
hint::black_box,
sync::{
OnceLock,
atomic::{AtomicU64, Ordering},
},
time::Instant,
};
use datum::{
BroadcastHub, Keep, KillSwitches, Materializer, MergeHub, PartitionHub, Sink, Source,
StreamCompletion,
};
// ---- Kill-switch scenarios ----
/// Number of elements each stream attached to the shared kill switch is limited to.
const KILL_SWITCH_STREAM_ITEMS: usize = 64;

// ---- Hub scenarios ----
/// Buffer capacity handed to every hub constructor (MergeHub, BroadcastHub, PartitionHub).
const HUB_BUFFER_SIZE: usize = 256;
/// Elements emitted by each individual MergeHub producer.
const HUB_ITEMS_PER_PRODUCER: u64 = 10_000;
/// Elements emitted by the single upstream source feeding BroadcastHub and PartitionHub.
const HUB_ITEMS_BROADCAST: u64 = 10_000;
/// Global allocator that forwards every request to [`System`] and keeps a running
/// tally of bytes handed out in [`ALLOCATED_BYTES`].
///
/// Only growth is counted:
/// - a fresh allocation adds its full size;
/// - an in-place `realloc` adds the size increase (nothing for a shrink);
/// - a `realloc` that moved the block adds the whole new size.
///
/// Frees are never subtracted, so the counter is a cumulative total since the last
/// reset, not the amount of live memory.
struct CountingAllocator;

/// Cumulative byte count maintained by [`CountingAllocator`]; `main` stores 0 into it
/// before each measured loop.
static ALLOCATED_BYTES: AtomicU64 = AtomicU64::new(0);

/// Adds `bytes` to [`ALLOCATED_BYTES`] unless `block` is null (the underlying
/// allocator failed, so nothing was handed out), then returns `block` unchanged.
fn tally_allocation(block: *mut u8, bytes: usize) -> *mut u8 {
    if !block.is_null() {
        ALLOCATED_BYTES.fetch_add(bytes as u64, Ordering::Relaxed);
    }
    block
}

// SAFETY: every method passes the caller's arguments to `System` unchanged, so
// `System`'s guarantees carry over. The only extra work is a relaxed atomic add,
// which never allocates and therefore cannot re-enter the allocator.
unsafe impl GlobalAlloc for CountingAllocator {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller upholds `GlobalAlloc::alloc`'s contract for `layout`.
        let block = unsafe { System.alloc(layout) };
        tally_allocation(block, layout.size())
    }

    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller upholds `GlobalAlloc::alloc_zeroed`'s contract for `layout`.
        let block = unsafe { System.alloc_zeroed(layout) };
        tally_allocation(block, layout.size())
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        // SAFETY: `ptr`/`layout` came from this allocator, which always delegates to `System`.
        unsafe { System.dealloc(ptr, layout) };
    }

    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
        // SAFETY: the caller upholds `GlobalAlloc::realloc`'s contract; `ptr` was
        // produced by `System` via one of the methods above.
        let resized = unsafe { System.realloc(ptr, layout, new_size) };
        let grown_by = if resized == ptr {
            // Resized in place: only growth is new memory.
            new_size.saturating_sub(layout.size())
        } else {
            // Moved: count the whole destination block, like a fresh allocation.
            new_size
        };
        tally_allocation(resized, grown_by)
    }
}

#[global_allocator]
static GLOBAL: CountingAllocator = CountingAllocator;
/// Total CPU time (user + system) consumed so far by this process, in nanoseconds.
///
/// Reads `utime` and `stime` (fields 14 and 15 of `/proc/self/stat`, measured in clock
/// ticks) and converts them assuming 100 ticks per second, i.e. 10_000_000 ns per tick.
/// NOTE(review): the tick rate is hard-coded rather than queried (`sysconf(_SC_CLK_TCK)`);
/// 100 is the common Linux value but confirm for unusual kernels. Resolution is one tick.
///
/// Returns 0 when the file cannot be read (e.g. no procfs) or does not contain enough
/// fields; a field that fails to parse contributes 0.
fn process_cpu_ns() -> u128 {
    const NS_PER_TICK: u128 = 10_000_000;

    let stat = match std::fs::read_to_string("/proc/self/stat") {
        Ok(text) => text,
        Err(_) => return 0,
    };
    // Field 2 (the command name) is parenthesised and may itself contain spaces or ')',
    // so resume after the *last* ')'. The first token after it is field 3.
    let after_comm = match stat.rfind(')') {
        Some(pos) => &stat[pos + 1..],
        None => return 0,
    };
    // Fields 14 (utime) and 15 (stime) are 11 and 12 tokens past field 3.
    let mut tokens = after_comm.split_whitespace().skip(11);
    let (Some(user), Some(system)) = (tokens.next(), tokens.next()) else {
        return 0;
    };
    let ticks = |token: &str| u128::from(token.parse::<u64>().unwrap_or(0));
    (ticks(user) + ticks(system)) * NS_PER_TICK
}
/// One row of the benchmark table driven by `main`.
struct Scenario {
    /// Label printed in the first output column; also tested against the
    /// `DATUM_DYNAMIC_STREAMS_SCENARIO` substring filter.
    name: &'static str,
    /// Default number of measured runs (replaced by `DATUM_DYNAMIC_STREAMS_ITERATIONS`
    /// when that is set to a positive integer).
    iterations: u64,
    /// Executes the workload once; the returned value is folded into a checksum so
    /// the optimizer cannot discard the work.
    run: fn() -> u64,
}
fn main() {
let scenarios = [
Scenario {
name: "kill_switch_shared_shutdown_fanout_1",
iterations: 200,
run: kill_switch_shared_shutdown_fanout_1,
},
Scenario {
name: "kill_switch_shared_shutdown_fanout_100",
iterations: 40,
run: kill_switch_shared_shutdown_fanout_100,
},
Scenario {
name: "kill_switch_shared_shutdown_fanout_10000",
iterations: 3,
run: kill_switch_shared_shutdown_fanout_10000,
},
Scenario {
name: "merge_hub_throughput_p1",
iterations: 200,
run: merge_hub_throughput_p1,
},
Scenario {
name: "merge_hub_throughput_p4",
iterations: 80,
run: merge_hub_throughput_p4,
},
Scenario {
name: "merge_hub_throughput_p16",
iterations: 30,
run: merge_hub_throughput_p16,
},
Scenario {
name: "broadcast_hub_throughput_c1",
iterations: 200,
run: broadcast_hub_throughput_c1,
},
Scenario {
name: "broadcast_hub_throughput_c4",
iterations: 80,
run: broadcast_hub_throughput_c4,
},
Scenario {
name: "broadcast_hub_throughput_c16",
iterations: 30,
run: broadcast_hub_throughput_c16,
},
Scenario {
name: "partition_hub_throughput_c1",
iterations: 200,
run: partition_hub_throughput_c1,
},
Scenario {
name: "partition_hub_throughput_c4",
iterations: 80,
run: partition_hub_throughput_c4,
},
Scenario {
name: "partition_hub_throughput_c16",
iterations: 30,
run: partition_hub_throughput_c16,
},
];
println!("scenario\titerations\tns_per_op\tallocated_bytes_per_op\tcpu_ns_per_op");
let scenario_filter = env::var("DATUM_DYNAMIC_STREAMS_SCENARIO").ok();
let iteration_override = env::var("DATUM_DYNAMIC_STREAMS_ITERATIONS")
.ok()
.and_then(|value| value.parse::<u64>().ok())
.filter(|iterations| *iterations > 0);
for scenario in scenarios {
if scenario_filter
.as_deref()
.is_some_and(|filter| !scenario.name.contains(filter))
{
continue;
}
let iterations = iteration_override.unwrap_or(scenario.iterations);
for _ in 0..3 {
black_box((scenario.run)());
}
let cpu_start = process_cpu_ns();
ALLOCATED_BYTES.store(0, Ordering::Relaxed);
let started = Instant::now();
let mut checksum = 0_u64;
for _ in 0..iterations {
checksum = checksum.wrapping_add(black_box((scenario.run)()));
}
let elapsed = started.elapsed();
let allocated_bytes = ALLOCATED_BYTES.load(Ordering::Relaxed);
let cpu_delta = process_cpu_ns().saturating_sub(cpu_start);
black_box(checksum);
let ns_per_op = elapsed.as_nanos() as f64 / iterations as f64;
let allocated_bytes_per_op = allocated_bytes as f64 / iterations as f64;
let cpu_ns_per_op = cpu_delta as f64 / iterations as f64;
println!(
"{}\t{}\t{ns_per_op:.2}\t{allocated_bytes_per_op:.2}\t{cpu_ns_per_op:.2}",
scenario.name, iterations
);
}
}
/// Process-wide [`Materializer`] shared by every scenario; built lazily on first call
/// and reused afterwards, so its construction cost is not repeated per run.
fn materializer() -> &'static Materializer {
    static SHARED: OnceLock<Materializer> = OnceLock::new();
    SHARED.get_or_init(Materializer::new)
}
/// Waits on `completion` (via `StreamCompletion::wait`) and returns the stream's result.
///
/// # Panics
/// Panics if the stream did not complete successfully. A failed stream makes the
/// benchmark number meaningless, so it aborts with a message naming the failure
/// instead of a bare `unwrap`.
fn wait<T>(completion: StreamCompletion<T>) -> T {
    completion
        .wait()
        .expect("benchmark stream did not complete successfully")
}
fn kill_switch_shared_shutdown(streams: usize) -> u64 {
let switch = KillSwitches::shared("compare-shared");
let completions = (0..streams)
.map(|_| {
Source::repeat(1_u64)
.take(KILL_SWITCH_STREAM_ITEMS)
.via(switch.flow())
.run_with_materializer(Sink::ignore(), materializer())
.unwrap()
})
.collect::<Vec<_>>();
switch.shutdown();
for completion in completions {
black_box(wait(completion));
}
streams as u64
}
/// Attaches `producers` sources (each emitting `HUB_ITEMS_PER_PRODUCER` elements) to one
/// draining MergeHub. Returns how many elements the downstream counting fold saw.
fn merge_hub_throughput(producers: usize) -> u64 {
    let counting_sink = Sink::fold(0_u64, |count, _| count + 1);
    let materialized = MergeHub::source_with_draining::<u64>(HUB_BUFFER_SIZE)
        .to_mat(counting_sink, Keep::both)
        .run_with_materializer(materializer())
        .unwrap();
    let ((hub_sink, control), completion) = materialized;

    for _ in 0..producers {
        let producer_sink = hub_sink.clone();
        producer_sink
            .run_with(Source::from_iter(0_u64..HUB_ITEMS_PER_PRODUCER))
            .unwrap();
    }

    // Ask the hub to drain and complete; the fold's final count is then available.
    control.drain_and_complete();
    wait(completion)
}
fn broadcast_hub_throughput(consumers: usize) -> u64 {
let hub = Source::from_iter(0_u64..HUB_ITEMS_BROADCAST)
.run_with_materializer(
BroadcastHub::sink_starting_after(consumers, HUB_BUFFER_SIZE),
materializer(),
)
.unwrap();
let completions = (0..consumers)
.map(|_| {
hub.source()
.run_with(Sink::fold(0_u64, |acc, _| acc + 1))
.unwrap()
})
.collect::<Vec<_>>();
completions.into_iter().map(wait).sum()
}
fn partition_hub_throughput(consumers: usize) -> u64 {
let hub = Source::from_iter(0_u64..HUB_ITEMS_BROADCAST)
.run_with_materializer(
PartitionHub::sink(
|info, item| {
let idx = (*item as usize) % info.size();
info.consumer_id_by_idx(idx) as isize
},
consumers,
HUB_BUFFER_SIZE,
),
materializer(),
)
.unwrap();
let completions = (0..consumers)
.map(|_| {
hub.source()
.run_with(Sink::fold(0_u64, |acc, _| acc + 1))
.unwrap()
})
.collect::<Vec<_>>();
completions.into_iter().map(wait).sum()
}
/// Generates zero-argument entry points (`fn() -> u64`, the shape `Scenario::run`
/// expects). Each one forwards to a parameterized benchmark with a fixed argument.
macro_rules! fixed_size_scenarios {
    ($($entry:ident => $bench:ident($arg:expr);)+) => {
        $(
            fn $entry() -> u64 {
                $bench($arg)
            }
        )+
    };
}

fixed_size_scenarios! {
    kill_switch_shared_shutdown_fanout_1 => kill_switch_shared_shutdown(1);
    kill_switch_shared_shutdown_fanout_100 => kill_switch_shared_shutdown(100);
    kill_switch_shared_shutdown_fanout_10000 => kill_switch_shared_shutdown(10_000);
    merge_hub_throughput_p1 => merge_hub_throughput(1);
    merge_hub_throughput_p4 => merge_hub_throughput(4);
    merge_hub_throughput_p16 => merge_hub_throughput(16);
    broadcast_hub_throughput_c1 => broadcast_hub_throughput(1);
    broadcast_hub_throughput_c4 => broadcast_hub_throughput(4);
    broadcast_hub_throughput_c16 => broadcast_hub_throughput(16);
    partition_hub_throughput_c1 => partition_hub_throughput(1);
    partition_hub_throughput_c4 => partition_hub_throughput(4);
    partition_hub_throughput_c16 => partition_hub_throughput(16);
}