crossbeam-channel 0.1.1

Multi-producer multi-consumer channels for message passing
Documentation
extern crate crossbeam;
extern crate crossbeam_channel;

use std::any::Any;
use std::thread;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::SeqCst;
use std::time::Duration;

use crossbeam_channel::{bounded, unbounded, Receiver, Select, Sender};
use crossbeam_channel::{RecvError, RecvTimeoutError, TryRecvError};
use crossbeam_channel::{SendError, SendTimeoutError, TrySendError};

fn ms(ms: u64) -> Duration {
    Duration::from_millis(ms)
}

#[test]
fn smoke1() {
    let mut iters = 0;
    let (tx1, rx1) = unbounded();
    let (tx2, rx2) = unbounded();

    tx1.send(1).unwrap();

    let mut sel = Select::new();
    loop {
        iters += 1;
        if let Ok(v) = sel.recv(&rx1) {
            assert_eq!(v, 1);
            break;
        }
        if let Ok(_) = sel.recv(&rx2) {
            panic!();
        }
    }

    tx2.send(2).unwrap();

    let mut sel = Select::new();
    loop {
        iters += 1;
        if let Ok(_) = sel.recv(&rx1) {
            panic!();
        }
        if let Ok(v) = sel.recv(&rx2) {
            assert_eq!(v, 2);
            break;
        }
    }

    assert!(iters < 50);
}

#[test]
fn smoke2() {
    let mut iters = 0;
    let (_tx1, rx1) = unbounded::<i32>();
    let (_tx2, rx2) = unbounded::<i32>();
    let (_tx3, rx3) = unbounded::<i32>();
    let (_tx4, rx4) = unbounded::<i32>();
    let (tx5, rx5) = unbounded::<i32>();

    tx5.send(5).unwrap();

    let mut sel = Select::new();
    loop {
        iters += 1;
        if let Ok(_) = sel.recv(&rx1) {
            panic!();
        }
        if let Ok(_) = sel.recv(&rx2) {
            panic!();
        }
        if let Ok(_) = sel.recv(&rx3) {
            panic!();
        }
        if let Ok(_) = sel.recv(&rx4) {
            panic!();
        }
        if let Ok(x) = sel.recv(&rx5) {
            assert_eq!(x, 5);
            break;
        }
    }

    assert!(iters < 50);
}

#[test]
fn disconnected() {
    let mut iters = 0;
    let (tx1, rx1) = unbounded::<i32>();
    let (tx2, rx2) = unbounded::<i32>();

    crossbeam::scope(|s| {
        s.spawn(|| {
            thread::sleep(ms(500));
            drop(tx1);
        });

        let mut sel = Select::with_timeout(ms(1000));
        loop {
            iters += 1;
            if let Ok(_) = sel.recv(&rx1) {
                panic!();
            }
            if let Ok(_) = sel.recv(&rx2) {
                panic!();
            }
            if sel.disconnected() {
                panic!();
            }
            if sel.timed_out() {
                break;
            }
        }
    });

    crossbeam::scope(|s| {
        s.spawn(|| {
            thread::sleep(ms(500));
            drop(tx2);
        });

        let mut sel = Select::with_timeout(ms(1000));
        loop {
            iters += 1;
            if let Ok(_) = sel.recv(&rx1) {
                panic!();
            }
            if let Ok(_) = sel.recv(&rx2) {
                panic!();
            }
            if sel.disconnected() {
                break;
            }
            if sel.timed_out() {
                panic!();
            }
        }
    });

    assert!(iters < 50);
}

#[test]
fn would_block() {
    let mut iters = 0;
    let (tx1, rx1) = unbounded::<i32>();
    let (tx2, rx2) = unbounded::<i32>();

    drop(tx1);

    let mut sel = Select::with_timeout(ms(0));
    loop {
        iters += 1;
        if let Ok(_) = sel.recv(&rx1) {
            panic!();
        }
        if let Ok(_) = sel.recv(&rx2) {
            panic!();
        }
        if sel.disconnected() {
            panic!();
        }
        if sel.would_block() {
            break;
        }
        if sel.timed_out() {
            panic!();
        }
    }

    tx2.send(2).unwrap();

    let mut sel = Select::with_timeout(ms(0));
    loop {
        iters += 1;
        if let Ok(_) = sel.recv(&rx1) {
            panic!();
        }
        if let Ok(x) = sel.recv(&rx2) {
            assert_eq!(x, 2);
            break;
        }
        if sel.disconnected() {
            panic!();
        }
        if sel.would_block() {
            panic!();
        }
        if sel.timed_out() {
            panic!();
        }
    }

    drop(tx2);

    let mut sel = Select::with_timeout(ms(0));
    loop {
        iters += 1;
        if let Ok(_) = sel.recv(&rx1) {
            panic!();
        }
        if let Ok(_) = sel.recv(&rx2) {
            panic!();
        }
        if sel.disconnected() {
            break;
        }
        if sel.would_block() {
            panic!();
        }
        if sel.timed_out() {
            panic!();
        }
    }

    assert!(iters < 50);
}

