datum-core 0.4.0

Rust stream-processing library mirroring Akka/Pekko Streams Typed, built on Ractor actors
Documentation
use std::{
    alloc::{GlobalAlloc, Layout, System},
    hint::black_box,
    sync::{
        OnceLock,
        atomic::{AtomicU64, Ordering},
    },
    time::Instant,
};

use datum::{Keep, Materializer, NotUsed, RunnableGraph, Sink, Source};

struct CountingAllocator;

static ALLOCATED_BYTES: AtomicU64 = AtomicU64::new(0);

unsafe impl GlobalAlloc for CountingAllocator {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        let ptr = unsafe { System.alloc(layout) };
        if !ptr.is_null() {
            ALLOCATED_BYTES.fetch_add(layout.size() as u64, Ordering::Relaxed);
        }
        ptr
    }

    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
        let ptr = unsafe { System.alloc_zeroed(layout) };
        if !ptr.is_null() {
            ALLOCATED_BYTES.fetch_add(layout.size() as u64, Ordering::Relaxed);
        }
        ptr
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        unsafe { System.dealloc(ptr, layout) };
    }

    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
        let new_ptr = unsafe { System.realloc(ptr, layout, new_size) };
        if !new_ptr.is_null() {
            if new_ptr == ptr {
                if new_size > layout.size() {
                    ALLOCATED_BYTES.fetch_add((new_size - layout.size()) as u64, Ordering::Relaxed);
                }
            } else {
                ALLOCATED_BYTES.fetch_add(new_size as u64, Ordering::Relaxed);
            }
        }
        new_ptr
    }
}

#[global_allocator]
static GLOBAL: CountingAllocator = CountingAllocator;

/// Process-wide CPU time (user + system, all threads) in nanoseconds, read from
/// `/proc/self/stat` utime+stime. USER_HZ is 100 on Linux (10 ms per tick), so
/// this is coarse per op but meaningful over a measured loop; it captures the
/// CPU consumed by stream worker threads (including any busy-spin).
fn process_cpu_ns() -> u128 {
    let Ok(stat) = std::fs::read_to_string("/proc/self/stat") else {
        return 0;
    };
    let Some(close) = stat.rfind(')') else {
        return 0;
    };
    let fields: Vec<&str> = stat[close + 1..].split_whitespace().collect();
    // After ')': index 0 = field 3 (state); utime = field 14 -> index 11,
    // stime = field 15 -> index 12.
    if fields.len() <= 12 {
        return 0;
    }
    let utime: u64 = fields[11].parse().unwrap_or(0);
    let stime: u64 = fields[12].parse().unwrap_or(0);
    (utime as u128 + stime as u128) * 10_000_000
}

struct Scenario {
    name: &'static str,
    iterations: u64,
    run: fn() -> u64,
}

