hyperbridge 0.2.5

Fast multi-producer multi-consumer channel with async support
Documentation
#[macro_use]
extern crate criterion;

use criterion::{Bencher, Criterion};
use hyperbridge::channel;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::Arc;
use std::thread;

const VALUES: usize = 10000;
const THREADS: usize = 16;

fn test_hyperbridge_mpsc(b: &mut Bencher) {
    b.iter(|| {
        let (sender, receiver) = channel::new();
        let mut counter = 0;

        let mut handles = vec![];

        for i in 0..THREADS {
            let ch = sender.clone();
            let jh = thread::spawn(move || {
                for _ in 0..VALUES {
                    ch.send(i).unwrap();
                }
            });
            handles.push(jh);
        }

        let mut iters = THREADS * VALUES;

        while iters > 0 {
            match receiver.try_recv() {
                Ok(Some(v)) => {
                    counter += v as usize;
                    iters -= 1;
                }
                _ => {}
            }
        }

        let total: usize = (0..THREADS).map(|i| i * VALUES).sum();

        for jh in handles.drain(..) {
            let _ = jh.join();
        }

        assert_eq!(counter, total);
    });
}

fn test_crossbeam_mpsc(b: &mut Bencher) {
    b.iter(|| {
        let (sender, receiver) = crossbeam::channel::unbounded();
        let mut counter = 0;

        let mut handles = vec![];

        for i in 0..THREADS {
            let ch = sender.clone();
            let jh = thread::spawn(move || {
                for _ in 0..VALUES {
                    ch.send(i).unwrap();
                }
            });
            handles.push(jh);
        }

        let mut iters = THREADS * VALUES;

        while iters > 0 {
            match receiver.try_recv() {
                Ok(v) => {
                    counter += v as usize;
                    iters -= 1;
                }
                _ => {}
            }
        }

        let total: usize = (0..THREADS).map(|i| i * VALUES).sum();

        for jh in handles.drain(..) {
            let _ = jh.join();
        }

        assert_eq!(counter, total);
    });
}

fn test_hyperbridge_mpmc(b: &mut Bencher) {
    b.iter(|| {
        let (sender, receiver) = channel::new();
        let counter = Arc::new(AtomicUsize::new(0));

        let mut handles = vec![];

        for i in 0..THREADS {
            let ch = sender.clone();
            let jh = thread::spawn(move || {
                for _ in 0..VALUES {
                    ch.send(i).unwrap();
                }
            });
            handles.push(jh);
        }

        for _ in 0..THREADS {
            let ch = receiver.clone();
            let local_counter = counter.clone();
            let jh = thread::spawn(move || {
                let mut iters = VALUES;
                while iters > 0 {
                    if let Ok(Some(v)) = ch.try_recv() {
                        local_counter.fetch_add(v as usize, Relaxed);
                        iters -= 1;
                    }
                }
            });
            handles.push(jh);
        }

        for jh in handles.drain(..) {
            let _ = jh.join();
        }

        let total: usize = (0..THREADS).map(|i| i * VALUES).sum();

        assert_eq!(counter.load(Relaxed), total);
    });
}

fn test_crossbeam_mpmc(b: &mut Bencher) {
    b.iter(|| {
        let (sender, receiver) = crossbeam::channel::unbounded();
        let counter = Arc::new(AtomicUsize::new(0));

        let mut handles = vec![];

        for i in 0..THREADS {
            let ch = sender.clone();
            let jh = thread::spawn(move || {
                for _ in 0..VALUES {
                    ch.send(i).unwrap();
                }
            });
            handles.push(jh);
        }

        for _ in 0..THREADS {
            let ch = receiver.clone();
            let local_counter = counter.clone();
            let jh = thread::spawn(move || {
                let mut iters = VALUES;
                while iters > 0 {
                    if let Ok(v) = ch.try_recv() {
                        local_counter.fetch_add(v as usize, Relaxed);
                        iters -= 1;
                    }
                }
            });
            handles.push(jh);
        }

        for jh in handles.drain(..) {
            let _ = jh.join();
        }

        let total: usize = (0..THREADS).map(|i| i * VALUES).sum();

        assert_eq!(counter.load(Relaxed), total);
    });
}

fn hyperbridge_channel(b: &mut Criterion) {
    b.bench_function("hyperbridge-mpsc", |b| test_hyperbridge_mpsc(b));
    b.bench_function("hyperbridge-mpmc", |b| test_hyperbridge_mpmc(b));
}

fn crossbeam_channel(b: &mut Criterion) {
    b.bench_function("crossbeam-mpsc", |b| test_crossbeam_mpsc(b));
    b.bench_function("crossbeam-mpmc", |b| test_crossbeam_mpmc(b));
}

criterion_group!(compare, hyperbridge_channel, crossbeam_channel);
criterion_main!(compare);