crossbeam-channel 0.3.9

Multi-producer multi-consumer channels for message passing
Documentation
//! Tests for channel readiness using the `Select` struct.

extern crate crossbeam_channel;
extern crate crossbeam_utils;

use std::any::Any;
use std::cell::Cell;
use std::thread;
use std::time::{Duration, Instant};

use crossbeam_channel::{after, bounded, tick, unbounded};
use crossbeam_channel::{Receiver, Select, TryRecvError, TrySendError};
use crossbeam_utils::thread::scope;

fn ms(ms: u64) -> Duration {
    Duration::from_millis(ms)
}

#[test]
fn smoke1() {
    let (s1, r1) = unbounded::<usize>();
    let (s2, r2) = unbounded::<usize>();

    s1.send(1).unwrap();

    let mut sel = Select::new();
    sel.recv(&r1);
    sel.recv(&r2);
    assert_eq!(sel.ready(), 0);
    assert_eq!(r1.try_recv(), Ok(1));

    s2.send(2).unwrap();

    let mut sel = Select::new();
    sel.recv(&r1);
    sel.recv(&r2);
    assert_eq!(sel.ready(), 1);
    assert_eq!(r2.try_recv(), Ok(2));
}

#[test]
fn smoke2() {
    let (_s1, r1) = unbounded::<i32>();
    let (_s2, r2) = unbounded::<i32>();
    let (_s3, r3) = unbounded::<i32>();
    let (_s4, r4) = unbounded::<i32>();
    let (s5, r5) = unbounded::<i32>();

    s5.send(5).unwrap();

    let mut sel = Select::new();
    sel.recv(&r1);
    sel.recv(&r2);
    sel.recv(&r3);
    sel.recv(&r4);
    sel.recv(&r5);
    assert_eq!(sel.ready(), 4);
    assert_eq!(r5.try_recv(), Ok(5));
}

#[test]
fn disconnected() {
    let (s1, r1) = unbounded::<i32>();
    let (s2, r2) = unbounded::<i32>();

    scope(|scope| {
        scope.spawn(|_| {
            drop(s1);
            thread::sleep(ms(500));
            s2.send(5).unwrap();
        });

        let mut sel = Select::new();
        sel.recv(&r1);
        sel.recv(&r2);
        match sel.ready_timeout(ms(1000)) {
            Ok(0) => assert_eq!(r1.try_recv(), Err(TryRecvError::Disconnected)),
            _ => panic!(),
        }

        r2.recv().unwrap();
    })
    .unwrap();

    let mut sel = Select::new();
    sel.recv(&r1);
    sel.recv(&r2);
    match sel.ready_timeout(ms(1000)) {
        Ok(0) => assert_eq!(r1.try_recv(), Err(TryRecvError::Disconnected)),
        _ => panic!(),
    }

    scope(|scope| {
        scope.spawn(|_| {
            thread::sleep(ms(500));
            drop(s2);
        });

        let mut sel = Select::new();
        sel.recv(&r2);
        match sel.ready_timeout(ms(1000)) {
            Ok(0) => assert_eq!(r2.try_recv(), Err(TryRecvError::Disconnected)),
            _ => panic!(),
        }
    })
    .unwrap();
}

#[test]
fn default() {
    let (s1, r1) = unbounded::<i32>();
    let (s2, r2) = unbounded::<i32>();

    let mut sel = Select::new();
    sel.recv(&r1);
    sel.recv(&r2);
    assert!(sel.try_ready().is_err());

    drop(s1);

    let mut sel = Select::new();
    sel.recv(&r1);
    sel.recv(&r2);
    match sel.try_ready() {
        Ok(0) => assert!(r1.try_recv().is_err()),
        _ => panic!(),
    }

    s2.send(2).unwrap();

    let mut sel = Select::new();
    sel.recv(&r2);
    match sel.try_ready() {
        Ok(0) => assert_eq!(r2.try_recv(), Ok(2)),
        _ => panic!(),
    }

    let mut sel = Select::new();
    sel.recv(&r2);
    assert!(sel.try_ready().is_err());

    let mut sel = Select::new();
    assert!(sel.try_ready().is_err());
}