#[test]
fn timeout() {
    let mut iters = 0;
    let (_tx1, rx1) = unbounded::<i32>();
    let (tx2, rx2) = unbounded::<i32>();

    crossbeam::scope(|s| {
        s.spawn(|| {
            thread::sleep(ms(1500));
            tx2.send(2).unwrap();
        });

        let mut sel = Select::with_timeout(ms(1000));
        loop {
            iters += 1;
            if let Ok(_) = sel.recv(&rx1) {
                panic!();
            }
            if let Ok(_) = sel.recv(&rx2) {
                panic!();
            }
            if sel.timed_out() {
                break;
            }
        }

        let mut sel = Select::with_timeout(ms(1000));
        loop {
            iters += 1;
            if let Ok(_) = sel.recv(&rx1) {
                panic!();
            }
            if let Ok(x) = sel.recv(&rx2) {
                assert_eq!(x, 2);
                break;
            }
            if sel.timed_out() {
                panic!();
            }
        }
    });

    assert!(iters < 50);
}

#[test]
fn unblocks() {
    let mut iters = 0;
    let (tx1, rx1) = bounded(0);
    let (tx2, rx2) = bounded(0);

    crossbeam::scope(|s| {
        s.spawn(|| {
            thread::sleep(ms(500));
            tx2.send(2).unwrap();
        });

        let mut sel = Select::with_timeout(ms(1000));
        loop {
            iters += 1;
            if let Ok(_) = sel.recv(&rx1) {
                panic!();
            }
            if let Ok(x) = sel.recv(&rx2) {
                assert_eq!(x, 2);
                break;
            }
            if sel.timed_out() {
                panic!();
            }
        }
    });

    crossbeam::scope(|s| {
        s.spawn(|| {
            thread::sleep(ms(500));
            assert_eq!(rx1.recv().unwrap(), 1);
        });

        let mut sel = Select::with_timeout(ms(1000));
        loop {
            iters += 1;
            if let Ok(()) = sel.send(&tx1, 1) {
                break;
            }
            if let Ok(()) = sel.send(&tx2, 2) {
                panic!();
            }
            if sel.timed_out() {
                panic!();
            }
        }
    });

    assert!(iters < 50);
}

#[test]
fn both_ready() {
    let mut iters = 0;
    let (tx1, rx1) = bounded(0);
    let (tx2, rx2) = bounded(0);

    crossbeam::scope(|s| {
        s.spawn(|| {
            thread::sleep(ms(500));
            tx1.send(1).unwrap();
            assert_eq!(rx2.recv().unwrap(), 2);
        });

        for _ in 0..2 {
            let mut sel = Select::new();
            loop {
                iters += 1;
                if let Ok(x) = sel.recv(&rx1) {
                    assert_eq!(x, 1);
                    break;
                }
                if let Ok(()) = sel.send(&tx2, 2) {
                    break;
                }
            }
        }
    });

    assert!(iters < 50);
}

#[test]
fn no_starvation() {
    const N: usize = 10;

    let done_rx = &(0..N).map(|_| AtomicBool::new(false)).collect::<Vec<_>>();
    let done_tx = &(0..N).map(|_| AtomicBool::new(false)).collect::<Vec<_>>();

    while !done_rx.iter().all(|x| x.load(SeqCst)) || !done_tx.iter().all(|x| x.load(SeqCst)) {
        crossbeam::scope(|s| {
            let rxs = (0..N)
                .map(|i| {
                    let (tx, rx) = unbounded();
                    tx.send(i).unwrap();
                    rx
                })
                .collect::<Vec<_>>();

            let txs = (0..N)
                .map(|i| {
                    let (tx, rx) = bounded(100);
                    s.spawn(move || {
                        if let Ok(x) = rx.recv() {
                            assert_eq!(x, i);
                            done_tx[i].store(true, SeqCst);
                        }
                    });
                    tx
                })
                .collect::<Vec<_>>();

            let mut iters = 0;

            let mut sel = Select::new();
            'select: loop {
                iters += 1;

                for rx in &rxs {
                    if let Ok(x) = sel.recv(&rx) {
                        done_rx[x].store(true, SeqCst);
                        break 'select;
                    }
                }

                for (i, tx) in txs.iter().enumerate() {
                    if let Ok(()) = sel.send(&tx, i) {
                        break 'select;
                    }
                }
            }

            assert!(iters < 50);
        });
    }
}

