// crossbeam-channel 0.2.4
//
// Multi-producer multi-consumer channels for message passing
// Documentation
//! Tests borrowed from Go and ported to Rust.

extern crate crossbeam;
#[macro_use]
extern crate crossbeam_channel as channel;
extern crate parking_lot;

mod wrappers;

macro_rules! tests {
    ($channel:path) => {
        use std::any::Any;
        use std::collections::HashMap;
        use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
        use std::thread;
        use std::time::Duration;

        use $channel as channel;
        use crossbeam;
        use parking_lot::Mutex;

        // Shorthand for a `Duration` of `ms` milliseconds; the tests below pass it to
        // `thread::sleep` for their short pauses.
        fn ms(ms: u64) -> Duration {
            Duration::from_millis(ms)
        }

        // https://github.com/golang/go/blob/master/test/chan/doubleselect.go
        mod doubleselect {
            use super::*;

            const ITERATIONS: i32 = 100_000;

            // A producer pushes the integers `0..ITERATIONS` through a four-way `select!`,
            // four `mux` threads funnel them into a single channel, and the receiver panics
            // if any value arrives more than once.
            #[test]
            fn test_double_select() {
                let (c1, r1) = channel::unbounded();
                let (c2, r2) = channel::unbounded();
                let (c3, r3) = channel::unbounded();
                let (c4, r4) = channel::unbounded();
                let (done_s, done_r) = channel::unbounded();
                let (mux_s, mux_r) = channel::unbounded();

                // Forwards every value from `in_c` to `out` until `in_c` is exhausted, then
                // reports completion on `done`.
                fn mux(out: channel::Sender<i32>, in_c: channel::Receiver<i32>, done: channel::Sender<bool>) {
                    for val in in_c.0 {
                        out.send(val);
                    }

                    // Give up this forwarder's output handle before signalling completion.
                    drop(out);
                    done.send(true);
                }

                crossbeam::scope(|scope| {
                    // This is akin to the sender function in the go example.
                    scope.spawn(|| {
                        for i in 0..ITERATIONS {
                            select! {
                                send(c1, i) => {},
                                send(c2, i) => {},
                                send(c3, i) => {},
                                send(c4, i) => {},
                            }
                        }

                        drop(c1);
                        drop(c2);
                        drop(c3);
                        drop(c4);
                    });

                    // This is akin to the mux function calls: one forwarder per input channel.
                    for in_c in vec![r1, r2, r3, r4] {
                        let mux_s = mux_s.clone();
                        let done_s = done_s.clone();
                        scope.spawn(move || mux(mux_s, in_c, done_s));
                    }

                    // Once all four forwarders have reported in, drop the remaining sender
                    // handle so that the receive loop below can terminate.
                    scope.spawn(|| {
                        for _ in 0..4 {
                            done_r.recv();
                        }

                        drop(mux_s);
                    });

                    // Akin to recver in the go example.
                    let mut seen = HashMap::new();

                    for val in mux_r.0 {
                        // `insert` hands back the previous entry, so a single lookup both
                        // records the value and detects a repeat.
                        if seen.insert(val, true).is_some() {
                            panic!("got duplicate value for {}", val);
                        }
                    }
                });
            }
        }

        // https://github.com/golang/go/blob/master/test/chan/fifo.go
        mod fifo {
            use super::*;

            const N: i32 = 10;

            // AsynchFifo from the go example: fill a channel of capacity `N`, then drain it
            // and expect the values back in the order they were sent.
            #[test]
            fn async() {
                let (tx, rx) = channel::bounded(N as usize);
                (0..N).for_each(|i| {
                    tx.send(i);
                });
                for expected in 0..N {
                    assert!(rx.recv() == Some(expected), "bad receive");
                }
            }

            // One link of the `sync` chain: waits for its turn on `in_c`, takes the next value
            // from `ch`, checks that it is `val`, and then passes the turn on through `out`.
            fn chain(ch: channel::Receiver<i32>, val: i32, in_c: channel::Receiver<i32>, out: channel::Sender<i32>) {
                in_c.recv();
                // A bare `panic!(val)` would carry an `i32` payload that the default panic
                // hook cannot display, so report the mismatch through a formatted assertion.
                assert_eq!(ch.recv(), Some(val), "chain link received a value out of order");
                out.send(1);
            }

            // SynchFifo from the go example: `N` `chain` threads are linked through
            // zero-capacity channels, and the values fed into `ch` must be picked up by the
            // links one after another.
            #[test]
            fn sync() {
                let (ch_s, ch_r) = channel::bounded(0);
                let (in_s, in_r) = channel::bounded(0);
                let start = in_s.clone();

                crossbeam::scope(|scope| {
                    // Build the chain link by link; each link's output is the next link's input.
                    let tail = (0..N).fold(in_r, |prev, i| {
                        let (out_s, out_r) = channel::bounded(0);
                        let ch_r = ch_r.clone();
                        scope.spawn(move || {
                            chain(ch_r, i, prev, out_s);
                        });
                        out_r
                    });

                    // Kick off the first link, feed the values, then wait for the last link.
                    start.send(0);
                    (0..N).for_each(|i| {
                        ch_s.send(i);
                    });
                    tail.recv();
                });
            }
        }

        // https://github.com/golang/go/blob/master/test/chan/nonblock.go
        mod nonblock {
            // TODO: port the tests from nonblock.go (nothing here yet).
        }

        // https://github.com/golang/go/blob/master/test/chan/select.go
        mod select {
            // TODO: port the tests from select.go (nothing here yet).
        }

        // https://github.com/golang/go/blob/master/test/chan/select2.go
        mod select2 {
            // TODO: port the tests from select2.go (nothing here yet).
        }

        // https://github.com/golang/go/blob/master/test/chan/select3.go
        mod select3 {
            // TODO: port the tests from select3.go (nothing here yet).
        }

        // https://github.com/golang/go/blob/master/test/chan/select4.go
        mod select4 {
            // TODO: port the tests from select4.go (nothing here yet).
        }

        // https://github.com/golang/go/blob/master/test/chan/select5.go
        mod select5 {
            // TODO: port the tests from select5.go (nothing here yet).
        }

        // https://github.com/golang/go/blob/master/test/chan/select6.go
        mod select6 {
            // TODO: port the tests from select6.go (nothing here yet).
        }

        // https://github.com/golang/go/blob/master/test/chan/select7.go
        mod select7 {
            // TODO: port the tests from select7.go (nothing here yet).
        }

        // https://github.com/golang/go/blob/master/test/chan/sieve1.go
        mod sieve1 {
            // TODO: port the tests from sieve1.go (nothing here yet).
        }

        // https://github.com/golang/go/blob/master/test/chan/sieve2.go
        mod sieve2 {
            // TODO: port the tests from sieve2.go (nothing here yet).
        }

        // https://github.com/golang/go/blob/master/test/chan/zerosize.go
        mod zerosize {
            use super::*;

            // A channel whose message type is a zero-sized unit struct can be created.
            #[test]
            fn zero_size_struct() {
                struct ZeroSize;
                drop(channel::unbounded::<ZeroSize>());
            }

            // The same holds for a zero-length array as the message type.
            #[test]
            fn zero_size_array() {
                drop(channel::unbounded::<[u8; 0]>());
            }
        }

        // https://github.com/golang/go/blob/master/src/runtime/chan_test.go
        mod chan_test {
            use super::*;

            // Basic channel behaviour, checked for every capacity in `0..N`. Each inner
            // block below creates its own channel `c` (a `(sender, receiver)` pair, so
            // `c.0` sends and `c.1` receives) and verifies one property.
            #[test]
            fn chan() {
                const N: usize = 200;

                for cap in 0..N {
                    {
                        // Ensure that receive from an empty chan blocks.
                        let c = channel::bounded::<i32>(cap);
                        let recv1 = AtomicBool::new(false);
                        let recv2 = AtomicBool::new(false);

                        crossbeam::scope(|scope| {
                            scope.spawn(|| {
                                c.1.recv();
                                recv1.store(true, Ordering::SeqCst);
                            });
                            scope.spawn(|| {
                                c.1.recv();
                                recv2.store(true, Ordering::SeqCst);
                            });

                            // Give both receivers a moment to run.
                            thread::sleep(ms(1));

                            // Nothing has been sent yet, so neither flag may be set.
                            if recv1.load(Ordering::SeqCst) || recv2.load(Ordering::SeqCst) {
                                panic!();
                            }

                            // Ensure that non-blocking receive does not block.
                            select! {
                                recv(c.1) => panic!(),
                                default => {}
                            }
                            select! {
                                recv(c.1) => panic!(),
                                default => {}
                            }

                            // Release the two blocked receivers.
                            c.0.send(0);
                            c.0.send(0);
                        });
                    }

                    {
                        // Ensure that send to full chan blocks.
                        let c = channel::bounded::<i32>(cap);
                        // Fill the buffer completely first.
                        for i in 0..cap {
                            c.0.send(i as i32);
                        }
                        let sent = AtomicUsize::new(0);

                        crossbeam::scope(|scope| {
                            scope.spawn(|| {
                                c.0.send(0);
                                sent.store(1, Ordering::SeqCst);
                            });

                            thread::sleep(ms(1));

                            // The extra send must still be pending.
                            if sent.load(Ordering::SeqCst) != 0 {
                                panic!();
                            }

                            // Ensure that non-blocking send does not block.
                            select! {
                                send(c.0, 0) => panic!(),
                                default => {}
                            }
                            // Take one message out so the pending send can complete.
                            c.1.recv();
                        });
                    }

                    {
                        // Ensure that a closed chan still hands out its buffered values in
                        // order and then yields `None`.
                        let c = channel::bounded::<i32>(cap);
                        for i in 0..cap {
                            c.0.send(i as i32);
                        }
                        drop(c.0);

                        for i in 0..cap {
                            let v = c.1.recv();
                            assert_eq!(v, Some(i as i32));
                        }

                        assert_eq!(c.1.recv(), None);
                    }

                    {
                        // Ensure that close unblocks receive.
                        let (s, r) = channel::bounded::<i32>(cap);
                        let done = channel::bounded::<bool>(0);

                        crossbeam::scope(|scope| {
                            // Reports `true` when the receive returned `None`.
                            scope.spawn(|| done.0.send(r.recv() == None));
                            thread::sleep(ms(1));
                            drop(s);

                            assert_eq!(done.1.recv(), Some(true));
                        });
                    }

                    {
                        // Send 100 integers,
                        // ensure that we receive them non-corrupted in FIFO order.
                        let c = channel::bounded::<i32>(cap);
                        crossbeam::scope(|scope| {
                            scope.spawn(|| {
                                for i in 0..100 {
                                    c.0.send(i);
                                }
                            });

                            for i in 0..100 {
                                assert_eq!(c.1.recv(), Some(i));
                            }
                        });

                        // Same check a second time on the same channel (the "recv2" wording
                        // is inherited from the Go original; the code here is identical to
                        // the block above).
                        crossbeam::scope(|scope| {
                            scope.spawn(|| {
                                for i in 0..100 {
                                    c.0.send(i);
                                }
                            });

                            for i in 0..100 {
                                assert_eq!(c.1.recv(), Some(i));
                            }
                        });

                        // Send 1000 integers from each of 4 threads,
                        // ensure that we receive what we send.
                        const P: usize = 4;
                        const L: usize = 1000;
                        let done = channel::bounded::<Vec<i32>>(0);
                        crossbeam::scope(|scope| {
                            // Producers: each sends the values `0..L`.
                            for _ in 0..P {
                                scope.spawn(|| {
                                    for i in 0..L {
                                        c.0.send(i as i32);
                                    }
                                });
                            }

                            // Consumers: each receives `L` values, counts how often it saw
                            // each one, and reports its tally on `done`.
                            for _ in 0..P {
                                scope.spawn(|| {
                                    let mut recv = vec![0; L];
                                    for _ in 0..L {
                                        let v = c.1.recv().unwrap();
                                        recv[v as usize] += 1;
                                    }
                                    done.0.send(recv);
                                });
                            }

                            // Merge the per-consumer tallies.
                            let mut recv = vec![0; L];
                            for _ in 0..P {
                                for (i, v) in done.1.recv().unwrap().into_iter().enumerate() {
                                    recv[i] += v;
                                }
                            }

                            // Every value was sent once per producer, so it must have been
                            // received exactly `P` times in total.
                            assert_eq!(recv.len(), L);
                            for v in recv {
                                assert_eq!(v, P as i32);
                            }
                        });
                    }

                    {
                        // Test len/cap.
                        let c = channel::bounded::<i32>(cap);

                        assert_eq!(c.0.len(), 0);
                        assert_eq!(c.0.capacity(), Some(cap));

                        for i in 0..cap {
                            c.0.send(i as i32);
                        }

                        assert_eq!(c.0.len(), cap);
                        assert_eq!(c.0.capacity(), Some(cap));
                    }
                }
            }

            // A non-blocking receive races against the sender being dropped and a blocking
            // receive on the same one-message channel; it must never fall through to `default`.
            #[test]
            fn nonblock_recv_race() {
                const N: usize = 10000;

                for _round in 0..N {
                    let (tx, rx) = channel::bounded(1);
                    tx.send(1);

                    crossbeam::scope(|scope| {
                        scope.spawn(|| {
                            select! {
                                recv(rx) => {}
                                default => panic!("chan is not ready"),
                            }
                        });

                        drop(tx);
                        rx.recv();
                    });
                }
            }

            // A non-blocking select over two channels races against the main thread, which
            // fills the second channel and may drain the first. The select must always find
            // one of the two ready.
            #[test]
            fn nonblock_select_race() {
                const N: usize = 10000;

                let (done_s, done_r) = channel::bounded::<bool>(1);
                for _round in 0..N {
                    let (tx1, rx1) = channel::bounded::<i32>(1);
                    let (tx2, rx2) = channel::bounded::<i32>(1);
                    tx1.send(1);

                    crossbeam::scope(|scope| {
                        scope.spawn(|| {
                            // `true` when some receive arm fired, `false` on `default`.
                            let ready = select! {
                                recv(rx1) => true,
                                recv(rx2) => true,
                                default => false,
                            };
                            done_s.send(ready);
                        });

                        tx2.send(1);
                        select! {
                            recv(rx1) => {}
                            default => {}
                        }

                        assert!(done_r.recv().unwrap(), "no chan is ready");
                    });
                }
            }

            // Like `nonblock_select_race`, except that the second channel has zero capacity
            // and the main thread drops its sender instead of sending on it.
            #[test]
            fn nonblock_select_race2() {
                const N: usize = 1000;

                let (done_s, done_r) = channel::bounded::<bool>(1);
                for _round in 0..N {
                    let (tx1, rx1) = channel::bounded::<i32>(1);
                    let (tx2, rx2) = channel::bounded::<i32>(0);
                    tx1.send(1);

                    crossbeam::scope(|scope| {
                        scope.spawn(|| {
                            // `true` when some receive arm fired, `false` on `default`.
                            let ready = select! {
                                recv(rx1) => true,
                                recv(rx2) => true,
                                default => false,
                            };
                            done_s.send(ready);
                        });

                        drop(tx2);
                        select! {
                            recv(rx1) => {}
                            default => {}
                        }

                        assert!(done_r.recv().unwrap(), "no chan is ready");
                    });
                }
            }

            #[test]
            fn self_select() {
                // Ensure that send/recv on the same chan in select
                // does not crash nor deadlock.
                for &cap in &[0, 10] {
                    let (tx, rx) = channel::bounded::<i32>(cap);

                    crossbeam::scope(|scope| {
                        for p in 0..2 {
                            let (tx, rx) = (&tx, &rx);
                            scope.spawn(move || {
                                for i in 0..1000 {
                                    // Thread 0 always lists the send arm first; thread 1
                                    // alternates the arm order from one iteration to the next.
                                    let send_first = p == 0 || i % 2 == 0;
                                    if send_first {
                                        select! {
                                            send(tx, p) => {}
                                            recv(rx, v) => {
                                                // With no buffer, a thread must never get
                                                // back the value it is offering itself.
                                                assert!(cap != 0 || v.unwrap() != p, "self receive");
                                            }
                                        }
                                    } else {
                                        select! {
                                            recv(rx, v) => {
                                                assert!(cap != 0 || v.unwrap() != p, "self receive");
                                            }
                                            send(tx, p) => {}
                                        }
                                    }
                                }
                            });
                        }
                    });
                }
            }

            #[test]
            fn select_stress() {
                // Two zero-capacity channels plus two small buffered ones.
                let chans = vec![
                    channel::bounded(0),
                    channel::bounded(0),
                    channel::bounded(2),
                    channel::bounded(3),
                ];

                const N: usize = 10000;

                // There are 4 threads that send N values on each of the chans,
                // + 4 threads that receive N values on each of the chans,
                // + 1 thread that sends N values on each of the chans in a single select,
                // + 1 thread that receives N values on each of the chans in a single select.
                // All these sends, receives and selects interact chaotically at runtime,
                // but we are careful that this whole construct does not deadlock.
                crossbeam::scope(|scope| {
                    for k in 0..4 {
                        {
                            // Dedicated sender for channel `k`.
                            let chans = chans.clone();
                            scope.spawn(move || {
                                for _ in 0..N {
                                    chans[k].0.send(0);
                                }
                            });
                        }
                        {
                            // Dedicated receiver for channel `k`.
                            let chans = chans.clone();
                            scope.spawn(move || {
                                for _ in 0..N {
                                    chans[k].1.recv();
                                }
                            });
                        }
                    }

                    {
                        // Selecting sender: a slot is set to `None` once its channel has
                        // taken N values, which removes that channel from the select.
                        let mut senders = chans.iter()
                            .map(|(tx, _)| Some(tx.clone()))
                            .collect::<Vec<_>>();

                        scope.spawn(move || {
                            let mut n = [0i32; 4];
                            for _ in 0..4 * N {
                                let i: usize = select! {
                                    send(senders[3].iter().map(|x| &**x), 0) => 3,
                                    send(senders[2].iter().map(|x| &**x), 0) => 2,
                                    send(senders[0].iter().map(|x| &**x), 0) => 0,
                                    send(senders[1].iter().map(|x| &**x), 0) => 1,
                                };
                                n[i] += 1;
                                assert!(n[i] <= N as i32);
                                if n[i] == N as i32 {
                                    senders[i] = None;
                                }
                            }
                        });
                    }

                    {
                        // Selecting receiver, retiring channels the same way.
                        let mut receivers = chans.iter()
                            .map(|(_, rx)| Some(rx.clone()))
                            .collect::<Vec<_>>();

                        scope.spawn(move || {
                            let mut n = [0i32; 4];
                            for _ in 0..4 * N {
                                let i: usize = select! {
                                    recv(receivers[0].iter().map(|x| &**x)) => 0,
                                    recv(receivers[1].iter().map(|x| &**x)) => 1,
                                    recv(receivers[2].iter().map(|x| &**x)) => 2,
                                    recv(receivers[3].iter().map(|x| &**x)) => 3,
                                };
                                n[i] += 1;
                                assert!(n[i] <= N as i32);
                                if n[i] == N as i32 {
                                    receivers[i] = None;
                                }
                            }
                        });
                    }
                });
            }

            // Checks that `select!` picks evenly between two channels that are both always
            // ready, even when two never-ready channels are listed ahead of them.
            #[test]
            fn select_fairness() {
                const TRIALS: usize = 10000;

                let (ones_s, ones_r) = channel::bounded::<u8>(TRIALS + 1);
                let (twos_s, twos_r) = channel::bounded::<u8>(TRIALS + 1);

                // Pre-fill both ready channels to capacity.
                (0..TRIALS + 1).for_each(|_| {
                    ones_s.send(1);
                    twos_s.send(2);
                });

                // These two channels never receive a message; their senders are only kept alive.
                let (_idle_s3, idle_r3) = channel::bounded::<u8>(TRIALS + 1);
                let (_idle_s4, idle_r4) = channel::bounded::<u8>(TRIALS + 1);
                let (out_s, out_r) = channel::bounded::<u8>(TRIALS + 1);
                let (done_s, done_r) = channel::bounded::<u8>(TRIALS + 1);

                crossbeam::scope(|scope| {
                    // Forwards whatever the select picks to `out_s` until `done_r` fires.
                    scope.spawn(|| {
                        loop {
                            let b = select! {
                                recv(idle_r3, m) => m,
                                recv(idle_r4, m) => m,
                                recv(ones_r, m) => m,
                                recv(twos_r, m) => m,
                            }.unwrap();

                            select! {
                                send(out_s, b) => {}
                                recv(done_r) => return
                            }
                        }
                    });

                    let mut cnt1 = 0;
                    let mut cnt2 = 0;
                    for _ in 0..TRIALS {
                        match out_r.recv() {
                            Some(1) => cnt1 += 1,
                            Some(2) => cnt2 += 1,
                            b => panic!("unexpected value {:?} on channel", b),
                        }
                    }

                    // If the select in the forwarding thread is fair,
                    // cnt1 and cnt2 should be about the same value.
                    // With 10,000 trials, the expected margin of error at
                    // a confidence level of five nines is 4.4172 / (2 * Sqrt(10000)).
                    let share = cnt1 as f64 / TRIALS as f64;
                    let deviation = (share - 0.5).abs();
                    let margin = 4.4172 / (2.0 * (TRIALS as f64).sqrt());

                    if deviation > margin {
                        panic!(
                            "unfair select: in {} trials, results were {}, {}",
                            TRIALS,
                            cnt1,
                            cnt2,
                        );
                    }

                    // Let the forwarding thread leave its loop.
                    drop(done_s);
                });
            }

            // Sends boxed trait objects: one blocking send into a capacity-1 channel that is
            // never read, followed by non-blocking selects with one and with two send arms.
            #[test]
            fn chan_send_interface() {
                struct Mt;

                let (tx, _rx) = channel::bounded::<Box<Any>>(1);
                tx.send(Box::new(Mt));

                select! {
                    send(tx, Box::new(Mt)) => {}
                    default => {}
                }

                select! {
                    send(tx, Box::new(Mt)) => {}
                    send(tx, Box::new(Mt)) => {}
                    default => {}
                }
            }

            // A two-arm send select (one arm sends 1, the other 0) must not always pick the
            // same arm: over N sends each value has to show up more than N / 10 times.
            #[test]
            fn pseudo_random_send() {
                const N: usize = 100;

                for cap in 0..N {
                    let (tx, rx) = channel::bounded::<i32>(cap);
                    let received = Mutex::new(vec![0i32; N]);

                    crossbeam::scope(|scope| {
                        // The receiver holds the lock for as long as it is filling the
                        // vector, so the `lock()` further down waits until all N are stored.
                        scope.spawn(|| {
                            let mut slots = received.lock();
                            for slot in slots.iter_mut() {
                                thread::yield_now();
                                *slot = rx.recv().unwrap();
                            }
                        });

                        for _ in 0..N {
                            select! {
                                send(tx, 1) => {}
                                send(tx, 0) => {}
                            }
                        }

                        let values = received.lock();
                        // Every entry is 0 or 1: `n0` counts the zeros, `n1` the ones.
                        let n0: i32 = values.iter().map(|&v| (v + 1) % 2).sum();
                        let n1: i32 = values.iter().sum();

                        let floor = N as i32 / 10;
                        if n0 <= floor || n1 <= floor {
                            panic!(
                                "Want pseudorandom, got {} zeros and {} ones (chan cap {})",
                                n0,
                                n1,
                                cap,
                            );
                        }
                    });
                }
            }

            // NWORK workers all drain one queue channel and forward into one result channel;
            // the consumer checks that the count and the sum of the forwarded values match
            // what the feeder sent.
            #[test]
            fn multi_consumer() {
                const NWORK: usize = 23;
                const NITER: usize = 271828;

                let pn = [2, 3, 7, 11, 13, 17, 19, 23, 27, 31];

                let (q_s, q_r) = channel::bounded::<i32>(NWORK * 3);
                let (r_s, r_r) = channel::bounded::<i32>(NWORK * 3);

                let expect = AtomicUsize::new(0);

                crossbeam::scope(|scope| {
                    // workers
                    for w in 0..NWORK {
                        let (q_r, pn) = (&q_r, &pn);
                        let r_s = r_s.clone();
                        scope.spawn(move || {
                            for v in &q_r.0 {
                                // mess with the fifo-ish nature of range
                                if pn[w % pn.len()] == v {
                                    thread::yield_now();
                                }
                                r_s.send(v);
                            }
                        });
                    }

                    // feeder & closer
                    scope.spawn(|| {
                        for &v in pn.iter().cycle().take(NITER) {
                            expect.fetch_add(v as usize, Ordering::SeqCst);
                            q_s.send(v);
                        }

                        drop(q_s);
                        drop(r_s);
                    });

                    // consume & check
                    let (n, s) = (&r_r.0)
                        .into_iter()
                        .fold((0usize, 0i32), |(n, s), v| (n + 1, s + v));
                    let expected = expect.load(Ordering::SeqCst);
                    if n != NITER || s != expected as i32 {
                        panic!(
                            "Expected sum {} (got {}) from {} iter (saw {})",
                            expected,
                            s,
                            NITER,
                            n,
                        );
                    }
                });
            }

            #[test]
            fn select_duplicate_channel() {
                // This test makes sure we can queue a thread on
                // the same channel multiple times (a "G", i.e. goroutine, in the Go original).
                let (c_s, c_r) = channel::bounded::<i32>(0);
                let (d_s, d_r) = channel::bounded::<i32>(0);
                let (e_s, e_r) = channel::bounded::<i32>(0);

                crossbeam::scope(|scope| {
                    // Thread A: waits on `c` twice and on `d` in one select, then reports on `e`.
                    scope.spawn(|| {
                        select! {
                            recv(c_r) => {}
                            recv(c_r) => {}
                            recv(d_r) => {}
                        }
                        e_s.send(9);
                    });
                    // Give thread A time to start waiting before thread B is spawned.
                    thread::sleep(ms(1));

                    // Thread B: a plain blocking receive on `c`.
                    scope.spawn(|| {
                        c_r.recv();
                    });
                    // Give thread B time to start waiting on `c` as well.
                    thread::sleep(ms(1));

                    // Nothing has been sent on `c` yet, so this send on `d` is what lets
                    // thread A's select complete.
                    d_s.send(7);
                    // Wait for thread A's report.
                    e_r.recv();
                    // Thread A is past its select, so this send is left for thread B.
                    c_s.send(8);
                });
            }
        }

        // https://github.com/golang/go/blob/master/test/closedchan.go
        mod closedchan {
            // TODO: port the tests from closedchan.go (nothing here yet).
        }

        // https://github.com/golang/go/blob/master/src/runtime/chanbarrier_test.go
        mod chanbarrier_test {
            // TODO: port the tests from chanbarrier_test.go (nothing here yet).
        }

        // https://github.com/golang/go/blob/master/src/runtime/race/testdata/chan_test.go
        mod race_chan_test {
            // TODO: port the tests from race/testdata/chan_test.go (nothing here yet).
        }

        // https://github.com/golang/go/blob/master/test/ken/chan.go
        mod chan {
            // TODO: port the tests from ken/chan.go (nothing here yet).
        }

        // https://github.com/golang/go/blob/master/test/ken/chan1.go
        mod chan1 {
            // TODO: port the tests from ken/chan1.go (nothing here yet).
        }
    }
}

// Each module below expands the whole `tests!` suite once, with a different module from
// `wrappers` imported under the name `channel`.
mod normal {
    tests!(wrappers::normal);
}

// Same suite, run against `wrappers::cloned`.
mod cloned {
    tests!(wrappers::cloned);
}

// Same suite, run against `wrappers::select`.
mod select {
    tests!(wrappers::select);
}

// Same suite, run against `wrappers::select_spin`.
mod select_spin {
    tests!(wrappers::select_spin);
}

// Same suite, run against `wrappers::select_multi`.
mod select_multi {
    tests!(wrappers::select_multi);
}