datum-core 0.3.0

Rust stream-processing library mirroring Akka/Pekko Streams Typed, built on Ractor actors
Documentation
use std::{
    alloc::{GlobalAlloc, Layout, System},
    hint::black_box,
    sync::{
        OnceLock,
        atomic::{AtomicU64, Ordering},
    },
    time::Instant,
};

use datum::{
    BroadcastHub, Keep, KillSwitches, Materializer, MergeHub, PartitionHub, Sink, Source,
    StreamCompletion,
};

const HUB_BUFFER_SIZE: usize = 256;
const HUB_ITEMS_PER_PRODUCER: u64 = 10_000;
const HUB_ITEMS_BROADCAST: u64 = 10_000;
const KILL_SWITCH_STREAM_ITEMS: usize = 64;

struct CountingAllocator;

static ALLOCATED_BYTES: AtomicU64 = AtomicU64::new(0);

unsafe impl GlobalAlloc for CountingAllocator {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        let ptr = unsafe { System.alloc(layout) };
        if !ptr.is_null() {
            ALLOCATED_BYTES.fetch_add(layout.size() as u64, Ordering::Relaxed);
        }
        ptr
    }

    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
        let ptr = unsafe { System.alloc_zeroed(layout) };
        if !ptr.is_null() {
            ALLOCATED_BYTES.fetch_add(layout.size() as u64, Ordering::Relaxed);
        }
        ptr
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        unsafe { System.dealloc(ptr, layout) };
    }

    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
        let new_ptr = unsafe { System.realloc(ptr, layout, new_size) };
        if !new_ptr.is_null() {
            if new_ptr == ptr {
                if new_size > layout.size() {
                    ALLOCATED_BYTES.fetch_add((new_size - layout.size()) as u64, Ordering::Relaxed);
                }
            } else {
                ALLOCATED_BYTES.fetch_add(new_size as u64, Ordering::Relaxed);
            }
        }
        new_ptr
    }
}

#[global_allocator]
static GLOBAL: CountingAllocator = CountingAllocator;

fn process_cpu_ns() -> u128 {
    let Ok(stat) = std::fs::read_to_string("/proc/self/stat") else {
        return 0;
    };
    let Some(close) = stat.rfind(')') else {
        return 0;
    };
    let fields: Vec<&str> = stat[close + 1..].split_whitespace().collect();
    if fields.len() <= 12 {
        return 0;
    }
    let utime: u64 = fields[11].parse().unwrap_or(0);
    let stime: u64 = fields[12].parse().unwrap_or(0);
    (utime as u128 + stime as u128) * 10_000_000
}

struct Scenario {
    name: &'static str,
    iterations: u64,
    run: fn() -> u64,
}

fn main() {
    let scenarios = [
        Scenario {
            name: "kill_switch_shared_shutdown_fanout_1",
            iterations: 200,
            run: kill_switch_shared_shutdown_fanout_1,
        },
        Scenario {
            name: "kill_switch_shared_shutdown_fanout_100",
            iterations: 40,
            run: kill_switch_shared_shutdown_fanout_100,
        },
        Scenario {
            name: "kill_switch_shared_shutdown_fanout_10000",
            iterations: 3,
            run: kill_switch_shared_shutdown_fanout_10000,
        },
        Scenario {
            name: "merge_hub_throughput_p1",
            iterations: 200,
            run: merge_hub_throughput_p1,
        },
        Scenario {
            name: "merge_hub_throughput_p4",
            iterations: 80,
            run: merge_hub_throughput_p4,
        },
        Scenario {
            name: "merge_hub_throughput_p16",
            iterations: 30,
            run: merge_hub_throughput_p16,
        },
        Scenario {
            name: "broadcast_hub_throughput_c1",
            iterations: 200,
            run: broadcast_hub_throughput_c1,
        },
        Scenario {
            name: "broadcast_hub_throughput_c4",
            iterations: 80,
            run: broadcast_hub_throughput_c4,
        },
        Scenario {
            name: "broadcast_hub_throughput_c16",
            iterations: 30,
            run: broadcast_hub_throughput_c16,
        },
        Scenario {
            name: "partition_hub_throughput_c1",
            iterations: 200,
            run: partition_hub_throughput_c1,
        },
        Scenario {
            name: "partition_hub_throughput_c4",
            iterations: 80,
            run: partition_hub_throughput_c4,
        },
        Scenario {
            name: "partition_hub_throughput_c16",
            iterations: 30,
            run: partition_hub_throughput_c16,
        },
    ];

    println!("scenario\titerations\tns_per_op\tallocated_bytes_per_op\tcpu_ns_per_op");
    for scenario in scenarios {
        for _ in 0..3 {
            black_box((scenario.run)());
        }

        let cpu_start = process_cpu_ns();
        ALLOCATED_BYTES.store(0, Ordering::Relaxed);
        let started = Instant::now();
        let mut checksum = 0_u64;
        for _ in 0..scenario.iterations {
            checksum = checksum.wrapping_add(black_box((scenario.run)()));
        }
        let elapsed = started.elapsed();
        let allocated_bytes = ALLOCATED_BYTES.load(Ordering::Relaxed);
        let cpu_delta = process_cpu_ns().saturating_sub(cpu_start);
        black_box(checksum);

        let ns_per_op = elapsed.as_nanos() as f64 / scenario.iterations as f64;
        let allocated_bytes_per_op = allocated_bytes as f64 / scenario.iterations as f64;
        let cpu_ns_per_op = cpu_delta as f64 / scenario.iterations as f64;
        println!(
            "{}\t{}\t{ns_per_op:.2}\t{allocated_bytes_per_op:.2}\t{cpu_ns_per_op:.2}",
            scenario.name, scenario.iterations
        );
    }
}