#[test]
fn timeout() {
    let (_s1, r1) = unbounded::<i32>();
    let (s2, r2) = unbounded::<i32>();

    scope(|scope| {
        scope.spawn(|_| {
            thread::sleep(ms(1500));
            s2.send(2).unwrap();
        });

        let mut sel = Select::new();
        sel.recv(&r1);
        sel.recv(&r2);
        assert!(sel.ready_timeout(ms(1000)).is_err());

        let mut sel = Select::new();
        sel.recv(&r1);
        sel.recv(&r2);
        match sel.ready_timeout(ms(1000)) {
            Ok(1) => assert_eq!(r2.try_recv(), Ok(2)),
            _ => panic!(),
        }
    })
    .unwrap();

    scope(|scope| {
        let (s, r) = unbounded::<i32>();

        scope.spawn(move |_| {
            thread::sleep(ms(500));
            drop(s);
        });

        let mut sel = Select::new();
        assert!(sel.ready_timeout(ms(1000)).is_err());

        let mut sel = Select::new();
        sel.recv(&r);
        match sel.try_ready() {
            Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
            _ => panic!(),
        }
    })
    .unwrap();
}

#[test]
fn default_when_disconnected() {
    let (_, r) = unbounded::<i32>();

    let mut sel = Select::new();
    sel.recv(&r);
    match sel.try_ready() {
        Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
        _ => panic!(),
    }

    let (_, r) = unbounded::<i32>();

    let mut sel = Select::new();
    sel.recv(&r);
    match sel.ready_timeout(ms(1000)) {
        Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
        _ => panic!(),
    }

    let (s, _) = bounded::<i32>(0);

    let mut sel = Select::new();
    sel.send(&s);
    match sel.try_ready() {
        Ok(0) => assert_eq!(s.try_send(0), Err(TrySendError::Disconnected(0))),
        _ => panic!(),
    }

    let (s, _) = bounded::<i32>(0);

    let mut sel = Select::new();
    sel.send(&s);
    match sel.ready_timeout(ms(1000)) {
        Ok(0) => assert_eq!(s.try_send(0), Err(TrySendError::Disconnected(0))),
        _ => panic!(),
    }
}

#[test]
fn default_only() {
    let start = Instant::now();

    let mut sel = Select::new();
    assert!(sel.try_ready().is_err());
    let now = Instant::now();
    assert!(now - start <= ms(50));

    let start = Instant::now();
    let mut sel = Select::new();
    assert!(sel.ready_timeout(ms(500)).is_err());
    let now = Instant::now();
    assert!(now - start >= ms(450));
    assert!(now - start <= ms(550));
}

#[test]
fn unblocks() {
    let (s1, r1) = bounded::<i32>(0);
    let (s2, r2) = bounded::<i32>(0);

    scope(|scope| {
        scope.spawn(|_| {
            thread::sleep(ms(500));
            s2.send(2).unwrap();
        });

        let mut sel = Select::new();
        sel.recv(&r1);
        sel.recv(&r2);
        match sel.ready_timeout(ms(1000)) {
            Ok(1) => assert_eq!(r2.try_recv(), Ok(2)),
            _ => panic!(),
        }
    })
    .unwrap();

    scope(|scope| {
        scope.spawn(|_| {
            thread::sleep(ms(500));
            assert_eq!(r1.recv().unwrap(), 1);
        });

        let mut sel = Select::new();
        let oper1 = sel.send(&s1);
        let oper2 = sel.send(&s2);
        let oper = sel.select_timeout(ms(1000));
        match oper {
            Err(_) => panic!(),
            Ok(oper) => match oper.index() {
                i if i == oper1 => oper.send(&s1, 1).unwrap(),
                i if i == oper2 => panic!(),
                _ => unreachable!(),
            },
        }
    })
    .unwrap();
}

#[test]
fn both_ready() {
    let (s1, r1) = bounded(0);
    let (s2, r2) = bounded(0);

    scope(|scope| {
        scope.spawn(|_| {
            thread::sleep(ms(500));
            s1.send(1).unwrap();
            assert_eq!(r2.recv().unwrap(), 2);
        });

        for _ in 0..2 {
            let mut sel = Select::new();
            sel.recv(&r1);
            sel.send(&s2);
            match sel.ready() {
                0 => assert_eq!(r1.try_recv(), Ok(1)),
                1 => s2.try_send(2).unwrap(),
                _ => panic!(),
            }
        }
    })
    .unwrap();
}

