datum-core 0.4.0

Rust stream-processing library mirroring Akka/Pekko Streams Typed, built on Ractor actors
Documentation
use std::{
    alloc::{GlobalAlloc, Layout, System},
    hint::black_box,
    sync::atomic::{AtomicU64, Ordering},
    time::Instant,
};

use datum::{Flow, Sink, Source};

/// Exclusive upper bound of the `0..ITEMS_10K` input range used by the `*_10k` scenarios.
const ITEMS_10K: u64 = 10_000;
/// Exclusive upper bound of the `0..ITEMS_2K` input range used by the `*_2k_x4*` scenarios.
const ITEMS_2K: u64 = 2_000;
/// Segment size: passed as the prefix length to the prefix scenarios and used as the modulus of
/// the split predicates.
const SEGMENT: u64 = 32;
/// Exclusive upper bound of the key range fed to the `group_by` churn scenario (each item is its
/// own key there).
const CHURN_KEYS_1K: u64 = 1_000;

/// Allocator shim that forwards every request to [`System`] and adds the requested sizes to
/// [`ALLOCATED_BYTES`].
struct CountingAllocator;

/// Running total of bytes requested through [`CountingAllocator`].
///
/// Deallocations never decrease it, so it measures allocation volume rather than live memory.
static ALLOCATED_BYTES: AtomicU64 = AtomicU64::new(0);

impl CountingAllocator {
    /// Adds `bytes` to the running total.
    fn record(bytes: usize) {
        ALLOCATED_BYTES.fetch_add(bytes as u64, Ordering::Relaxed);
    }
}

unsafe impl GlobalAlloc for CountingAllocator {
    /// Allocates via [`System`] and counts `layout.size()` when the allocation succeeds.
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller's `GlobalAlloc::alloc` obligations are forwarded unchanged.
        let block = unsafe { System.alloc(layout) };
        if block.is_null() {
            return block;
        }
        Self::record(layout.size());
        block
    }

    /// Allocates zeroed memory via [`System`] and counts `layout.size()` on success.
    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller's `GlobalAlloc::alloc_zeroed` obligations are forwarded unchanged.
        let block = unsafe { System.alloc_zeroed(layout) };
        if block.is_null() {
            return block;
        }
        Self::record(layout.size());
        block
    }

    /// Releases memory via [`System`]; the counter is intentionally left untouched.
    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        // SAFETY: the caller guarantees `ptr` was allocated by this allocator with `layout`,
        // and every allocation made here comes from `System`.
        unsafe { System.dealloc(ptr, layout) };
    }

    /// Resizes via [`System`] and counts the newly requested bytes on success.
    ///
    /// A block that stayed in place is charged only for its growth (nothing when it shrinks or
    /// keeps its size); a block that moved is charged its full new size.
    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
        // SAFETY: the caller's `GlobalAlloc::realloc` obligations are forwarded unchanged.
        let resized = unsafe { System.realloc(ptr, layout, new_size) };
        if resized.is_null() {
            return resized;
        }
        if resized != ptr {
            Self::record(new_size);
        } else if new_size > layout.size() {
            Self::record(new_size - layout.size());
        }
        resized
    }
}

#[global_allocator]
static GLOBAL: CountingAllocator = CountingAllocator;

/// Returns the user plus system CPU time of the current process in nanoseconds, as read from
/// `/proc/self/stat`.
///
/// Returns `0` when the file cannot be read, when no `)` is found, or when fewer than 13 fields
/// follow the last `)`. A field that fails to parse as `u64` contributes `0`.
fn process_cpu_ns() -> u128 {
    // Each tick is scaled as 10 ms, i.e. this assumes a 100 Hz clock tick (the rate is not
    // queried from the system).
    const NS_PER_TICK: u128 = 10_000_000;

    let stat = match std::fs::read_to_string("/proc/self/stat") {
        Ok(contents) => contents,
        Err(_) => return 0,
    };
    // Cut at the last `)` so whitespace inside the parenthesised name cannot shift the fields.
    let after_name = match stat.rfind(')') {
        Some(index) => &stat[index + 1..],
        None => return 0,
    };

    // The two values of interest sit at positions 11 and 12 of what follows the name.
    let mut remaining = after_name.split_whitespace().skip(11);
    let (Some(utime), Some(stime)) = (remaining.next(), remaining.next()) else {
        return 0;
    };

    let ticks = |field: &str| u128::from(field.parse::<u64>().unwrap_or(0));
    (ticks(utime) + ticks(stime)) * NS_PER_TICK
}