fn main() {
    let scenarios = [
        Scenario {
            name: "graph_construction_1_stages",
            iterations: 50_000,
            run: graph_construction_1_stages,
        },
        Scenario {
            name: "graph_construction_10_stages",
            iterations: 20_000,
            run: graph_construction_10_stages,
        },
        Scenario {
            name: "graph_construction_100_stages",
            iterations: 5_000,
            run: graph_construction_100_stages,
        },
        Scenario {
            name: "graph_construction_1000_stages",
            iterations: 500,
            run: graph_construction_1000_stages,
        },
        Scenario {
            name: "materialization_latency_1_operators",
            iterations: 5_000,
            run: materialization_latency_1_operators,
        },
        Scenario {
            name: "materialization_latency_10_operators",
            iterations: 5_000,
            run: materialization_latency_10_operators,
        },
        Scenario {
            name: "materialization_latency_100_operators",
            iterations: 2_000,
            run: materialization_latency_100_operators,
        },
        Scenario {
            name: "materialized_value_keep_left",
            iterations: 5_000,
            run: materialized_value_keep_left,
        },
        Scenario {
            name: "materialized_value_keep_right",
            iterations: 5_000,
            run: materialized_value_keep_right,
        },
        Scenario {
            name: "materialized_value_keep_both",
            iterations: 5_000,
            run: materialized_value_keep_both,
        },
        Scenario {
            name: "materialized_value_keep_none",
            iterations: 5_000,
            run: materialized_value_keep_none,
        },
        Scenario {
            name: "materialized_value_custom",
            iterations: 5_000,
            run: materialized_value_custom,
        },
        Scenario {
            name: "eager_source_first_demand",
            iterations: 5_000,
            run: eager_source_first_demand,
        },
        Scenario {
            name: "lazy_source_first_demand",
            iterations: 5_000,
            run: lazy_source_first_demand,
        },
        Scenario {
            name: "sink_terminal_ignore",
            iterations: 4_000,
            run: sink_terminal_ignore,
        },
        Scenario {
            name: "sink_terminal_head",
            iterations: 4_000,
            run: sink_terminal_head,
        },
        Scenario {
            name: "sink_terminal_fold",
            iterations: 4_000,
            run: sink_terminal_fold,
        },
        Scenario {
            name: "sink_terminal_foreach",
            iterations: 4_000,
            run: sink_terminal_foreach,
        },
        Scenario {
            name: "repeated_materialization",
            iterations: 4_000,
            run: repeated_materialization,
        },
    ];

    println!("scenario\titerations\tns_per_op\tallocated_bytes_per_op\tcpu_ns_per_op");
    for scenario in scenarios {
        for _ in 0..10 {
            black_box((scenario.run)());
        }

        let cpu_start = process_cpu_ns();
        ALLOCATED_BYTES.store(0, Ordering::Relaxed);
        let started = Instant::now();
        let mut checksum = 0_u64;
        for _ in 0..scenario.iterations {
            checksum = checksum.wrapping_add(black_box((scenario.run)()));
        }
        let elapsed = started.elapsed();
        // process_cpu_ns allocates, so read the allocation counter before the
        // final CPU sample to keep CPU sampling out of the measured window.
        let allocated_bytes = ALLOCATED_BYTES.load(Ordering::Relaxed);
        let cpu_delta = process_cpu_ns().saturating_sub(cpu_start);
        black_box(checksum);

        let ns_per_op = elapsed.as_nanos() as f64 / scenario.iterations as f64;
        let allocated_bytes_per_op = allocated_bytes as f64 / scenario.iterations as f64;
        let cpu_ns_per_op = cpu_delta as f64 / scenario.iterations as f64;
        println!(
            "{}\t{}\t{ns_per_op:.2}\t{allocated_bytes_per_op:.2}\t{cpu_ns_per_op:.2}",
            scenario.name, scenario.iterations
        );
    }
}

fn shared_materializer() -> &'static Materializer {
    static MATERIALIZER: OnceLock<Materializer> = OnceLock::new();
    MATERIALIZER.get_or_init(Materializer::new)
}

/// Mirrors Akka `sourceChain(stages, 1L).to(Sink.ignore)`: a single-element
/// source through `stages` map operators into an ignoring sink.
fn ignore_graph(stages: usize) -> RunnableGraph<NotUsed> {
    let mut source = Source::from_iter(0_u64..1);
    for _ in 0..stages {
        source = source.map(|item| item.wrapping_add(1));
    }
    source.to(Sink::ignore())
}

fn build_construction(stages: usize) -> u64 {
    let graph = black_box(ignore_graph(stages));
    black_box(&graph);
    stages as u64
}

fn graph_construction_1_stages() -> u64 {
    build_construction(1)
}

fn graph_construction_10_stages() -> u64 {
    build_construction(10)
}

fn graph_construction_100_stages() -> u64 {
    build_construction(100)
}

fn graph_construction_1000_stages() -> u64 {
    build_construction(1000)
}

fn materialization_latency_1_operators() -> u64 {
    static GRAPH: OnceLock<RunnableGraph<NotUsed>> = OnceLock::new();
    let graph = GRAPH.get_or_init(|| ignore_graph(1));
    graph.run_with_materializer(shared_materializer()).unwrap();
    0
}

fn materialization_latency_10_operators() -> u64 {
    static GRAPH: OnceLock<RunnableGraph<NotUsed>> = OnceLock::new();
    let graph = GRAPH.get_or_init(|| ignore_graph(10));
    graph.run_with_materializer(shared_materializer()).unwrap();
    0
}

fn materialization_latency_100_operators() -> u64 {
    static GRAPH: OnceLock<RunnableGraph<NotUsed>> = OnceLock::new();
    let graph = GRAPH.get_or_init(|| ignore_graph(100));
    graph.run_with_materializer(shared_materializer()).unwrap();
    0
}