fn materializer() -> &'static Materializer {
    static MATERIALIZER: OnceLock<Materializer> = OnceLock::new();
    MATERIALIZER.get_or_init(Materializer::new)
}

fn wait<T>(completion: StreamCompletion<T>) -> T {
    completion.wait().unwrap()
}

fn kill_switch_shared_shutdown(streams: usize) -> u64 {
    let switch = KillSwitches::shared("compare-shared");
    let completions = (0..streams)
        .map(|_| {
            Source::repeat(1_u64)
                .take(KILL_SWITCH_STREAM_ITEMS)
                .via(switch.flow())
                .run_with_materializer(Sink::ignore(), materializer())
                .unwrap()
        })
        .collect::<Vec<_>>();
    switch.shutdown();
    for completion in completions {
        black_box(wait(completion));
    }
    streams as u64
}

fn merge_hub_throughput(producers: usize) -> u64 {
    let ((hub_sink, control), completion) = MergeHub::source_with_draining::<u64>(HUB_BUFFER_SIZE)
        .to_mat(Sink::fold(0_u64, |acc, _| acc + 1), Keep::both)
        .run_with_materializer(materializer())
        .unwrap();

    for _ in 0..producers {
        hub_sink
            .clone()
            .run_with(Source::from_iter(0_u64..HUB_ITEMS_PER_PRODUCER))
            .unwrap();
    }

    control.drain_and_complete();
    wait(completion)
}

fn broadcast_hub_throughput(consumers: usize) -> u64 {
    let hub = Source::from_iter(0_u64..HUB_ITEMS_BROADCAST)
        .run_with_materializer(
            BroadcastHub::sink_starting_after(consumers, HUB_BUFFER_SIZE),
            materializer(),
        )
        .unwrap();
    let completions = (0..consumers)
        .map(|_| {
            hub.source()
                .run_with(Sink::fold(0_u64, |acc, _| acc + 1))
                .unwrap()
        })
        .collect::<Vec<_>>();
    completions.into_iter().map(wait).sum()
}

fn partition_hub_throughput(consumers: usize) -> u64 {
    let hub = Source::from_iter(0_u64..HUB_ITEMS_BROADCAST)
        .run_with_materializer(
            PartitionHub::sink(
                |info, item| {
                    let idx = (*item as usize) % info.size();
                    info.consumer_id_by_idx(idx) as isize
                },
                consumers,
                HUB_BUFFER_SIZE,
            ),
            materializer(),
        )
        .unwrap();
    let completions = (0..consumers)
        .map(|_| {
            hub.source()
                .run_with(Sink::fold(0_u64, |acc, _| acc + 1))
                .unwrap()
        })
        .collect::<Vec<_>>();
    completions.into_iter().map(wait).sum()
}

fn kill_switch_shared_shutdown_fanout_1() -> u64 {
    kill_switch_shared_shutdown(1)
}

fn kill_switch_shared_shutdown_fanout_100() -> u64 {
    kill_switch_shared_shutdown(100)
}

fn kill_switch_shared_shutdown_fanout_10000() -> u64 {
    kill_switch_shared_shutdown(10_000)
}

fn merge_hub_throughput_p1() -> u64 {
    merge_hub_throughput(1)
}

fn merge_hub_throughput_p4() -> u64 {
    merge_hub_throughput(4)
}

fn merge_hub_throughput_p16() -> u64 {
    merge_hub_throughput(16)
}

fn broadcast_hub_throughput_c1() -> u64 {
    broadcast_hub_throughput(1)
}

fn broadcast_hub_throughput_c4() -> u64 {
    broadcast_hub_throughput(4)
}

fn broadcast_hub_throughput_c16() -> u64 {
    broadcast_hub_throughput(16)
}

fn partition_hub_throughput_c1() -> u64 {
    partition_hub_throughput(1)
}

fn partition_hub_throughput_c4() -> u64 {
    partition_hub_throughput(4)
}

fn partition_hub_throughput_c16() -> u64 {
    partition_hub_throughput(16)
}