datum-core 0.4.0

Rust stream-processing library mirroring Akka/Pekko Streams Typed, built on Ractor actors
Documentation
use std::{
    alloc::{GlobalAlloc, Layout, System},
    hint::black_box,
    sync::{
        OnceLock,
        atomic::{AtomicU64, Ordering},
    },
    thread,
    time::Instant,
};

use datum::{Keep, Materializer, OverflowStrategy, Sink, Source};

/// Number of items each scenario pushes through its stream in its main loop per run.
const QUEUE_ITEMS: u64 = 10_000;
/// Capacity passed to `Source::queue_bounded` by `bounded_source_queue_offer_accepted`.
const BOUNDED_CAPACITY: usize = 256;

/// Allocator wrapper that forwards every request to [`System`] and tallies the bytes handed out.
struct CountingAllocator;

/// Running total of bytes obtained through [`CountingAllocator`].
///
/// The allocator only ever adds to this counter; `dealloc` does not subtract from it.
static ALLOCATED_BYTES: AtomicU64 = AtomicU64::new(0);

impl CountingAllocator {
    /// Adds `bytes` to [`ALLOCATED_BYTES`] when `ptr` is non-null, then returns `ptr` unchanged.
    fn tally(ptr: *mut u8, bytes: usize) -> *mut u8 {
        if !ptr.is_null() {
            ALLOCATED_BYTES.fetch_add(bytes as u64, Ordering::Relaxed);
        }
        ptr
    }
}

unsafe impl GlobalAlloc for CountingAllocator {
    /// Allocates via [`System`] and counts `layout.size()` bytes on success.
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller upholds `GlobalAlloc::alloc`'s contract, which is passed through
        // to `System` unchanged.
        let block = unsafe { System.alloc(layout) };
        Self::tally(block, layout.size())
    }

    /// Allocates zeroed memory via [`System`] and counts `layout.size()` bytes on success.
    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller upholds `GlobalAlloc::alloc_zeroed`'s contract, which is passed
        // through to `System` unchanged.
        let block = unsafe { System.alloc_zeroed(layout) };
        Self::tally(block, layout.size())
    }

    /// Releases memory via [`System`]; the byte counter is left untouched.
    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        // SAFETY: the caller guarantees `ptr` was allocated by this allocator with `layout`,
        // and every allocation made here comes from `System`.
        unsafe { System.dealloc(ptr, layout) };
    }

    /// Resizes via [`System`] and counts the bytes the resize newly obtained.
    ///
    /// If the pointer is unchanged only the growth (if any) is counted; if a different pointer
    /// comes back the whole `new_size` is counted. A failed (null) resize counts nothing.
    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
        // SAFETY: the caller upholds `GlobalAlloc::realloc`'s contract, which is passed through
        // to `System` unchanged.
        let resized = unsafe { System.realloc(ptr, layout, new_size) };
        if resized.is_null() {
            return resized;
        }

        let counted = if resized != ptr {
            new_size
        } else {
            // Shrinking or same-size resizes in place contribute nothing.
            new_size.saturating_sub(layout.size())
        };
        if counted != 0 {
            ALLOCATED_BYTES.fetch_add(counted as u64, Ordering::Relaxed);
        }
        resized
    }
}

/// Installs [`CountingAllocator`] as the allocator for the whole program.
#[global_allocator]
static GLOBAL: CountingAllocator = CountingAllocator;

/// Returns the CPU time (user + system) consumed by this process so far, in nanoseconds.
///
/// The value is read from `/proc/self/stat`. Parsing starts after the last `)` so that a
/// command name containing spaces or parentheses cannot shift the fields; the tick counts are
/// then the 12th and 13th whitespace-separated tokens after it.
///
/// Returns `0` when the file cannot be read, contains no `)`, or has fewer than 13 tokens after
/// it. A tick field that fails to parse is treated as `0`.
///
/// NOTE(review): the conversion assumes 100 clock ticks per second (10 ms per tick) — confirm
/// against the target system's `sysconf(_SC_CLK_TCK)`.
fn process_cpu_ns() -> u128 {
    const NS_PER_TICK: u128 = 10_000_000;

    std::fs::read_to_string("/proc/self/stat")
        .ok()
        .and_then(|stat| {
            let after_comm = &stat[stat.rfind(')')? + 1..];
            // Skip the 11 tokens that precede the user-time field.
            let mut tail = after_comm.split_whitespace().skip(11);
            let user_ticks: u64 = tail.next()?.parse().unwrap_or(0);
            let system_ticks: u64 = tail.next()?.parse().unwrap_or(0);
            Some((u128::from(user_ticks) + u128::from(system_ticks)) * NS_PER_TICK)
        })
        .unwrap_or(0)
}

