use std::{
alloc::{GlobalAlloc, Layout, System},
hint::black_box,
sync::{
OnceLock,
atomic::{AtomicU64, Ordering},
},
thread,
time::Instant,
};
use datum::{Keep, Materializer, OverflowStrategy, Sink, Source};
/// Number of items each scenario offers to (or streams through) its queue per run.
const QUEUE_ITEMS: u64 = 10_000;
/// Capacity passed to `Source::queue_bounded` by the "offer accepted" scenario.
const BOUNDED_CAPACITY: usize = 256;
/// Allocator wrapper that forwards every request to [`System`] and adds the
/// size of each successful allocation to [`ALLOCATED_BYTES`].
struct CountingAllocator;

/// Cumulative number of bytes handed out through [`CountingAllocator`].
///
/// Deallocations are not subtracted, so this is "bytes allocated so far",
/// not "bytes currently live".
static ALLOCATED_BYTES: AtomicU64 = AtomicU64::new(0);

impl CountingAllocator {
    /// Adds `bytes` to the running allocation total.
    #[inline]
    fn record(bytes: usize) {
        ALLOCATED_BYTES.fetch_add(bytes as u64, Ordering::Relaxed);
    }
}

unsafe impl GlobalAlloc for CountingAllocator {
    /// Allocates via [`System`] and counts `layout.size()` bytes on success.
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller upholds the `GlobalAlloc::alloc` contract; the
        // layout is forwarded unchanged.
        let block = unsafe { System.alloc(layout) };
        if block.is_null() {
            return block;
        }
        Self::record(layout.size());
        block
    }

    /// Zero-initialising variant of `alloc`, counted the same way.
    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller upholds the `GlobalAlloc::alloc_zeroed` contract;
        // the layout is forwarded unchanged.
        let block = unsafe { System.alloc_zeroed(layout) };
        if block.is_null() {
            return block;
        }
        Self::record(layout.size());
        block
    }

    /// Releases memory via [`System`]; the counter is intentionally left untouched.
    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        // SAFETY: the caller guarantees `ptr` was allocated by this allocator
        // with `layout`, and every allocation here comes from `System`.
        unsafe { System.dealloc(ptr, layout) };
    }

    /// Resizes via [`System`] and counts the bytes the resize added.
    ///
    /// A resize that returns a different pointer counts the full `new_size`;
    /// growth that keeps the same pointer counts only the extra bytes; a
    /// same-pointer shrink (or no-op) counts nothing. Failed resizes are not counted.
    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
        // SAFETY: the caller upholds the `GlobalAlloc::realloc` contract; all
        // arguments are forwarded unchanged.
        let resized = unsafe { System.realloc(ptr, layout, new_size) };
        if resized.is_null() {
            return resized;
        }
        let old_size = layout.size();
        if resized != ptr {
            Self::record(new_size);
        } else if new_size > old_size {
            Self::record(new_size - old_size);
        }
        resized
    }
}

