datum-core 0.6.0

Rust stream-processing library mirroring Akka/Pekko Streams Typed, built on Ractor actors
Documentation
use criterion::{Criterion, criterion_group, criterion_main};
use datum::{Keep, Sink, Source, StreamCompletion};
use std::{
    hint::black_box,
    sync::atomic::{AtomicU64, Ordering},
};

/// Calls `wait()` on `completion` and unwraps the outcome so benchmark bodies
/// can obtain the final value in one step.
///
/// # Panics
///
/// Panics if the outcome returned by `completion.wait()` does not hold a value.
fn wait<T>(completion: StreamCompletion<T>) -> T {
    let outcome = completion.wait();
    outcome.unwrap()
}

/// Builds a source over `0..len` and appends `stages` consecutive `map`
/// operators, each applying `wrapping_add(1)` to the item passing through.
///
/// With `stages == 0` the source is returned without any `map` operator.
fn source_chain(stages: usize, len: u64) -> Source<u64> {
    let origin: Source<u64> = Source::from_fn_iter(move || 0..len);
    // Each fold step wraps the chain built so far in one more `map` operator.
    (0..stages).fold(origin, |chain, _| chain.map(|value| value.wrapping_add(1)))
}

/// Benchmarks assembling a graph (`source_chain(..).to(Sink::ignore())`)
/// without running it, for chains of 1, 10, 100 and 1 000 `map` stages.
fn graph_construction(c: &mut Criterion) {
    const STAGE_COUNTS: [usize; 4] = [1, 10, 100, 1_000];

    let mut group = c.benchmark_group("graph_construction");
    for stages in STAGE_COUNTS {
        let label = format!("{stages}_stages");
        group.bench_function(label, |bencher| {
            // The graph is rebuilt on every iteration; `black_box` keeps the
            // otherwise unused result from being optimized away.
            bencher.iter(|| black_box(source_chain(stages, 1).to(Sink::ignore())));
        });
    }
    group.finish();
}

/// Benchmarks calling `run()` on an already-built graph with 1, 10 and 100
/// `map` operators. The value returned by `run()` is not waited on.
fn materialization_latency(c: &mut Criterion) {
    const OPERATOR_COUNTS: [usize; 3] = [1, 10, 100];

    let mut group = c.benchmark_group("materialization_latency");
    for stages in OPERATOR_COUNTS {
        // Built once per operator count, outside the timed closure, so that
        // only the `run()` call is measured.
        let prebuilt = source_chain(stages, 1).to(Sink::ignore());
        let label = format!("{stages}_operators");
        group.bench_function(label, |bencher| {
            bencher.iter(|| {
                let materialized = prebuilt.run().unwrap();
                black_box(materialized)
            });
        });
    }
    group.finish();
}

fn materialized_value_overhead(c: &mut Criterion) {
    let mut group = c.benchmark_group("materialized_value_overhead");

    group.bench_function("keep_left", |b| {
        b.iter(|| {
            let materialized = Source::single(1_u64)
                .map_materialized_value(|_| 1_u64)
                .to_mat(Sink::ignore().map_materialized_value(|_| 2_u64), Keep::left)
                .run()
                .unwrap();
            black_box(materialized)
        });
    });

    group.bench_function("keep_right", |b| {
        b.iter(|| {
            let materialized = Source::single(1_u64)
                .map_materialized_value(|_| 1_u64)
                .to_mat(
                    Sink::ignore().map_materialized_value(|_| 2_u64),
                    Keep::right,
                )
                .run()
                .unwrap();
            black_box(materialized)
        });
    });

    group.bench_function("keep_both", |b| {
        b.iter(|| {
            let materialized = Source::single(1_u64)
                .map_materialized_value(|_| 1_u64)
                .to_mat(Sink::ignore().map_materialized_value(|_| 2_u64), Keep::both)
                .run()
                .unwrap();
            black_box(materialized)
        });
    });

    group.bench_function("keep_none", |b| {
        b.iter(|| {
            let materialized = Source::single(1_u64)
                .map_materialized_value(|_| 1_u64)
                .to_mat(Sink::ignore().map_materialized_value(|_| 2_u64), Keep::none)
                .run()
                .unwrap();
            black_box(materialized)
        });
    });

    group.bench_function("custom", |b| {
        b.iter(|| {
            let materialized = Source::single(1_u64)
                .map_materialized_value(|_| 1_u64)
                .to_mat(
                    Sink::ignore().map_materialized_value(|_| 2_u64),
                    |left, right| left + right,
                )
                .run()
                .unwrap();
            black_box(materialized)
        });
    });

    group.finish();
}