/// One benchmark case executed by `main`.
struct Scenario {
    /// Label printed in the first column of the result row.
    name: &'static str,
    /// Number of measured runs; also the divisor for the per-op columns.
    iterations: u64,
    /// Scenario body; its return value is folded into a checksum by `main`.
    run: fn() -> u64,
}

/// Runs every queue scenario and prints one tab-separated result row per scenario.
///
/// Each scenario is first run three times unmeasured. It is then run `iterations` times while
/// wall-clock time, the bytes recorded in `ALLOCATED_BYTES`, and process CPU time are sampled
/// around the loop. The per-op columns divide by `iterations`, so one "op" is one full
/// scenario run.
fn main() {
    const WARMUP_RUNS: usize = 3;

    let scenarios = [
        Scenario {
            name: "bounded_source_queue_offer_accepted",
            iterations: 200,
            run: bounded_source_queue_offer_accepted,
        },
        Scenario {
            name: "bounded_source_queue_offer_dropped",
            iterations: 200,
            run: bounded_source_queue_offer_dropped,
        },
        Scenario {
            name: "source_queue_backpressure_throughput",
            iterations: 80,
            run: source_queue_backpressure_throughput,
        },
        Scenario {
            name: "source_queue_drop_head_overflow",
            iterations: 200,
            run: source_queue_drop_head_overflow,
        },
        Scenario {
            name: "sink_queue_pull_throughput",
            iterations: 200,
            run: sink_queue_pull_throughput,
        },
    ];

    println!("scenario\titerations\tns_per_op\tallocated_bytes_per_op\tcpu_ns_per_op");
    for Scenario {
        name,
        iterations,
        run,
    } in scenarios
    {
        // Unmeasured warm-up runs.
        (0..WARMUP_RUNS).for_each(|_| {
            black_box(run());
        });

        // Sampling order matters: CPU baseline, counter reset, then the wall clock.
        let cpu_before = process_cpu_ns();
        ALLOCATED_BYTES.store(0, Ordering::Relaxed);
        let wall_clock = Instant::now();
        let checksum = (0..iterations).fold(0_u64, |acc, _| acc.wrapping_add(black_box(run())));
        let wall_time = wall_clock.elapsed();
        let bytes = ALLOCATED_BYTES.load(Ordering::Relaxed);
        let cpu_spent = process_cpu_ns().saturating_sub(cpu_before);
        // Keep the accumulated results observable so the runs cannot be optimized away.
        black_box(checksum);

        let runs = iterations as f64;
        let ns_per_op = wall_time.as_nanos() as f64 / runs;
        let allocated_bytes_per_op = bytes as f64 / runs;
        let cpu_ns_per_op = cpu_spent as f64 / runs;
        println!(
            "{}\t{}\t{ns_per_op:.2}\t{allocated_bytes_per_op:.2}\t{cpu_ns_per_op:.2}",
            name, iterations
        );
    }
}

/// Returns the process-wide `Materializer`, creating it with `Materializer::new` on first use.
///
/// Every later call returns a reference to the same instance.
fn materializer() -> &'static Materializer {
    static SHARED: OnceLock<Materializer> = OnceLock::new();
    SHARED.get_or_init(Materializer::new)
}