#[test]
fn loop_try() {
    for _ in 0..20 {
        let (tx1, rx1) = bounded::<i32>(0);
        let (tx2, rx2) = bounded::<i32>(0);

        crossbeam::scope(|s| {
            s.spawn(|| {
                loop {
                    match tx1.try_send(1) {
                        Ok(()) => break,
                        Err(TrySendError::Disconnected(_)) => break,
                        Err(TrySendError::Full(_)) => continue,
                    }
                }
            });

            s.spawn(|| {
                loop {
                    match rx2.try_recv() {
                        Ok(x) => {
                            assert_eq!(x, 2);
                            break;
                        }
                        Err(TryRecvError::Disconnected) => break,
                        Err(TryRecvError::Empty) => continue,
                    }
                }
            });

            s.spawn(|| {
                let mut iters = 0;
                thread::sleep(ms(500));

                let mut sel = Select::with_timeout(ms(500));
                loop {
                    iters += 1;
                    if let Ok(x) = sel.recv(&rx1) {
                        assert_eq!(x, 1);
                        break;
                    }
                    if let Ok(_) = sel.send(&tx2, 2) {
                        break;
                    }
                    if sel.disconnected() {
                        panic!();
                    }
                    if sel.timed_out() {
                        panic!();
                    }
                }

                drop(rx1);
                drop(tx2);
                assert!(iters < 1000);
            });
        });
    }
}

#[test]
fn cloning1() {
    crossbeam::scope(|s| {
        let mut iters = 0;
        let (tx1, rx1) = unbounded::<i32>();
        let (_tx2, rx2) = unbounded::<i32>();
        let (tx3, rx3) = unbounded::<()>();

        s.spawn(move || {
            rx3.recv().unwrap();
            tx1.clone();
            assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
            tx1.send(1).unwrap();
            rx3.recv().unwrap();
        });

        tx3.send(()).unwrap();

        let mut sel = Select::new();
        loop {
            iters += 1;
            if let Ok(_) = sel.recv(&rx1) {
                break;
            }
            if let Ok(_) = sel.recv(&rx2) {
                panic!();
            }
        }

        tx3.send(()).unwrap();
        assert!(iters < 50);
    });
}

#[test]
fn cloning2() {
    crossbeam::scope(|s| {
        let mut iters = 0;
        let (tx1, rx1) = unbounded::<()>();
        let (tx2, rx2) = unbounded::<()>();
        let (_tx3, _rx3) = unbounded::<()>();

        s.spawn(move || {
            let mut sel = Select::new();
            loop {
                iters += 1;
                if let Ok(_) = sel.recv(&rx1) {
                    panic!();
                }
                if let Ok(_) = sel.recv(&rx2) {
                    break;
                }
            }
        });

        thread::sleep(ms(500));
        drop(tx1.clone());
        tx2.send(()).unwrap();

        assert!(iters < 50);
    })
}

#[test]
fn preflight1() {
    let (tx, rx) = unbounded();
    tx.send(()).unwrap();

    let mut iters = 0;
    let mut sel = Select::new();
    loop {
        iters += 1;
        if let Ok(_) = sel.recv(&rx) {
            break;
        }
    }
    assert!(iters < 10);
}

#[test]
fn preflight2() {
    let (tx, rx) = unbounded();
    drop(tx.clone());
    tx.send(()).unwrap();
    drop(tx);

    let mut iters = 0;
    let mut sel = Select::new();
    loop {
        iters += 1;
        if let Ok(_) = sel.recv(&rx) {
            break;
        }
    }
    assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
    assert!(iters < 10);
}

