//! Benchmark: direct aligned allocation versus the runtime's network buffer
//! pool, across buffer sizes, thread counts, and contention patterns.
use commonware_runtime::{
    tokio, BufferPool, BufferPoolConfig, BufferPooler, IoBufMut, Runner as _,
};
use commonware_utils::NZUsize;
use criterion::Criterion;
use std::{
    hint::{black_box, spin_loop},
    num::NonZeroUsize,
    sync::{Arc, Barrier},
    thread,
    time::{Duration, Instant},
};

// Thread-count bounds for the multi-threaded cases: the machine's available
// parallelism is clamped into this range so results stay comparable.
const MIN_BENCH_THREADS: usize = 2;
const MAX_BENCH_THREADS: usize = 8;
// Buffer sizes exercised, from 256 B up to 8 MiB.
const SIZES: &[usize] = &[256, 1024, 4096, 65536, 1024 * 1024, 8 * 1024 * 1024];

/// Which number a benchmark reports: `Raw` is the full measured time,
/// `Adjusted` has the page-touch baseline subtracted (see `bench_case`).
#[derive(Clone, Copy)]
enum Metric {
    Raw,
    Adjusted,
}

impl Metric {
    /// Label embedded in the benchmark name.
    const fn as_str(self) -> &'static str {
        if matches!(self, Metric::Raw) {
            "raw"
        } else {
            "adjusted"
        }
    }
}

/// How a buffer is obtained: straight from the allocator or from the pool.
#[derive(Clone, Copy)]
enum Mode {
    Direct,
    Pool,
}

impl Mode {
    /// Label embedded in the benchmark name.
    const fn as_str(self) -> &'static str {
        match self {
            Mode::Direct => "direct",
            Mode::Pool => "pool",
        }
    }
}

/// Timing relationship between worker threads in multi-threaded runs.
#[derive(Clone, Copy)]
enum Pattern {
    Lockstep,
    Staggered,
}

impl Pattern {
    /// Label embedded in the benchmark name.
    const fn as_str(self) -> &'static str {
        match self {
            Pattern::Lockstep => "lockstep",
            Pattern::Staggered => "staggered",
        }
    }
}

/// Threading configuration for a benchmark case.
#[derive(Clone, Copy)]
enum Threading {
    Single,
    Multi { threads: usize, pattern: Pattern },
}

impl Threading {
    /// Number of worker threads this configuration runs.
    const fn threads(self) -> usize {
        if let Self::Multi { threads, .. } = self {
            threads
        } else {
            1
        }
    }
}

/// Register every benchmark case: each buffer size crossed with each
/// threading configuration, allocation mode, and reported metric.
pub fn bench(c: &mut Criterion) {
    let page = page_size();
    // Use the machine's parallelism, clamped so results stay comparable
    // across hosts with very few or very many cores.
    let worker_count = std::thread::available_parallelism().map_or(MIN_BENCH_THREADS, |n| {
        n.get().clamp(MIN_BENCH_THREADS, MAX_BENCH_THREADS)
    });
    let threadings: &[Threading] = &[
        Threading::Single,
        Threading::Multi {
            threads: worker_count,
            pattern: Pattern::Lockstep,
        },
        Threading::Multi {
            threads: worker_count,
            pattern: Pattern::Staggered,
        },
    ];

    for &size in SIZES {
        // One pool per size so the pool's class bounds match the allocation.
        let pool = build_pool(size, worker_count);
        let align = pool.config().alignment.get();

        for &threading in threadings {
            for metric in [Metric::Raw, Metric::Adjusted] {
                // Direct: allocate straight from the system allocator at the
                // same alignment the pool would use.
                bench_case(
                    c,
                    Mode::Direct,
                    size,
                    threading,
                    metric,
                    || {
                        let mut buf =
                            IoBufMut::with_alignment(size, NonZeroUsize::new(align).unwrap());
                        touch_pages(buf.as_mut_ptr(), size, page);
                        buf
                    },
                    page,
                );

                // Pool: allocate from a cloned handle to the shared pool.
                let handle = pool.clone();
                bench_case(
                    c,
                    Mode::Pool,
                    size,
                    threading,
                    metric,
                    move || {
                        let mut buf = handle
                            .try_alloc(size)
                            .expect("buffer pool exhausted during benchmark");
                        touch_pages(buf.as_mut_ptr(), size, page);
                        buf
                    },
                    page,
                );
            }
        }
    }
}