/// Benchmarks running a one-element stream into `Sink::queue()` and pulling
/// the first item, once with a plain `Source::single` and once with the same
/// source wrapped in `Source::lazy_source`.
fn lazy_first_demand_overhead(c: &mut Criterion) {
    let mut group = c.benchmark_group("lazy_first_demand_overhead");

    group.bench_function("eager_source_first_demand", |bencher| {
        bencher.iter(|| {
            let eager = Source::single(1_u64).map_materialized_value(|_| 7_u64);
            let (mat_value, queue) = eager.to_mat(Sink::queue(), Keep::both).run().unwrap();
            // A missing item falls back to the default (0) instead of panicking.
            let first = queue.pull().unwrap().unwrap_or_default();
            black_box(first.wrapping_add(mat_value))
        });
    });

    group.bench_function("lazy_source_first_demand", |bencher| {
        bencher.iter(|| {
            let deferred = Source::<u64>::lazy_source(|| {
                Source::single(1_u64).map_materialized_value(|_| 7_u64)
            });
            let (pending_mat, queue) =
                deferred.to_mat(Sink::queue(), Keep::both).run().unwrap();
            // Pull first, then wait on the materialized value: here it arrives
            // as a completion handle rather than a plain `u64`.
            // NOTE(review): presumably the handle resolves only once demand has
            // built the inner source - confirm against the datum docs.
            let first = queue.pull().unwrap().unwrap_or_default();
            let mat_value = wait(pending_mat);
            black_box(first.wrapping_add(mat_value))
        });
    });

    group.finish();
}

/// Benchmarks running `Source::from_iter(0..1_000)` into four different sinks
/// (`ignore`, `head`, `fold`, `foreach`) and waiting for each run's
/// materialized completion.
fn sink_terminal_costs(c: &mut Criterion) {
    let mut group = c.benchmark_group("sink_terminal_costs");

    group.bench_function("ignore", |bencher| {
        bencher.iter(|| {
            let completion = Source::from_iter(0_u64..1_000)
                .run_with(Sink::ignore())
                .unwrap();
            black_box(wait(completion))
        });
    });

    group.bench_function("head", |bencher| {
        bencher.iter(|| {
            let completion = Source::from_iter(0_u64..1_000)
                .run_with(Sink::head())
                .unwrap();
            black_box(wait(completion))
        });
    });

    group.bench_function("fold", |bencher| {
        bencher.iter(|| {
            let completion = Source::from_iter(0_u64..1_000)
                .run_with(Sink::fold(0_u64, |total, value| total.wrapping_add(value)))
                .unwrap();
            black_box(wait(completion))
        });
    });

    group.bench_function("foreach", |bencher| {
        bencher.iter(|| {
            // A fresh counter per iteration, moved into the sink's callback so
            // the per-item work has an observable side effect.
            let running_total = AtomicU64::new(0);
            let completion = Source::from_iter(0_u64..1_000)
                .run_with(Sink::foreach(move |value| {
                    black_box(running_total.fetch_add(value, Ordering::Relaxed));
                }))
                .unwrap();
            black_box(wait(completion))
        });
    });

    group.finish();
}

/// Benchmarks running one pre-built graph over and over: 128 items, one `map`
/// stage, folded into a wrapping sum, with each run waited on to completion.
fn repeated_materialization(c: &mut Criterion) {
    let incremented = Source::from_iter(0_u64..128).map(|value| value.wrapping_add(1));
    // `Keep::right` selects the fold sink's materialized value as the graph's own.
    let pipeline = incremented.to_mat(
        Sink::fold(0_u64, |total, value| total.wrapping_add(value)),
        Keep::right,
    );

    c.bench_function("repeated_materialization", |bencher| {
        bencher.iter(|| {
            let completion = pipeline.run().unwrap();
            black_box(wait(completion))
        });
    });
}

// Registers every benchmark function in this file under the `benches` group,
// using Criterion's default configuration, and generates the `main` entry
// point that runs the group.
criterion_group! {
    name = benches;
    config = Criterion::default();
    targets =
        graph_construction,
        materialization_latency,
        materialized_value_overhead,
        lazy_first_demand_overhead,
        sink_terminal_costs,
        repeated_materialization
}
criterion_main!(benches);