#[test]
fn preflight3() {
    let (tx, rx) = unbounded();
    drop(tx.clone());
    tx.send(()).unwrap();
    drop(tx);
    rx.recv().unwrap();

    let mut iters = 0;
    let mut sel = Select::new();
    loop {
        iters += 1;
        if let Ok(_) = sel.recv(&rx) {
            panic!();
        }
        if sel.disconnected() {
            break;
        }
    }
    assert!(iters < 10);
}

#[test]
fn stress_recv() {
    let (tx1, rx1) = unbounded();
    let (tx2, rx2) = bounded(5);
    let (tx3, rx3) = bounded(100);

    crossbeam::scope(|s| {
        s.spawn(|| {
            for i in 0..10_000 {
                tx1.send(i).unwrap();
                rx3.recv().unwrap();

                tx2.send(i).unwrap();
                rx3.recv().unwrap();
            }
        });

        for i in 0..10_000 {
            let mut iters = 0;

            for _ in 0..2 {
                let mut sel = Select::new();
                loop {
                    iters += 1;
                    if let Ok(x) = sel.recv(&rx1) {
                        assert_eq!(x, i);
                        break;
                    }
                    if let Ok(x) = sel.recv(&rx2) {
                        assert_eq!(x, i);
                        break;
                    }
                }

                tx3.send(()).unwrap();
            }

            assert!(iters < 50);
        }
    });
}

#[test]
fn stress_send() {
    let (tx1, rx1) = bounded(0);
    let (tx2, rx2) = bounded(0);
    let (tx3, rx3) = bounded(100);

    crossbeam::scope(|s| {
        s.spawn(|| {
            for i in 0..10_000 {
                assert_eq!(rx1.recv().unwrap(), i);
                assert_eq!(rx2.recv().unwrap(), i);
                rx3.recv().unwrap();
            }
        });

        for i in 0..10_000 {
            let mut iters = 0;

            for _ in 0..2 {
                let mut sel = Select::new();
                loop {
                    iters += 1;
                    if let Ok(()) = sel.send(&tx1, i) {
                        break;
                    }
                    if let Ok(()) = sel.send(&tx2, i) {
                        break;
                    }
                }
            }
            tx3.send(()).unwrap();

            assert!(iters < 50);
        }
    });
}

#[test]
fn stress_mixed() {
    let (tx1, rx1) = bounded(0);
    let (tx2, rx2) = bounded(0);
    let (tx3, rx3) = bounded(100);

    crossbeam::scope(|s| {
        s.spawn(|| {
            for i in 0..10_000 {
                tx1.send(i).unwrap();
                assert_eq!(rx2.recv().unwrap(), i);
                rx3.recv().unwrap();
            }
        });

        for i in 0..10_000 {
            let mut iters = 0;

            for _ in 0..2 {
                let mut sel = Select::new();
                loop {
                    iters += 1;
                    if let Ok(x) = sel.recv(&rx1) {
                        assert_eq!(x, i);
                        break;
                    }
                    if let Ok(()) = sel.send(&tx2, i) {
                        break;
                    }
                }
            }
            tx3.send(()).unwrap();

            assert!(iters < 50);
        }
    });
}

#[test]
fn stress_timeout_two_threads() {
    const COUNT: usize = 20;

    let (tx, rx) = bounded(2);

    crossbeam::scope(|s| {
        s.spawn(|| {
            for i in 0..COUNT {
                if i % 2 == 0 {
                    thread::sleep(ms(500));
                }

                let mut sel = Select::with_timeout(ms(100));
                'outer: loop {
                    loop {
                        if let Ok(()) = sel.send(&tx, i) {
                            break 'outer;
                        }
                        if sel.timed_out() {
                            break;
                        }
                    }
                }
            }
        });

        s.spawn(|| {
            for i in 0..COUNT {
                if i % 2 == 0 {
                    thread::sleep(ms(500));
                }

                let mut sel = Select::with_timeout(ms(100));
                'outer: loop {
                    loop {
                        if let Ok(x) = sel.recv(&rx) {
                            assert_eq!(x, i);
                            break 'outer;
                        }
                        if sel.timed_out() {
                            break;
                        }
                    }
                }
            }
        });
    });
}

struct WrappedSender<T>(Sender<T>);

impl<T> WrappedSender<T> {
    pub fn try_send(&self, mut value: T) -> Result<(), TrySendError<T>> {
        let mut iters = 0;
        let mut sel = Select::new();
        loop {
            iters += 1;
            assert!(iters < 20);

            if let Err(err) = sel.send(&self.0, value) {
                value = err.into_inner();
            } else {
                return Ok(());
            }
            if sel.disconnected() {
                return Err(TrySendError::Disconnected(value));
            }
            if sel.would_block() {
                return Err(TrySendError::Full(value));
            }
        }
    }