fn materialized_value_keep_left() -> u64 {
    Source::single(1_u64)
        .map_materialized_value(|_| 1_u64)
        .to_mat(Sink::ignore().map_materialized_value(|_| 2_u64), Keep::left)
        .run_with_materializer(shared_materializer())
        .unwrap()
}

fn materialized_value_keep_right() -> u64 {
    Source::single(1_u64)
        .map_materialized_value(|_| 1_u64)
        .to_mat(
            Sink::ignore().map_materialized_value(|_| 2_u64),
            Keep::right,
        )
        .run_with_materializer(shared_materializer())
        .unwrap()
}

fn materialized_value_keep_both() -> u64 {
    let (left, right) = Source::single(1_u64)
        .map_materialized_value(|_| 1_u64)
        .to_mat(Sink::ignore().map_materialized_value(|_| 2_u64), Keep::both)
        .run_with_materializer(shared_materializer())
        .unwrap();
    left.wrapping_add(right)
}

fn materialized_value_keep_none() -> u64 {
    let _: NotUsed = Source::single(1_u64)
        .map_materialized_value(|_| 1_u64)
        .to_mat(Sink::ignore().map_materialized_value(|_| 2_u64), Keep::none)
        .run_with_materializer(shared_materializer())
        .unwrap();
    0
}

fn materialized_value_custom() -> u64 {
    Source::single(1_u64)
        .map_materialized_value(|_| 1_u64)
        .to_mat(
            Sink::ignore().map_materialized_value(|_| 2_u64),
            |left, right| left + right,
        )
        .run_with_materializer(shared_materializer())
        .unwrap()
}

fn eager_source_first_demand() -> u64 {
    let (mat, queue) = Source::single(1_u64)
        .map_materialized_value(|_| 7_u64)
        .to_mat(Sink::queue(), Keep::both)
        .run_with_materializer(shared_materializer())
        .unwrap();
    queue.pull().unwrap().unwrap_or_default().wrapping_add(mat)
}

fn lazy_source_first_demand() -> u64 {
    let (mat, queue) =
        Source::<u64>::lazy_source(|| Source::single(1_u64).map_materialized_value(|_| 7_u64))
            .to_mat(Sink::queue(), Keep::both)
            .run_with_materializer(shared_materializer())
            .unwrap();
    let item = queue.pull().unwrap().unwrap_or_default();
    item.wrapping_add(mat.wait().unwrap())
}

fn sink_terminal_ignore() -> u64 {
    Source::from_iter(0_u64..1_000)
        .run_with_materializer(Sink::ignore(), shared_materializer())
        .unwrap()
        .wait()
        .unwrap();
    0
}

fn sink_terminal_head() -> u64 {
    Source::from_iter(0_u64..1_000)
        .run_with_materializer(Sink::head(), shared_materializer())
        .unwrap()
        .wait()
        .unwrap()
}

fn sink_terminal_fold() -> u64 {
    Source::from_iter(0_u64..1_000)
        .run_with_materializer(
            Sink::fold(0_u64, |acc, item| acc.wrapping_add(item)),
            shared_materializer(),
        )
        .unwrap()
        .wait()
        .unwrap()
}

fn sink_terminal_foreach() -> u64 {
    let sum = std::sync::Arc::new(AtomicU64::new(0));
    let sink_sum = std::sync::Arc::clone(&sum);
    Source::from_iter(0_u64..1_000)
        .run_with_materializer(
            Sink::foreach(move |item| {
                sink_sum.fetch_add(item, Ordering::Relaxed);
            }),
            shared_materializer(),
        )
        .unwrap()
        .wait()
        .unwrap();
    sum.load(Ordering::Relaxed)
}

fn repeated_materialization() -> u64 {
    // Mirrors Akka `sourceChain(1, 128L).toMat(Sink.fold(0)(_ + _))(Keep.right)`
    // run repeatedly with fresh runtime state.
    static GRAPH: OnceLock<RunnableGraph<datum::StreamCompletion<u64>>> = OnceLock::new();
    let graph = GRAPH.get_or_init(|| {
        Source::from_iter(0_u64..128)
            .map(|item| item.wrapping_add(1))
            .to_mat(
                Sink::fold(0_u64, |acc, item| acc.wrapping_add(item)),
                Keep::right,
            )
    });
    graph
        .run_with_materializer(shared_materializer())
        .unwrap()
        .wait()
        .unwrap()
}