// nolock 0.2.3
//
// A collection of lock-free data structures.
// Documentation
use std::{
    sync::{atomic, Arc},
    thread,
    time::{Duration, Instant},
};

use criterion::{black_box, Criterion, Throughput};

/// Benchmarks a single-threaded enqueue immediately followed by a dequeue on
/// the jiffy MPSC queue.
///
/// Registers the `enqueue-dequeue` benchmark in the `mpsc-jiffy` group. The
/// group throughput is declared as two elements per iteration, matching the
/// one enqueue plus one dequeue performed each time.
pub fn jiffy_enqueue_dequeue(ctx: &mut Criterion) {
    // Payload pushed through the queue on every iteration.
    const VALUE: u64 = 13;

    let mut jiffy_group = ctx.benchmark_group("mpsc-jiffy");

    jiffy_group
        .throughput(Throughput::Elements(2))
        .bench_function("enqueue-dequeue", |bencher| {
            // The queue is created once and reused across all iterations.
            let (mut receiver, sender) = nolock::queues::mpsc::jiffy::queue::<u64>();

            bencher.iter(|| {
                // The enqueue result is deliberately discarded; the assertion
                // right below fails if the value did not make it through.
                let _ = sender.enqueue(black_box(VALUE));
                assert_eq!(Ok(VALUE), receiver.try_dequeue());
            });
        });
}

/// Benchmarks a single-threaded send immediately followed by a receive on the
/// standard library's `std::sync::mpsc` channel.
///
/// Registers the `enqueue-dequeue` benchmark in the `mpsc-std` group. The
/// group throughput is declared as two elements per iteration, matching the
/// one send plus one receive performed each time.
pub fn std_enqueue_dequeue(ctx: &mut Criterion) {
    // Payload pushed through the channel on every iteration.
    const VALUE: u64 = 13;

    let mut std_group = ctx.benchmark_group("mpsc-std");

    std_group
        .throughput(Throughput::Elements(2))
        .bench_function("enqueue-dequeue", |bencher| {
            // The channel is created once and reused across all iterations.
            let (sender, receiver) = std::sync::mpsc::channel::<u64>();

            bencher.iter(|| {
                // The send result is deliberately discarded; the assertion
                // right below fails if the value did not make it through.
                let _ = sender.send(black_box(VALUE));
                assert_eq!(Ok(VALUE), receiver.try_recv());
            });
        });
}

/// Benchmarks concurrent enqueues on the jiffy MPSC queue with 1, 2, 4, 8 and
/// 16 producer threads.
///
/// Every benchmark in the `mpsc-jiffy-conc-enqueue` group is named after its
/// thread count and is timed through `iter_custom`: the requested iterations
/// are split across the producers and the mean per-producer enqueue time is
/// reported to Criterion.
pub fn jiffy_concurrent_enqueue(ctx: &mut Criterion) {
    let mut group = ctx.benchmark_group("mpsc-jiffy-conc-enqueue");

    group.throughput(Throughput::Elements(1));

    // Enqueues exactly `iters` elements using `thread_count` producer threads
    // and returns the mean time a producer spent inside its enqueue loop.
    //
    // Panics if `thread_count` is zero, if an enqueue fails, or if a producer
    // thread panics.
    fn bench_enqueues(iters: u64, thread_count: u64) -> Duration {
        let (rx, tx) = nolock::queues::mpsc::jiffy::queue();
        let a_tx = Arc::new(tx);
        let a_started = Arc::new(atomic::AtomicBool::new(false));

        // Split the work so that the producers together perform exactly
        // `iters` enqueues: the first `remainder` producers take one extra
        // element instead of the remainder being silently dropped.
        let base = iters / thread_count;
        let remainder = iters % thread_count;

        // All producers are spawned (and collected) before the start flag is
        // raised, so none of them begins measuring while others are still
        // being created.
        let threads: Vec<_> = (0..thread_count)
            .map(|idx| {
                let quota = base + u64::from(idx < remainder);
                let c_tx = Arc::clone(&a_tx);
                let c_started = Arc::clone(&a_started);
                thread::spawn(move || {
                    // Busy-wait for the common start signal, hinting the CPU
                    // that this is a spin loop.
                    while !c_started.load(atomic::Ordering::Acquire) {
                        std::hint::spin_loop();
                    }

                    let started = Instant::now();
                    for _ in 0..quota {
                        c_tx.enqueue(13).unwrap();
                    }
                    started.elapsed()
                })
            })
            .collect();

        a_started.store(true, atomic::Ordering::Release);

        let total_time: Duration = threads
            .into_iter()
            .map(|th| th.join().unwrap())
            .sum();

        // The queue is only torn down after every producer has reported its
        // time, so the teardown is not part of the measurement.
        drop(rx);
        drop(a_tx);

        // Only small thread counts are used by this benchmark, so the
        // narrowing cast cannot truncate in practice.
        total_time / thread_count as u32
    }

    for threads in [1, 2, 4, 8, 16] {
        group.bench_function(threads.to_string(), |b| {
            b.iter_custom(|iters| bench_enqueues(iters, threads))
        });
    }
}

/// Benchmarks concurrent sends on the standard library's `std::sync::mpsc`
/// channel with 1, 2, 4, 8 and 16 producer threads.
///
/// Every benchmark in the `mpsc-std-conc-enqueue` group is named after its
/// thread count and is timed through `iter_custom`: the requested iterations
/// are split across the producers and the mean per-producer send time is
/// reported to Criterion.
pub fn std_concurrent_enqueue(ctx: &mut Criterion) {
    let mut group = ctx.benchmark_group("mpsc-std-conc-enqueue");

    group.throughput(Throughput::Elements(1));

    // Sends exactly `iters` elements using `thread_count` producer threads and
    // returns the mean time a producer spent inside its send loop.
    //
    // Panics if `thread_count` is zero, if a send fails, or if a producer
    // thread panics.
    fn bench_enqueues(iters: u64, thread_count: u64) -> Duration {
        let (tx, rx) = std::sync::mpsc::channel();
        let a_started = Arc::new(atomic::AtomicBool::new(false));

        // Split the work so that the producers together perform exactly
        // `iters` sends: the first `remainder` producers take one extra
        // element instead of the remainder being silently dropped.
        let base = iters / thread_count;
        let remainder = iters % thread_count;

        // All producers are spawned (and collected) before the start flag is
        // raised, so none of them begins measuring while others are still
        // being created.
        let threads: Vec<_> = (0..thread_count)
            .map(|idx| {
                let quota = base + u64::from(idx < remainder);
                let c_tx = tx.clone();
                let c_started = Arc::clone(&a_started);
                thread::spawn(move || {
                    // Busy-wait for the common start signal, hinting the CPU
                    // that this is a spin loop.
                    while !c_started.load(atomic::Ordering::Acquire) {
                        std::hint::spin_loop();
                    }

                    let started = Instant::now();
                    for _ in 0..quota {
                        c_tx.send(13).unwrap();
                    }
                    started.elapsed()
                })
            })
            .collect();

        a_started.store(true, atomic::Ordering::Release);

        let total_time: Duration = threads
            .into_iter()
            .map(|th| th.join().unwrap())
            .sum();

        // The receiver is kept alive until every producer has finished so
        // that no send fails because the channel was already closed.
        drop(rx);

        // Only small thread counts are used by this benchmark, so the
        // narrowing cast cannot truncate in practice.
        total_time / thread_count as u32
    }

    for threads in [1, 2, 4, 8, 16] {
        group.bench_function(threads.to_string(), |b| {
            b.iter_custom(|iters| bench_enqueues(iters, threads))
        });
    }
}