/// One benchmark case run by `main`.
struct Scenario {
    /// Label printed in the first output column and compared against the optional
    /// command-line scenario filter.
    name: &'static str,
    /// Number of measured iterations, used unless an override is given on the command line.
    iterations: u64,
    /// The workload itself; its `u64` result is folded into a checksum passed to `black_box`.
    run: fn() -> u64,
}

/// Runs the benchmark scenarios and prints one tab-separated row per scenario to stdout.
///
/// Command line: `[scenario_name] [iterations]`.
///
/// * `scenario_name` — when given, only the scenario with exactly this name is run. If no
///   scenario matches, a warning listing the known names is written to stderr.
/// * `iterations` — when given as a positive integer, replaces every scenario's default
///   iteration count. A value of `0` or one that does not parse as `u64` is reported on stderr
///   and the per-scenario defaults are used instead.
///
/// Each scenario is run three times unmeasured, then `iterations` times while wall-clock time,
/// bytes counted in `ALLOCATED_BYTES`, and process CPU time are recorded. The columns are
/// per-iteration averages.
fn main() {
    let mut args = std::env::args().skip(1);
    let scenario_filter = args.next();
    // A zero override would make every per-op division below yield NaN/inf, and an unparsable
    // one would otherwise be dropped without notice; warn and fall back to the defaults.
    let iterations_override = args.next().and_then(|value| match value.parse::<u64>() {
        Ok(0) => {
            eprintln!("warning: iterations override must be positive; using scenario defaults");
            None
        }
        Ok(count) => Some(count),
        Err(error) => {
            eprintln!(
                "warning: ignoring iterations override {:?} ({}); using scenario defaults",
                value, error
            );
            None
        }
    });

    let scenarios = [
        Scenario {
            name: "prefix_and_tail_rebuild_10k",
            iterations: 50,
            run: prefix_and_tail_rebuild_10k,
        },
        Scenario {
            name: "flat_map_prefix_dispatch_10k",
            iterations: 100,
            run: flat_map_prefix_dispatch_10k,
        },
        Scenario {
            name: "split_when_rebuild_10k",
            iterations: 10,
            run: split_when_rebuild_10k,
        },
        Scenario {
            name: "split_after_rebuild_10k",
            iterations: 10,
            run: split_after_rebuild_10k,
        },
        Scenario {
            name: "flat_map_concat_expand_2k_x4",
            iterations: 10,
            run: flat_map_concat_expand_2k_x4,
        },
        Scenario {
            name: "flat_map_merge_expand_2k_x4_b8",
            iterations: 30,
            run: flat_map_merge_expand_2k_x4_b8,
        },
        Scenario {
            name: "group_by_single_key_10k",
            iterations: 10,
            run: group_by_single_key_10k,
        },
        Scenario {
            name: "group_by_closed_key_churn_1k",
            iterations: 10,
            run: group_by_closed_key_churn_1k,
        },
    ];

    println!("scenario\titerations\tns_per_op\tallocated_bytes_per_op\tcpu_ns_per_op");
    let mut ran_any = false;
    for scenario in &scenarios {
        if scenario_filter
            .as_deref()
            .is_some_and(|filter| scenario.name != filter)
        {
            continue;
        }
        ran_any = true;
        let iterations = iterations_override.unwrap_or(scenario.iterations);

        // Unmeasured warm-up runs.
        for _ in 0..3 {
            black_box((scenario.run)());
        }

        let cpu_start = process_cpu_ns();
        ALLOCATED_BYTES.store(0, Ordering::Relaxed);
        let started = Instant::now();
        let mut checksum = 0_u64;
        for _ in 0..iterations {
            checksum = checksum.wrapping_add(black_box((scenario.run)()));
        }
        let elapsed = started.elapsed();
        let allocated_bytes = ALLOCATED_BYTES.load(Ordering::Relaxed);
        let cpu_delta = process_cpu_ns().saturating_sub(cpu_start);
        // Keep the accumulated results observable so the measured loop cannot be elided.
        black_box(checksum);

        println!(
            "{}\t{}\t{:.2}\t{:.2}\t{:.2}",
            scenario.name,
            iterations,
            elapsed.as_nanos() as f64 / iterations as f64,
            allocated_bytes as f64 / iterations as f64,
            cpu_delta as f64 / iterations as f64,
        );
    }

    // A mistyped filter would otherwise produce a header-only report with no explanation.
    if let Some(filter) = scenario_filter.as_deref() {
        if !ran_any {
            let known: Vec<&str> = scenarios.iter().map(|scenario| scenario.name).collect();
            eprintln!(
                "warning: no scenario named {:?}; known scenarios: {}",
                filter,
                known.join(", ")
            );
        }
    }
}