/// Register a single Criterion benchmark for one (mode, size, threading,
/// metric) combination.
///
/// `work` allocates and touches one buffer; the measured step also drops it,
/// so the timing covers the full allocate + touch + free cycle.
fn bench_case(
    c: &mut Criterion,
    mode: Mode,
    size: usize,
    threading: Threading,
    metric: Metric,
    work: impl Fn() -> IoBufMut + Sync,
    page_size: usize,
) {
    let id = bench_name(mode, metric, size, threading);
    c.bench_function(&id, |b| {
        b.iter_custom(|iters| {
            // Time the full allocate + touch + drop cycle.
            let full = measure(
                iters,
                threading,
                || {},
                |_| {
                    drop(black_box(work()));
                },
            );

            match metric {
                Metric::Raw => full,
                Metric::Adjusted => {
                    // Measure the cost of touching pages on a pre-allocated
                    // buffer. Always single-threaded: each thread writes to
                    // private memory so the per-iteration touch cost is the
                    // same regardless of thread count, and single-threaded
                    // avoids wall-clock noise from thread scheduling that
                    // would swamp the subtraction signal.
                    let touch_only = measure(iters, Threading::Single, &work, |buffer| {
                        touch_pages(buffer.as_mut_ptr(), size, page_size)
                    });
                    full.saturating_sub(touch_only)
                }
            }
        });
    });
}

/// Measure `iters` repetitions of `step`, returning total wall-clock time.
///
/// `setup` runs per-worker before timing starts and returns state passed to
/// each `step` invocation. For multi-threaded runs, all workers synchronize
/// via a barrier after setup so timing captures concurrent execution only.
fn measure<T>(
    iters: u64,
    threading: Threading,
    setup: impl Fn() -> T + Sync,
    step: impl Fn(&mut T) + Sync,
) -> Duration {
    // Single-threaded path: run inline with no spawn or barrier overhead.
    let Threading::Multi { threads, pattern } = threading else {
        let mut state = setup();
        let start = Instant::now();
        for _ in 0..iters {
            step(&mut state);
        }
        return start.elapsed();
    };

    let start = thread::scope(|scope| {
        // Two barriers, each sized for the workers plus this coordinator:
        // `ready` confirms every worker has finished `setup`, then the timer
        // starts, then `launch` releases all workers at once — so setup cost
        // is excluded and the workers begin together.
        let ready = Arc::new(Barrier::new(threads + 1));
        let launch = Arc::new(Barrier::new(threads + 1));

        for thread_id in 0..threads {
            let ready = ready.clone();
            let launch = launch.clone();
            // Borrow the closures so every worker can share them without
            // requiring `Clone`.
            let setup = &setup;
            let step = &step;
            scope.spawn(move || {
                let mut state = setup();
                ready.wait();
                launch.wait();
                for iter in 0..iters {
                    step(&mut state);

                    if matches!(pattern, Pattern::Staggered) {
                        // Desynchronize threads so they don't all hit the
                        // allocator at once. The spin count is a cheap
                        // per-thread, per-iteration mix masked to 0..=15,
                        // spreading access times apart without adding enough
                        // delay to dominate the measurement.
                        let spins = (iter as usize).wrapping_add(1).wrapping_mul(
                            thread_id
                                .wrapping_mul(MAX_BENCH_THREADS - 1)
                                .wrapping_add(1),
                        ) & 0xF;

                        for _ in 0..spins {
                            spin_loop();
                        }
                    }
                }
            });
        }

        ready.wait();
        let start = Instant::now();
        launch.wait();
        start
    });

    // `thread::scope` joins every worker before returning, so this elapsed
    // time spans from the launch barrier until the slowest thread finished.
    start.elapsed()
}