    pub fn send(&self, mut value: T) -> Result<(), SendError<T>> {
        let mut iters = 0;
        let mut sel = Select::new();
        loop {
            iters += 1;
            assert!(iters < 20);

            if let Err(err) = sel.send(&self.0, value) {
                value = err.into_inner();
            } else {
                return Ok(());
            }
            if sel.disconnected() {
                return Err(SendError(value));
            }
        }
    }

    pub fn send_timeout(&self, mut value: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
        let mut iters = 0;
        let mut sel = Select::with_timeout(timeout);
        loop {
            iters += 1;
            assert!(iters < 20);

            if let Err(err) = sel.send(&self.0, value) {
                value = err.into_inner();
            } else {
                return Ok(());
            }
            if sel.disconnected() {
                return Err(SendTimeoutError::Disconnected(value));
            }
            if sel.timed_out() {
                return Err(SendTimeoutError::Timeout(value));
            }
        }
    }
}

struct WrappedReceiver<T>(Receiver<T>);

impl<T> WrappedReceiver<T> {
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        let mut iters = 0;
        let mut sel = Select::new();
        loop {
            iters += 1;
            assert!(iters < 20);

            if let Ok(v) = sel.recv(&self.0) {
                return Ok(v);
            }
            if sel.disconnected() {
                return Err(TryRecvError::Disconnected);
            }
            if sel.would_block() {
                return Err(TryRecvError::Empty);
            }
        }
    }

    pub fn recv(&self) -> Result<T, RecvError> {
        let mut iters = 0;
        let mut sel = Select::new();
        loop {
            iters += 1;
            assert!(iters < 20);

            if let Ok(v) = sel.recv(&self.0) {
                return Ok(v);
            }
            if sel.disconnected() {
                return Err(RecvError);
            }
        }
    }

    pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
        let mut iters = 0;
        let mut sel = Select::with_timeout(timeout);
        loop {
            iters += 1;
            assert!(iters < 20);

            if let Ok(v) = sel.recv(&self.0) {
                return Ok(v);
            }
            if sel.disconnected() {
                return Err(RecvTimeoutError::Disconnected);
            }
            if sel.timed_out() {
                return Err(RecvTimeoutError::Timeout);
            }
        }
    }
}

#[test]
fn recv() {
    let (tx, rx) = bounded(100);
    let tx = WrappedSender(tx);
    let rx = WrappedReceiver(rx);

    crossbeam::scope(|s| {
        s.spawn(move || {
            assert_eq!(rx.recv(), Ok(7));
            thread::sleep(ms(1000));
            assert_eq!(rx.recv(), Ok(8));
            thread::sleep(ms(1000));
            assert_eq!(rx.recv(), Ok(9));
            assert_eq!(rx.recv(), Err(RecvError));
        });
        s.spawn(move || {
            thread::sleep(ms(1500));
            assert_eq!(tx.send(7), Ok(()));
            assert_eq!(tx.send(8), Ok(()));
            assert_eq!(tx.send(9), Ok(()));
        });
    });

    let (tx, rx) = bounded(0);
    let tx = WrappedSender(tx);
    let rx = WrappedReceiver(rx);

    crossbeam::scope(|s| {
        s.spawn(move || {
            assert_eq!(rx.recv(), Ok(7));
            thread::sleep(ms(1000));
            assert_eq!(rx.recv(), Ok(8));
            thread::sleep(ms(1000));
            assert_eq!(rx.recv(), Ok(9));
            assert_eq!(rx.recv(), Err(RecvError));
        });
        s.spawn(move || {
            thread::sleep(ms(1500));
            assert_eq!(tx.send(7), Ok(()));
            assert_eq!(tx.send(8), Ok(()));
            assert_eq!(tx.send(9), Ok(()));
        });
    });
}

#[test]
fn recv_timeout() {
    let (tx, rx) = bounded(100);
    let tx = WrappedSender(tx);
    let rx = WrappedReceiver(rx);

    crossbeam::scope(|s| {
        s.spawn(move || {
            assert_eq!(rx.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout));
            assert_eq!(rx.recv_timeout(ms(1000)), Ok(7));
            assert_eq!(
                rx.recv_timeout(ms(1000)),
                Err(RecvTimeoutError::Disconnected)
            );
        });
        s.spawn(move || {
            thread::sleep(ms(1500));
            assert_eq!(tx.send(7), Ok(()));
        });
    });
}