/// Installs the counting allocator for the whole benchmark process.
#[global_allocator]
static GLOBAL: CountingAllocator = CountingAllocator;
/// Returns the CPU time (user + system) this process has consumed, in
/// nanoseconds, as reported by `/proc/self/stat`.
///
/// Returns `0` when the file cannot be read, contains no `)`, or has fewer
/// than thirteen whitespace-separated fields after the last `)`. A tick count
/// that fails to parse is treated as `0`.
///
/// NOTE(review): ticks are converted at a fixed 10 ms per tick (100 ticks per
/// second); confirm this matches `sysconf(_SC_CLK_TCK)` on the target system.
fn process_cpu_ns() -> u128 {
    const NANOS_PER_TICK: u128 = 10_000_000;

    let total_ticks = std::fs::read_to_string("/proc/self/stat")
        .ok()
        .and_then(|stat| {
            // Parse from the last `)` so that whatever precedes it (pid and
            // the parenthesised name) cannot shift the field positions.
            let tail_start = stat.rfind(')')? + 1;
            // Positions 11 and 12 of the remaining fields hold utime and stime.
            let mut tail = stat[tail_start..].split_whitespace().skip(11);
            let user_ticks = tail.next()?.parse::<u64>().unwrap_or(0);
            let system_ticks = tail.next()?.parse::<u64>().unwrap_or(0);
            Some(u128::from(user_ticks) + u128::from(system_ticks))
        });

    total_ticks.map_or(0, |ticks| ticks * NANOS_PER_TICK)
}
/// One benchmark case in the table built by `main`.
struct Scenario {
/// Label printed in the first column of the report.
name: &'static str,
/// Number of measured calls to `run` (warm-up calls are extra).
iterations: u64,
/// Benchmark body; its return value is summed into a checksum that is passed to `black_box`.
run: fn() -> u64,
}
/// Runs every scenario and prints one tab-separated report row per scenario.
///
/// Each scenario is warmed up with a few unmeasured calls, then timed over
/// `iterations` calls. The reported "per op" figures are totals divided by
/// `iterations`, i.e. per call of the scenario function, not per queue item.
fn main() {
    /// Unmeasured calls made before timing each scenario.
    const WARMUP_RUNS: usize = 3;

    let scenarios = [
        Scenario {
            name: "bounded_source_queue_offer_accepted",
            iterations: 200,
            run: bounded_source_queue_offer_accepted,
        },
        Scenario {
            name: "bounded_source_queue_offer_dropped",
            iterations: 200,
            run: bounded_source_queue_offer_dropped,
        },
        Scenario {
            name: "source_queue_backpressure_throughput",
            iterations: 80,
            run: source_queue_backpressure_throughput,
        },
        Scenario {
            name: "source_queue_drop_head_overflow",
            iterations: 200,
            run: source_queue_drop_head_overflow,
        },
        Scenario {
            name: "sink_queue_pull_throughput",
            iterations: 200,
            run: sink_queue_pull_throughput,
        },
    ];

    println!("scenario\titerations\tns_per_op\tallocated_bytes_per_op\tcpu_ns_per_op");

    for Scenario {
        name,
        iterations,
        run,
    } in scenarios
    {
        (0..WARMUP_RUNS).for_each(|_| {
            black_box(run());
        });

        // The CPU sample is taken before the byte counter is reset and the
        // clock is started, so reading /proc does not count towards the
        // measured allocations or wall time.
        let cpu_before = process_cpu_ns();
        ALLOCATED_BYTES.store(0, Ordering::Relaxed);
        let clock = Instant::now();

        let checksum =
            (0..iterations).fold(0_u64, |acc, _| acc.wrapping_add(black_box(run())));

        // Mirror image of the start: stop the clock and read the byte counter
        // first, then sample CPU time.
        let wall = clock.elapsed();
        let total_bytes = ALLOCATED_BYTES.load(Ordering::Relaxed);
        let cpu_spent = process_cpu_ns().saturating_sub(cpu_before);
        black_box(checksum);

        let calls = iterations as f64;
        let ns_per_op = wall.as_nanos() as f64 / calls;
        let allocated_bytes_per_op = total_bytes as f64 / calls;
        let cpu_ns_per_op = cpu_spent as f64 / calls;
        println!(
            "{}\t{}\t{ns_per_op:.2}\t{allocated_bytes_per_op:.2}\t{cpu_ns_per_op:.2}",
            name, iterations
        );
    }
}
/// Returns the materializer shared by all scenarios.
///
/// It is built with `Materializer::new` on the first call and the same
/// instance is returned on every later call.
fn materializer() -> &'static Materializer {
    static SHARED: OnceLock<Materializer> = OnceLock::new();
    SHARED.get_or_init(Materializer::new)
}
/// Offers `QUEUE_ITEMS` values to a bounded source queue of capacity
/// `BOUNDED_CAPACITY` that feeds `Sink::ignore`, then completes the queue.
///
/// The result of each offer is passed to `black_box` and otherwise ignored.
/// Returns `QUEUE_ITEMS`.
fn bounded_source_queue_offer_accepted() -> u64 {
    // `_running` keeps the materialized stream handle alive until the function returns.
    let (producer, _running) = Source::<u64>::queue_bounded(BOUNDED_CAPACITY)
        .to_mat(Sink::ignore(), Keep::both)
        .run_with_materializer(materializer())
        .unwrap();

    (0..QUEUE_ITEMS).for_each(|item| {
        black_box(producer.offer(item));
    });

    producer.complete();
    QUEUE_ITEMS
}
/// Offers `QUEUE_ITEMS` values to a bounded source queue of capacity 1 after
/// one priming offer, then completes the queue.
///
/// NOTE(review): the priming offer presumably fills the single slot so that
/// the measured offers hit the "full" path; confirm against datum's
/// `queue_bounded` semantics. Offer results are passed to `black_box` and
/// otherwise ignored. Returns `QUEUE_ITEMS`.
fn bounded_source_queue_offer_dropped() -> u64 {
    // `_running` keeps the materialized stream handle alive until the function returns.
    let (producer, _running) = Source::<u64>::queue_bounded(1)
        .to_mat(Sink::ignore(), Keep::both)
        .run_with_materializer(materializer())
        .unwrap();

    // Priming offer; its result is deliberately discarded.
    producer.offer(0);

    (0..QUEUE_ITEMS).for_each(|item| {
        black_box(producer.offer(item));
    });

    producer.complete();
    QUEUE_ITEMS
}
/// Offers `QUEUE_ITEMS` values to a capacity-1 source queue configured with
/// `OverflowStrategy::Backpressure`, panicking if any offer returns an error.
///
/// The materialized stream handle is moved to a helper thread, which drops it;
/// that thread is joined after the queue has been completed.
/// Returns `QUEUE_ITEMS`.
fn source_queue_backpressure_throughput() -> u64 {
    let (producer, running) = Source::<u64>::queue(1, OverflowStrategy::Backpressure)
        .to_mat(Sink::ignore(), Keep::both)
        .run_with_materializer(materializer())
        .unwrap();

    let dropper = thread::spawn(move || drop(running));

    (0..QUEUE_ITEMS).for_each(|item| {
        black_box(producer.offer(item)).unwrap();
    });

    producer.complete();
    dropper.join().unwrap();
    QUEUE_ITEMS
}
/// Offers `QUEUE_ITEMS` values to a capacity-1 source queue configured with
/// `OverflowStrategy::DropHead`, panicking if any offer returns an error, then
/// completes the queue.
///
/// Returns `QUEUE_ITEMS`.
fn source_queue_drop_head_overflow() -> u64 {
    // `_running` keeps the materialized stream handle alive until the function returns.
    let (producer, _running) = Source::<u64>::queue(1, OverflowStrategy::DropHead)
        .to_mat(Sink::ignore(), Keep::both)
        .run_with_materializer(materializer())
        .unwrap();

    (0..QUEUE_ITEMS).for_each(|item| {
        black_box(producer.offer(item)).unwrap();
    });

    producer.complete();
    QUEUE_ITEMS
}
/// Streams `0..QUEUE_ITEMS` into a `Sink::queue` and pulls from it until
/// `pull` returns anything other than `Ok(Some(_))`.
///
/// Returns the number of items pulled.
fn sink_queue_pull_throughput() -> u64 {
    let puller = Source::from_iter(0_u64..QUEUE_ITEMS)
        .run_with_materializer(Sink::queue(), materializer())
        .unwrap();

    let mut pulled = 0_u64;
    loop {
        match puller.pull() {
            Ok(Some(_item)) => pulled += 1,
            // `Ok(None)` and `Err(_)` both end the loop.
            _ => return pulled,
        }
    }
}