/// Write one byte into every page of `[ptr, ptr + size)` so the memory is
/// physically backed before it is measured.
///
/// The caller must guarantee `ptr` is valid for writes of `size` bytes; all
/// computed offsets are clamped to `size - 1`.
#[inline]
fn touch_pages(ptr: *mut u8, size: usize, page_size: usize) {
    if size == 0 {
        return;
    }

    // Force the allocation to back each page before timing the drop path.
    // Otherwise large aligned allocations can look artificially cheap when the
    // allocator hands out lazily materialized virtual memory (i.e. `mmap`).
    //
    // We vary the write offset within each page so that consecutive pages hit
    // different L1 cache sets. A naive page-strided write (offset 0 on every
    // page) maps all stores to the same set because the page size is an exact
    // multiple of the L1 set count times the cache line size, causing
    // pathological eviction.
    const CACHE_LINE: usize = 128;
    // Clamp the degenerate cases: `step_by(0)` panics, and a page smaller
    // than a cache line would otherwise make `lines_per_page` zero and the
    // modulo below divide by zero.
    let stride = page_size.max(1);
    let lines_per_page = (stride / CACHE_LINE).max(1);

    // SAFETY: `ptr` is valid for writes to `size` bytes, and every position
    // is clamped to `size - 1`.
    unsafe {
        for (i, offset) in (0..size).step_by(stride).enumerate() {
            let within_page = (i % lines_per_page) * CACHE_LINE;
            let pos = offset + within_page;
            ptr.add(pos.min(size - 1)).write_volatile(0);
        }
        ptr.add(size - 1).write_volatile(0);
    }
}

/// Build the Criterion benchmark id for one case.
///
/// Multi-threaded cases additionally carry a ` pattern=` suffix.
fn bench_name(mode: Mode, metric: Metric, size: usize, threading: Threading) -> String {
    let base = format!(
        "{}/mode={} size={size} threads={} metric={}",
        module_path!(),
        mode.as_str(),
        threading.threads(),
        metric.as_str(),
    );
    match threading {
        Threading::Single => base,
        Threading::Multi { pattern, .. } => format!("{base} pattern={}", pattern.as_str()),
    }
}

/// Construct the network `BufferPool` used by the pool-mode cases, with
/// capacity for `threads * 4` buffers per class and per-thread caches sized
/// for `threads` workers.
fn build_pool(size: usize, threads: usize) -> BufferPool {
    // Pin the class's min and max to the benchmark's buffer size (with a
    // 1024-byte floor) so every allocation lands in the same class.
    let class = size.max(1024);
    let pool_cfg = BufferPoolConfig::for_network()
        .with_pool_min_size(1024)
        .with_min_size(NZUsize!(class))
        .with_max_size(NZUsize!(class))
        .with_max_per_class(NZUsize!(threads * 4))
        .with_thread_cache_for_parallelism(NZUsize!(threads))
        .with_prefill(true);

    // A single-worker runtime suffices: it exists only to build the pool.
    let runtime_cfg = tokio::Config::default()
        .with_worker_threads(1)
        .with_network_buffer_pool_config(pool_cfg);

    // Start the runtime just long enough to clone out its pool handle.
    tokio::Runner::new(runtime_cfg).start(|ctx| async move { ctx.network_buffer_pool().clone() })
}

/// Query the OS page size, falling back to 4096 on error or non-Unix targets.
#[allow(clippy::missing_const_for_fn)]
fn page_size() -> usize {
    #[cfg(unix)]
    {
        // SAFETY: sysconf is safe to call.
        let size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
        // sysconf signals failure with a non-positive value.
        if size > 0 {
            size as usize
        } else {
            4096
        }
    }

    #[cfg(not(unix))]
    {
        4096
    }
}