#[test]
fn try_recv() {
    let (tx, rx) = bounded(100);
    let tx = WrappedSender(tx);
    let rx = WrappedReceiver(rx);

    crossbeam::scope(|s| {
        s.spawn(move || {
            assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
            thread::sleep(ms(1500));
            assert_eq!(rx.try_recv(), Ok(7));
            thread::sleep(ms(500));
            assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
        });
        s.spawn(move || {
            thread::sleep(ms(1000));
            assert_eq!(tx.send(7), Ok(()));
        });
    });

    let (tx, rx) = bounded(0);
    let tx = WrappedSender(tx);
    let rx = WrappedReceiver(rx);

    crossbeam::scope(|s| {
        s.spawn(move || {
            assert_eq!(rx.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout));
            assert_eq!(rx.recv_timeout(ms(1000)), Ok(7));
            assert_eq!(
                rx.recv_timeout(ms(1000)),
                Err(RecvTimeoutError::Disconnected)
            );
        });
        s.spawn(move || {
            thread::sleep(ms(1500));
            assert_eq!(tx.send(7), Ok(()));
        });
    });
}

#[test]
fn send() {
    let (tx, rx) = bounded(1);
    let tx = WrappedSender(tx);
    let rx = WrappedReceiver(rx);

    crossbeam::scope(|s| {
        s.spawn(move || {
            assert_eq!(tx.send(7), Ok(()));
            thread::sleep(ms(1000));
            assert_eq!(tx.send(8), Ok(()));
            thread::sleep(ms(1000));
            assert_eq!(tx.send(9), Ok(()));
            thread::sleep(ms(1000));
            assert_eq!(tx.send(10), Err(SendError(10)));
        });
        s.spawn(move || {
            thread::sleep(ms(1500));
            assert_eq!(rx.recv(), Ok(7));
            assert_eq!(rx.recv(), Ok(8));
            assert_eq!(rx.recv(), Ok(9));
        });
    });

    let (tx, rx) = bounded(0);
    let tx = WrappedSender(tx);
    let rx = WrappedReceiver(rx);

    crossbeam::scope(|s| {
        s.spawn(move || {
            assert_eq!(tx.send(7), Ok(()));
            thread::sleep(ms(1000));
            assert_eq!(tx.send(8), Ok(()));
            thread::sleep(ms(1000));
            assert_eq!(tx.send(9), Ok(()));
            assert_eq!(tx.send(10), Err(SendError(10)));
        });
        s.spawn(move || {
            thread::sleep(ms(1500));
            assert_eq!(rx.recv(), Ok(7));
            assert_eq!(rx.recv(), Ok(8));
            assert_eq!(rx.recv(), Ok(9));
        });
    });
}

#[test]
fn send_timeout() {
    let (tx, rx) = bounded(2);
    let tx = WrappedSender(tx);
    let rx = WrappedReceiver(rx);

    crossbeam::scope(|s| {
        s.spawn(move || {
            assert_eq!(tx.send_timeout(1, ms(1000)), Ok(()));
            assert_eq!(tx.send_timeout(2, ms(1000)), Ok(()));
            assert_eq!(
                tx.send_timeout(3, ms(500)),
                Err(SendTimeoutError::Timeout(3))
            );
            thread::sleep(ms(1000));
            assert_eq!(tx.send_timeout(4, ms(1000)), Ok(()));
            thread::sleep(ms(1000));
            assert_eq!(tx.send(5), Err(SendError(5)));
        });
        s.spawn(move || {
            thread::sleep(ms(1000));
            assert_eq!(rx.recv(), Ok(1));
            thread::sleep(ms(1000));
            assert_eq!(rx.recv(), Ok(2));
            assert_eq!(rx.recv(), Ok(4));
        });
    });

    let (tx, rx) = bounded(0);
    let tx = WrappedSender(tx);
    let rx = WrappedReceiver(rx);

    crossbeam::scope(|s| {
        s.spawn(move || {
            assert_eq!(
                tx.send_timeout(7, ms(1000)),
                Err(SendTimeoutError::Timeout(7))
            );
            assert_eq!(tx.send_timeout(8, ms(1000)), Ok(()));
            assert_eq!(
                tx.send_timeout(9, ms(1000)),
                Err(SendTimeoutError::Disconnected(9))
            );
        });
        s.spawn(move || {
            thread::sleep(ms(1500));
            assert_eq!(rx.recv(), Ok(8));
        });
    });
}