#[test]
fn cloning1() {
    scope(|scope| {
        let (s1, r1) = unbounded::<i32>();
        let (_s2, r2) = unbounded::<i32>();
        let (s3, r3) = unbounded::<()>();

        scope.spawn(move |_| {
            r3.recv().unwrap();
            drop(s1.clone());
            assert!(r3.try_recv().is_err());
            s1.send(1).unwrap();
            r3.recv().unwrap();
        });

        s3.send(()).unwrap();

        let mut sel = Select::new();
        sel.recv(&r1);
        sel.recv(&r2);
        match sel.ready() {
            0 => drop(r1.try_recv()),
            1 => drop(r2.try_recv()),
            _ => panic!(),
        }

        s3.send(()).unwrap();
    })
    .unwrap();
}

#[test]
fn cloning2() {
    let (s1, r1) = unbounded::<()>();
    let (s2, r2) = unbounded::<()>();
    let (_s3, _r3) = unbounded::<()>();

    scope(|scope| {
        scope.spawn(move |_| {
            let mut sel = Select::new();
            sel.recv(&r1);
            sel.recv(&r2);
            match sel.ready() {
                0 => panic!(),
                1 => drop(r2.try_recv()),
                _ => panic!(),
            }
        });

        thread::sleep(ms(500));
        drop(s1.clone());
        s2.send(()).unwrap();
    })
    .unwrap();
}

#[test]
fn preflight1() {
    let (s, r) = unbounded();
    s.send(()).unwrap();

    let mut sel = Select::new();
    sel.recv(&r);
    match sel.ready() {
        0 => drop(r.try_recv()),
        _ => panic!(),
    }
}

#[test]
fn preflight2() {
    let (s, r) = unbounded();
    drop(s.clone());
    s.send(()).unwrap();
    drop(s);

    let mut sel = Select::new();
    sel.recv(&r);
    match sel.ready() {
        0 => assert_eq!(r.try_recv(), Ok(())),
        _ => panic!(),
    }

    assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
}

#[test]
fn preflight3() {
    let (s, r) = unbounded();
    drop(s.clone());
    s.send(()).unwrap();
    drop(s);
    r.recv().unwrap();

    let mut sel = Select::new();
    sel.recv(&r);
    match sel.ready() {
        0 => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
        _ => panic!(),
    }
}

#[test]
fn duplicate_operations() {
    let (s, r) = unbounded::<i32>();
    let hit = vec![Cell::new(false); 4];

    while hit.iter().map(|h| h.get()).any(|hit| !hit) {
        let mut sel = Select::new();
        sel.recv(&r);
        sel.recv(&r);
        sel.send(&s);
        sel.send(&s);
        match sel.ready() {
            0 => {
                assert!(r.try_recv().is_ok());
                hit[0].set(true);
            }
            1 => {
                assert!(r.try_recv().is_ok());
                hit[1].set(true);
            }
            2 => {
                assert!(s.try_send(0).is_ok());
                hit[2].set(true);
            }
            3 => {
                assert!(s.try_send(0).is_ok());
                hit[3].set(true);
            }
            _ => panic!(),
        }
    }
}

#[test]
fn nesting() {
    let (s, r) = unbounded::<i32>();

    let mut sel = Select::new();
    sel.send(&s);
    match sel.ready() {
        0 => {
            assert!(s.try_send(0).is_ok());

            let mut sel = Select::new();
            sel.recv(&r);
            match sel.ready() {
                0 => {
                    assert_eq!(r.try_recv(), Ok(0));

                    let mut sel = Select::new();
                    sel.send(&s);
                    match sel.ready() {
                        0 => {
                            assert!(s.try_send(1).is_ok());

                            let mut sel = Select::new();
                            sel.recv(&r);
                            match sel.ready() {
                                0 => {
                                    assert_eq!(r.try_recv(), Ok(1));
                                }
                                _ => panic!(),
                            }
                        }
                        _ => panic!(),
                    }
                }
                _ => panic!(),
            }
        }
        _ => panic!(),
    }
}