/// Scenario: applies `prefix_and_tail(SEGMENT)` to `0..ITEMS_10K`, re-joins each
/// `(prefix, tail)` pair with `concat` inside `flat_map_concat`, and returns the wrapping sum of
/// the emitted items.
///
/// Panics if materialising the stream or waiting for its result fails.
fn prefix_and_tail_rebuild_10k() -> u64 {
    let rebuilt = Source::from_iter(0_u64..ITEMS_10K)
        .prefix_and_tail(SEGMENT as usize)
        .flat_map_concat(|(head, rest)| Source::from_iter(head).concat(rest));

    rebuilt
        .run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)))
        .unwrap()
        .wait()
        .unwrap()
}

/// Scenario: runs `0..ITEMS_10K` through `flat_map_prefix(SEGMENT, ..)`. The closure sums the
/// prefix it receives into a bias and returns a flow that wrapping-adds that bias to each item;
/// the result is the wrapping sum of everything the stream emits.
///
/// Panics if materialising the stream or waiting for its result fails.
fn flat_map_prefix_dispatch_10k() -> u64 {
    let biased = Source::from_iter(0_u64..ITEMS_10K).flat_map_prefix(SEGMENT as usize, |head| {
        let bias = head.iter().copied().sum::<u64>();
        Flow::identity().map(move |value: u64| value.wrapping_add(bias))
    });

    biased
        .run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)))
        .unwrap()
        .wait()
        .unwrap()
}

/// Scenario: splits `0..ITEMS_10K` with `split_when` on every non-zero multiple of `SEGMENT`,
/// pulls the resulting substreams from a queue sink one at a time, folds each into a wrapping
/// sum, and returns the wrapping total.
///
/// Panics if materialising a stream, pulling from the queue, or waiting for a fold fails.
fn split_when_rebuild_10k() -> u64 {
    let segments = Source::from_iter(0_u64..ITEMS_10K)
        .split_when(|value| *value != 0 && *value % SEGMENT == 0)
        .run_with(Sink::queue())
        .unwrap();

    let mut total = 0_u64;
    loop {
        // `None` from the queue ends the outer stream.
        let Some(segment) = segments.pull().unwrap() else {
            break;
        };
        let segment_sum = segment
            .run_with(Sink::fold(0_u64, |partial, value| partial.wrapping_add(value)))
            .unwrap()
            .wait()
            .unwrap();
        total = total.wrapping_add(segment_sum);
    }
    total
}