#[test]
fn try_send() {
    let (tx, rx) = bounded(1);
    let tx = WrappedSender(tx);
    let rx = WrappedReceiver(rx);

    crossbeam::scope(|s| {
        s.spawn(move || {
            assert_eq!(tx.try_send(1), Ok(()));
            assert_eq!(tx.try_send(2), Err(TrySendError::Full(2)));
            thread::sleep(ms(1500));
            assert_eq!(tx.try_send(3), Ok(()));
            thread::sleep(ms(500));
            assert_eq!(tx.try_send(4), Err(TrySendError::Disconnected(4)));
        });
        s.spawn(move || {
            thread::sleep(ms(1000));
            assert_eq!(rx.try_recv(), Ok(1));
            assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
            assert_eq!(rx.recv(), Ok(3));
        });
    });

    let (tx, rx) = bounded(0);
    let tx = WrappedSender(tx);
    let rx = WrappedReceiver(rx);

    crossbeam::scope(|s| {
        s.spawn(move || {
            assert_eq!(tx.try_send(7), Err(TrySendError::Full(7)));
            thread::sleep(ms(1500));
            assert_eq!(tx.try_send(8), Ok(()));
            thread::sleep(ms(500));
            assert_eq!(tx.try_send(9), Err(TrySendError::Disconnected(9)));
        });
        s.spawn(move || {
            thread::sleep(ms(1000));
            assert_eq!(rx.recv(), Ok(8));
        });
    });
}

#[test]
fn recv_after_close() {
    let (tx, rx) = bounded(100);
    let tx = WrappedSender(tx);
    let rx = WrappedReceiver(rx);

    tx.send(1).unwrap();
    tx.send(2).unwrap();
    tx.send(3).unwrap();

    drop(tx);

    assert_eq!(rx.recv(), Ok(1));
    assert_eq!(rx.recv(), Ok(2));
    assert_eq!(rx.recv(), Ok(3));
    assert_eq!(rx.recv(), Err(RecvError));
}

#[test]
fn matching() {
    let (tx, rx) = bounded(0);
    let (tx, rx) = (&tx, &rx);

    crossbeam::scope(|s| {
        for i in 0..44 {
            s.spawn(move || {
                let mut sel = Select::new();
                loop {
                    if let Ok(x) = sel.recv(rx) {
                        assert_ne!(i, x);
                        break;
                    }
                    if let Ok(()) = sel.send(tx, i) {
                        break;
                    }
                }
            });
        }
    });

    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
}

#[test]
fn matching_with_leftover() {
    let (tx, rx) = bounded(0);
    let (tx, rx) = (&tx, &rx);

    crossbeam::scope(|s| {
        for i in 0..55 {
            s.spawn(move || {
                let mut sel = Select::new();
                loop {
                    if let Ok(x) = sel.recv(&rx) {
                        assert_ne!(i, x);
                        break;
                    }
                    if let Ok(()) = sel.send(&tx, i) {
                        break;
                    }
                }
            });
        }
        tx.send(!0).unwrap();
    });

    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
}

#[test]
fn channel_through_channel() {
    const COUNT: usize = 1000;

    type T = Box<Any + Send>;

    for cap in 0..3 {
        let (tx, rx) = bounded::<T>(cap);

        crossbeam::scope(|s| {
            s.spawn(move || {
                let mut tx = tx;

                for _ in 0..COUNT {
                    let (new_tx, new_rx) = bounded(cap);
                    let mut new_rx: T = Box::new(Some(new_rx));

                    let mut sel = Select::new();
                    loop {
                        if let Err(err) = sel.send(&tx, new_rx) {
                            new_rx = err.into_inner();
                        } else {
                            break;
                        }
                    }

                    tx = new_tx;
                }
            });

            s.spawn(move || {
                let mut rx = rx;

                for _ in 0..COUNT {
                    let mut sel = Select::new();
                    loop {
                        if let Ok(mut r) = sel.recv(&rx) {
                            rx = r.downcast_mut::<Option<Receiver<T>>>()
                                .unwrap()
                                .take()
                                .unwrap();
                            break;
                        }
                    }
                }
            });
        });
    }
}