/// Offers `QUEUE_ITEMS` values to a bounded source queue of capacity `BOUNDED_CAPACITY` that
/// feeds `Sink::ignore()`, then completes the queue.
///
/// Each offer result is passed through `black_box` and otherwise ignored. Returns `QUEUE_ITEMS`.
///
/// # Panics
///
/// Panics if running the stream with the shared materializer fails.
fn bounded_source_queue_offer_accepted() -> u64 {
    let graph = Source::<u64>::queue_bounded(BOUNDED_CAPACITY).to_mat(Sink::ignore(), Keep::both);
    // `_stream` stays bound so the second materialized value lives until the function returns.
    let (queue, _stream) = graph.run_with_materializer(materializer()).unwrap();

    (0..QUEUE_ITEMS).for_each(|item| {
        black_box(queue.offer(item));
    });
    queue.complete();
    QUEUE_ITEMS
}

/// Offers to a bounded source queue of capacity 1 that feeds `Sink::ignore()`.
///
/// One priming value (`0`) is offered first with its result discarded. `QUEUE_ITEMS` further
/// values are then offered with each result passed through `black_box`, and the queue is
/// completed. Returns `QUEUE_ITEMS`.
///
/// NOTE(review): presumably the priming offer fills the single slot so later offers hit the
/// rejection path — confirm against `queue_bounded` semantics.
///
/// # Panics
///
/// Panics if running the stream with the shared materializer fails.
fn bounded_source_queue_offer_dropped() -> u64 {
    let graph = Source::<u64>::queue_bounded(1).to_mat(Sink::ignore(), Keep::both);
    // `_stream` stays bound so the second materialized value lives until the function returns.
    let (queue, _stream) = graph.run_with_materializer(materializer()).unwrap();

    queue.offer(0);
    (0..QUEUE_ITEMS).for_each(|item| {
        black_box(queue.offer(item));
    });
    queue.complete();
    QUEUE_ITEMS
}

/// Offers `QUEUE_ITEMS` values to a size-1 source queue using `OverflowStrategy::Backpressure`
/// that feeds `Sink::ignore()`.
///
/// The second materialized value is moved to a helper thread whose only action is dropping it.
/// The queue is completed after the last offer and the helper thread is joined before
/// returning `QUEUE_ITEMS`.
///
/// # Panics
///
/// Panics if running the stream fails, if any offer result unwraps to an error, or if the
/// helper thread panicked.
fn source_queue_backpressure_throughput() -> u64 {
    let graph =
        Source::<u64>::queue(1, OverflowStrategy::Backpressure).to_mat(Sink::ignore(), Keep::both);
    let (queue, stream) = graph.run_with_materializer(materializer()).unwrap();

    let dropper = thread::spawn(move || drop(stream));

    (0..QUEUE_ITEMS).for_each(|item| {
        black_box(queue.offer(item)).unwrap();
    });
    queue.complete();
    dropper.join().unwrap();
    QUEUE_ITEMS
}

/// Offers `QUEUE_ITEMS` values to a size-1 source queue using `OverflowStrategy::DropHead`
/// that feeds `Sink::ignore()`, then completes the queue.
///
/// Returns `QUEUE_ITEMS`.
///
/// # Panics
///
/// Panics if running the stream fails or if any offer result unwraps to an error.
fn source_queue_drop_head_overflow() -> u64 {
    let graph =
        Source::<u64>::queue(1, OverflowStrategy::DropHead).to_mat(Sink::ignore(), Keep::both);
    // `_stream` stays bound so the second materialized value lives until the function returns.
    let (queue, _stream) = graph.run_with_materializer(materializer()).unwrap();

    (0..QUEUE_ITEMS).for_each(|item| {
        black_box(queue.offer(item)).unwrap();
    });
    queue.complete();
    QUEUE_ITEMS
}

/// Streams `0..QUEUE_ITEMS` into a `Sink::queue()` and pulls until the queue stops yielding.
///
/// Pulling ends at the first result that is not `Ok(Some(_))`, whether that is `Ok(None)` or
/// an error. Returns the number of items pulled before that point.
///
/// # Panics
///
/// Panics if running the stream with the shared materializer fails.
fn sink_queue_pull_throughput() -> u64 {
    let queue = Source::from_iter(0_u64..QUEUE_ITEMS)
        .run_with_materializer(Sink::queue(), materializer())
        .unwrap();

    let mut pulled = 0_u64;
    loop {
        match queue.pull() {
            Ok(Some(_)) => pulled += 1,
            _ => break pulled,
        }
    }
}