/// Scenario: splits `0..ITEMS_10K` with `split_after` on every item whose successor is a
/// multiple of `SEGMENT`, pulls the resulting substreams from a queue sink one at a time, folds
/// each into a wrapping sum, and returns the wrapping total.
///
/// Panics if materialising a stream, pulling from the queue, or waiting for a fold fails.
fn split_after_rebuild_10k() -> u64 {
    let segments = Source::from_iter(0_u64..ITEMS_10K)
        .split_after(|value| (*value + 1) % SEGMENT == 0)
        .run_with(Sink::queue())
        .unwrap();

    let mut total = 0_u64;
    loop {
        // `None` from the queue ends the outer stream.
        let Some(segment) = segments.pull().unwrap() else {
            break;
        };
        let segment_sum = segment
            .run_with(Sink::fold(0_u64, |partial, value| partial.wrapping_add(value)))
            .unwrap()
            .wait()
            .unwrap();
        total = total.wrapping_add(segment_sum);
    }
    total
}

/// Scenario: expands every item of `0..ITEMS_2K` into the four-element source
/// `[item, item + 1, item + 2, item + 3]` via `flat_map_concat` and returns the wrapping sum of
/// the emitted items.
///
/// Panics if materialising the stream or waiting for its result fails.
fn flat_map_concat_expand_2k_x4() -> u64 {
    let expanded = Source::from_iter(0_u64..ITEMS_2K)
        .flat_map_concat(|base| Source::from_iter([base, base + 1, base + 2, base + 3]));

    expanded
        .run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)))
        .unwrap()
        .wait()
        .unwrap()
}

/// Scenario: expands every item of `0..ITEMS_2K` into the four-element source
/// `[item, item + 1, item + 2, item + 3]` via `flat_map_merge` with a first argument of `8`
/// (presumably the merge breadth — confirm against the `datum` API) and returns the wrapping sum
/// of the emitted items.
///
/// Panics if materialising the stream or waiting for its result fails.
fn flat_map_merge_expand_2k_x4_b8() -> u64 {
    let expanded = Source::from_iter(0_u64..ITEMS_2K)
        .flat_map_merge(8, |base| Source::from_iter([base, base + 1, base + 2, base + 3]));

    expanded
        .run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)))
        .unwrap()
        .wait()
        .unwrap()
}

/// Scenario: groups `0..ITEMS_10K` under the single constant key `0_u8`, pulls the one
/// substream from a queue sink, folds it into a wrapping sum, and asserts that the queue then
/// reports no further substream.
///
/// Panics if materialising a stream, pulling from the queue, or waiting for the fold fails, if
/// the first pull yields no substream, or if a second substream shows up.
fn group_by_single_key_10k() -> u64 {
    let groups = Source::from_iter(0_u64..ITEMS_10K)
        .group_by(1, |_| 0_u8, false)
        .run_with(Sink::queue())
        .unwrap();

    let only_group = groups.pull().unwrap().unwrap();
    let total = only_group
        .run_with(Sink::fold(0_u64, |partial, value| partial.wrapping_add(value)))
        .unwrap()
        .wait()
        .unwrap();

    // Exactly one key is ever produced, so the outer queue must now be exhausted.
    assert!(groups.pull().unwrap().is_none());
    total
}

/// Scenario: groups `0..CHURN_KEYS_1K` with every item as its own key, takes one element from
/// each substream inside `flat_map_merge(256, ..)`, and returns how many elements were emitted.
///
/// The first `group_by` argument is `CHURN_KEYS_1K * 2` (presumably a substream limit — confirm
/// against the `datum` API).
///
/// Panics if materialising the stream or waiting for its result fails.
fn group_by_closed_key_churn_1k() -> u64 {
    let group_by_limit = (CHURN_KEYS_1K * 2) as usize;
    let first_per_key = Source::from_iter(0_u64..CHURN_KEYS_1K)
        .group_by(group_by_limit, |key| *key, false)
        .flat_map_merge(256, |group| group.take(1));

    first_per_key
        .run_with(Sink::fold(0_u64, |count, _| count + 1))
        .unwrap()
        .wait()
        .unwrap()
}