#[test]
fn stress_recv() {
    const COUNT: usize = 10_000;

    let (s1, r1) = unbounded();
    let (s2, r2) = bounded(5);
    let (s3, r3) = bounded(0);

    scope(|scope| {
        scope.spawn(|_| {
            for i in 0..COUNT {
                s1.send(i).unwrap();
                r3.recv().unwrap();

                s2.send(i).unwrap();
                r3.recv().unwrap();
            }
        });

        for i in 0..COUNT {
            for _ in 0..2 {
                let mut sel = Select::new();
                sel.recv(&r1);
                sel.recv(&r2);
                match sel.ready() {
                    0 => assert_eq!(r1.try_recv(), Ok(i)),
                    1 => assert_eq!(r2.try_recv(), Ok(i)),
                    _ => panic!(),
                }

                s3.send(()).unwrap();
            }
        }
    })
    .unwrap();
}

#[test]
fn stress_send() {
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded(0);
    let (s2, r2) = bounded(0);
    let (s3, r3) = bounded(100);

    scope(|scope| {
        scope.spawn(|_| {
            for i in 0..COUNT {
                assert_eq!(r1.recv().unwrap(), i);
                assert_eq!(r2.recv().unwrap(), i);
                r3.recv().unwrap();
            }
        });

        for i in 0..COUNT {
            for _ in 0..2 {
                let mut sel = Select::new();
                sel.send(&s1);
                sel.send(&s2);
                match sel.ready() {
                    0 => assert!(s1.try_send(i).is_ok()),
                    1 => assert!(s2.try_send(i).is_ok()),
                    _ => panic!(),
                }
            }
            s3.send(()).unwrap();
        }
    })
    .unwrap();
}

#[test]
fn stress_mixed() {
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded(0);
    let (s2, r2) = bounded(0);
    let (s3, r3) = bounded(100);

    scope(|scope| {
        scope.spawn(|_| {
            for i in 0..COUNT {
                s1.send(i).unwrap();
                assert_eq!(r2.recv().unwrap(), i);
                r3.recv().unwrap();
            }
        });

        for i in 0..COUNT {
            for _ in 0..2 {
                let mut sel = Select::new();
                sel.recv(&r1);
                sel.send(&s2);
                match sel.ready() {
                    0 => assert_eq!(r1.try_recv(), Ok(i)),
                    1 => assert!(s2.try_send(i).is_ok()),
                    _ => panic!(),
                }
            }
            s3.send(()).unwrap();
        }
    })
    .unwrap();
}

#[test]
fn stress_timeout_two_threads() {
    const COUNT: usize = 20;

    let (s, r) = bounded(2);

    scope(|scope| {
        scope.spawn(|_| {
            for i in 0..COUNT {
                if i % 2 == 0 {
                    thread::sleep(ms(500));
                }

                let done = false;
                while !done {
                    let mut sel = Select::new();
                    sel.send(&s);
                    match sel.ready_timeout(ms(100)) {
                        Err(_) => {}
                        Ok(0) => {
                            assert!(s.try_send(i).is_ok());
                            break;
                        }
                        Ok(_) => panic!(),
                    }
                }
            }
        });

        scope.spawn(|_| {
            for i in 0..COUNT {
                if i % 2 == 0 {
                    thread::sleep(ms(500));
                }

                let mut done = false;
                while !done {
                    let mut sel = Select::new();
                    sel.recv(&r);
                    match sel.ready_timeout(ms(100)) {
                        Err(_) => {}
                        Ok(0) => {
                            assert_eq!(r.try_recv(), Ok(i));
                            done = true;
                        }
                        Ok(_) => panic!(),
                    }
                }
            }
        });
    })
    .unwrap();
}

#[test]
fn send_recv_same_channel() {
    let (s, r) = bounded::<i32>(0);
    let mut sel = Select::new();
    sel.send(&s);
    sel.recv(&r);
    assert!(sel.ready_timeout(ms(100)).is_err());

    let (s, r) = unbounded::<i32>();
    let mut sel = Select::new();
    sel.send(&s);
    sel.recv(&r);
    match sel.ready_timeout(ms(100)) {
        Err(_) => panic!(),
        Ok(0) => assert!(s.try_send(0).is_ok()),
        Ok(_) => panic!(),
    }
}

#[test]
fn channel_through_channel() {
    const COUNT: usize = 1000;

    type T = Box<Any + Send>;

    for cap in 1..4 {
        let (s, r) = bounded::<T>(cap);

        scope(|scope| {
            scope.spawn(move |_| {
                let mut s = s;

                for _ in 0..COUNT {
                    let (new_s, new_r) = bounded(cap);
                    let new_r: T = Box::new(Some(new_r));

                    {
                        let mut sel = Select::new();
                        sel.send(&s);
                        match sel.ready() {
                            0 => assert!(s.try_send(new_r).is_ok()),
                            _ => panic!(),
                        }
                    }

                    s = new_s;
                }
            });

            scope.spawn(move |_| {
                let mut r = r;

                for _ in 0..COUNT {
                    let new = {
                        let mut sel = Select::new();
                        sel.recv(&r);
                        match sel.ready() {
                            0 => r
                                .try_recv()
                                .unwrap()
                                .downcast_mut::<Option<Receiver<T>>>()
                                .unwrap()
                                .take()
                                .unwrap(),
                            _ => panic!(),
                        }
                    };
                    r = new;
                }
            });
        })
        .unwrap();
    }
}

#[test]
fn fairness1() {
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded::<()>(COUNT);
    let (s2, r2) = unbounded::<()>();

    for _ in 0..COUNT {
        s1.send(()).unwrap();
        s2.send(()).unwrap();
    }

    let hits = vec![Cell::new(0usize); 4];
    for _ in 0..COUNT {
        let after = after(ms(0));
        let tick = tick(ms(0));

        let mut sel = Select::new();
        sel.recv(&r1);
        sel.recv(&r2);
        sel.recv(&after);
        sel.recv(&tick);
        match sel.ready() {
            0 => {
                r1.try_recv().unwrap();
                hits[0].set(hits[0].get() + 1);
            }
            1 => {
                r2.try_recv().unwrap();
                hits[1].set(hits[1].get() + 1);
            }
            2 => {
                after.try_recv().unwrap();
                hits[2].set(hits[2].get() + 1);
            }
            3 => {
                tick.try_recv().unwrap();
                hits[3].set(hits[3].get() + 1);
            }
            _ => panic!(),
        }
    }
    assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 2));
}

#[test]
fn fairness2() {
    const COUNT: usize = 10_000;

    let (s1, r1) = unbounded::<()>();
    let (s2, r2) = bounded::<()>(1);
    let (s3, r3) = bounded::<()>(0);

    scope(|scope| {
        scope.spawn(|_| {
            for _ in 0..COUNT {
                let mut sel = Select::new();
                let mut oper1 = None;
                let mut oper2 = None;
                if s1.is_empty() {
                    oper1 = Some(sel.send(&s1));
                }
                if s2.is_empty() {
                    oper2 = Some(sel.send(&s2));
                }
                let oper3 = sel.send(&s3);
                let oper = sel.select();
                match oper.index() {
                    i if Some(i) == oper1 => assert!(oper.send(&s1, ()).is_ok()),
                    i if Some(i) == oper2 => assert!(oper.send(&s2, ()).is_ok()),
                    i if i == oper3 => assert!(oper.send(&s3, ()).is_ok()),
                    _ => unreachable!(),
                }
            }
        });

        let hits = vec![Cell::new(0usize); 3];
        for _ in 0..COUNT {
            let mut sel = Select::new();
            sel.recv(&r1);
            sel.recv(&r2);
            sel.recv(&r3);
            loop {
                match sel.ready() {
                    0 => {
                        if r1.try_recv().is_ok() {
                            hits[0].set(hits[0].get() + 1);
                            break;
                        }
                    }
                    1 => {
                        if r2.try_recv().is_ok() {
                            hits[1].set(hits[1].get() + 1);
                            break;
                        }
                    }
                    2 => {
                        if r3.try_recv().is_ok() {
                            hits[2].set(hits[2].get() + 1);
                            break;
                        }
                    }
                    _ => unreachable!(),
                }
            }
        }
        assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 10));
    })
    .unwrap();
}