crossbeam-channel 0.3.7

Multi-producer multi-consumer channels for message passing
Documentation
//! Tests for channel selection using the `Select` struct.

extern crate crossbeam_channel;
extern crate crossbeam_utils;

use std::any::Any;
use std::cell::Cell;
use std::thread;
use std::time::{Duration, Instant};

use crossbeam_channel::{after, bounded, tick, unbounded, Receiver, Select, TryRecvError};
use crossbeam_utils::thread::scope;

fn ms(ms: u64) -> Duration {
    Duration::from_millis(ms)
}

#[test]
fn smoke1() {
    let (s1, r1) = unbounded::<usize>();
    let (s2, r2) = unbounded::<usize>();

    s1.send(1).unwrap();

    let mut sel = Select::new();
    let oper1 = sel.recv(&r1);
    let oper2 = sel.recv(&r2);
    let oper = sel.select();
    match oper.index() {
        i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(1)),
        i if i == oper2 => panic!(),
        _ => unreachable!(),
    }

    s2.send(2).unwrap();

    let mut sel = Select::new();
    let oper1 = sel.recv(&r1);
    let oper2 = sel.recv(&r2);
    let oper = sel.select();
    match oper.index() {
        i if i == oper1 => panic!(),
        i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(2)),
        _ => unreachable!(),
    }
}

#[test]
fn smoke2() {
    let (_s1, r1) = unbounded::<i32>();
    let (_s2, r2) = unbounded::<i32>();
    let (_s3, r3) = unbounded::<i32>();
    let (_s4, r4) = unbounded::<i32>();
    let (s5, r5) = unbounded::<i32>();

    s5.send(5).unwrap();

    let mut sel = Select::new();
    let oper1 = sel.recv(&r1);
    let oper2 = sel.recv(&r2);
    let oper3 = sel.recv(&r3);
    let oper4 = sel.recv(&r4);
    let oper5 = sel.recv(&r5);
    let oper = sel.select();
    match oper.index() {
        i if i == oper1 => panic!(),
        i if i == oper2 => panic!(),
        i if i == oper3 => panic!(),
        i if i == oper4 => panic!(),
        i if i == oper5 => assert_eq!(oper.recv(&r5), Ok(5)),
        _ => unreachable!(),
    }
}

#[test]
fn disconnected() {
    let (s1, r1) = unbounded::<i32>();
    let (s2, r2) = unbounded::<i32>();

    scope(|scope| {
        scope.spawn(|_| {
            drop(s1);
            thread::sleep(ms(500));
            s2.send(5).unwrap();
        });

        let mut sel = Select::new();
        let oper1 = sel.recv(&r1);
        let oper2 = sel.recv(&r2);
        let oper = sel.select_timeout(ms(1000));
        match oper {
            Err(_) => panic!(),
            Ok(oper) => match oper.index() {
                i if i == oper1 => assert!(oper.recv(&r1).is_err()),
                i if i == oper2 => panic!(),
                _ => unreachable!(),
            },
        }

        r2.recv().unwrap();
    }).unwrap();

    let mut sel = Select::new();
    let oper1 = sel.recv(&r1);
    let oper2 = sel.recv(&r2);
    let oper = sel.select_timeout(ms(1000));
    match oper {
        Err(_) => panic!(),
        Ok(oper) => match oper.index() {
            i if i == oper1 => assert!(oper.recv(&r1).is_err()),
            i if i == oper2 => panic!(),
            _ => unreachable!(),
        },
    }

    scope(|scope| {
        scope.spawn(|_| {
            thread::sleep(ms(500));
            drop(s2);
        });

        let mut sel = Select::new();
        let oper1 = sel.recv(&r2);
        let oper = sel.select_timeout(ms(1000));
        match oper {
            Err(_) => panic!(),
            Ok(oper) => match oper.index() {
                i if i == oper1 => assert!(oper.recv(&r2).is_err()),
                _ => unreachable!(),
            },
        }
    }).unwrap();
}

#[test]
fn default() {
    let (s1, r1) = unbounded::<i32>();
    let (s2, r2) = unbounded::<i32>();

    let mut sel = Select::new();
    let _oper1 = sel.recv(&r1);
    let _oper2 = sel.recv(&r2);
    let oper = sel.try_select();
    match oper {
        Err(_) => {}
        Ok(_) => panic!(),
    }

    drop(s1);

    let mut sel = Select::new();
    let oper1 = sel.recv(&r1);
    let oper2 = sel.recv(&r2);
    let oper = sel.try_select();
    match oper {
        Err(_) => panic!(),
        Ok(oper) => match oper.index() {
            i if i == oper1 => assert!(oper.recv(&r1).is_err()),
            i if i == oper2 => panic!(),
            _ => unreachable!(),
        },
    }

    s2.send(2).unwrap();

    let mut sel = Select::new();
    let oper1 = sel.recv(&r2);
    let oper = sel.try_select();
    match oper {
        Err(_) => panic!(),
        Ok(oper) => match oper.index() {
            i if i == oper1 => assert_eq!(oper.recv(&r2), Ok(2)),
            _ => unreachable!(),
        },
    }

    let mut sel = Select::new();
    let _oper1 = sel.recv(&r2);
    let oper = sel.try_select();
    match oper {
        Err(_) => {}
        Ok(_) => panic!(),
    }

    let mut sel = Select::new();
    let oper = sel.try_select();
    match oper {
        Err(_) => {}
        Ok(_) => panic!(),
    }
}

#[test]
fn timeout() {
    let (_s1, r1) = unbounded::<i32>();
    let (s2, r2) = unbounded::<i32>();

    scope(|scope| {
        scope.spawn(|_| {
            thread::sleep(ms(1500));
            s2.send(2).unwrap();
        });

        let mut sel = Select::new();
        let oper1 = sel.recv(&r1);
        let oper2 = sel.recv(&r2);
        let oper = sel.select_timeout(ms(1000));
        match oper {
            Err(_) => {}
            Ok(oper) => match oper.index() {
                i if i == oper1 => panic!(),
                i if i == oper2 => panic!(),
                _ => unreachable!(),
            },
        }

        let mut sel = Select::new();
        let oper1 = sel.recv(&r1);
        let oper2 = sel.recv(&r2);
        let oper = sel.select_timeout(ms(1000));
        match oper {
            Err(_) => panic!(),
            Ok(oper) => match oper.index() {
                i if i == oper1 => panic!(),
                i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(2)),
                _ => unreachable!(),
            },
        }
    }).unwrap();

    scope(|scope| {
        let (s, r) = unbounded::<i32>();

        scope.spawn(move |_| {
            thread::sleep(ms(500));
            drop(s);
        });

        let mut sel = Select::new();
        let oper = sel.select_timeout(ms(1000));
        match oper {
            Err(_) => {
                let mut sel = Select::new();
                let oper1 = sel.recv(&r);
                let oper = sel.try_select();
                match oper {
                    Err(_) => panic!(),
                    Ok(oper) => match oper.index() {
                        i if i == oper1 => assert!(oper.recv(&r).is_err()),
                        _ => unreachable!(),
                    },
                }
            }
            Ok(_) => unreachable!(),
        }
    }).unwrap();
}

#[test]
fn default_when_disconnected() {
    let (_, r) = unbounded::<i32>();

    let mut sel = Select::new();
    let oper1 = sel.recv(&r);
    let oper = sel.try_select();
    match oper {
        Err(_) => panic!(),
        Ok(oper) => match oper.index() {
            i if i == oper1 => assert!(oper.recv(&r).is_err()),
            _ => unreachable!(),
        },
    }

    let (_, r) = unbounded::<i32>();

    let mut sel = Select::new();
    let oper1 = sel.recv(&r);
    let oper = sel.select_timeout(ms(1000));
    match oper {
        Err(_) => panic!(),
        Ok(oper) => match oper.index() {
            i if i == oper1 => assert!(oper.recv(&r).is_err()),
            _ => unreachable!(),
        },
    }

    let (s, _) = bounded::<i32>(0);

    let mut sel = Select::new();
    let oper1 = sel.send(&s);
    let oper = sel.try_select();
    match oper {
        Err(_) => panic!(),
        Ok(oper) => match oper.index() {
            i if i == oper1 => assert!(oper.send(&s, 0).is_err()),
            _ => unreachable!(),
        },
    }

    let (s, _) = bounded::<i32>(0);

    let mut sel = Select::new();
    let oper1 = sel.send(&s);
    let oper = sel.select_timeout(ms(1000));
    match oper {
        Err(_) => panic!(),
        Ok(oper) => match oper.index() {
            i if i == oper1 => assert!(oper.send(&s, 0).is_err()),
            _ => unreachable!(),
        },
    }
}

#[test]
fn default_only() {
    let start = Instant::now();

    let mut sel = Select::new();
    let oper = sel.try_select();
    assert!(oper.is_err());
    let now = Instant::now();
    assert!(now - start <= ms(50));

    let start = Instant::now();
    let mut sel = Select::new();
    let oper = sel.select_timeout(ms(500));
    assert!(oper.is_err());
    let now = Instant::now();
    assert!(now - start >= ms(450));
    assert!(now - start <= ms(550));
}

#[test]
fn unblocks() {
    let (s1, r1) = bounded::<i32>(0);
    let (s2, r2) = bounded::<i32>(0);

    scope(|scope| {
        scope.spawn(|_| {
            thread::sleep(ms(500));
            s2.send(2).unwrap();
        });

        let mut sel = Select::new();
        let oper1 = sel.recv(&r1);
        let oper2 = sel.recv(&r2);
        let oper = sel.select_timeout(ms(1000));
        match oper {
            Err(_) => panic!(),
            Ok(oper) => match oper.index() {
                i if i == oper1 => panic!(),
                i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(2)),
                _ => unreachable!(),
            },
        }
    }).unwrap();

    scope(|scope| {
        scope.spawn(|_| {
            thread::sleep(ms(500));
            assert_eq!(r1.recv().unwrap(), 1);
        });

        let mut sel = Select::new();
        let oper1 = sel.send(&s1);
        let oper2 = sel.send(&s2);
        let oper = sel.select_timeout(ms(1000));
        match oper {
            Err(_) => panic!(),
            Ok(oper) => match oper.index() {
                i if i == oper1 => oper.send(&s1, 1).unwrap(),
                i if i == oper2 => panic!(),
                _ => unreachable!(),
            },
        }
    }).unwrap();
}

#[test]
fn both_ready() {
    let (s1, r1) = bounded(0);
    let (s2, r2) = bounded(0);

    scope(|scope| {
        scope.spawn(|_| {
            thread::sleep(ms(500));
            s1.send(1).unwrap();
            assert_eq!(r2.recv().unwrap(), 2);
        });

        for _ in 0..2 {
            let mut sel = Select::new();
            let oper1 = sel.recv(&r1);
            let oper2 = sel.send(&s2);
            let oper = sel.select();
            match oper.index() {
                i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(1)),
                i if i == oper2 => oper.send(&s2, 2).unwrap(),
                _ => unreachable!(),
            }
        }
    }).unwrap();
}

#[test]
fn loop_try() {
    const RUNS: usize = 20;

    for _ in 0..RUNS {
        let (s1, r1) = bounded::<i32>(0);
        let (s2, r2) = bounded::<i32>(0);
        let (s_end, r_end) = bounded::<()>(0);

        scope(|scope| {
            scope.spawn(|_| {
                loop {
                    let mut done = false;

                    let mut sel = Select::new();
                    let oper1 = sel.send(&s1);
                    let oper = sel.try_select();
                    match oper {
                        Err(_) => {}
                        Ok(oper) => match oper.index() {
                            i if i == oper1 => {
                                let _ = oper.send(&s1, 1);
                                done = true;
                            }
                            _ => unreachable!(),
                        },
                    }
                    if done {
                        break;
                    }

                    let mut sel = Select::new();
                    let oper1 = sel.recv(&r_end);
                    let oper = sel.try_select();
                    match oper {
                        Err(_) => {}
                        Ok(oper) => match oper.index() {
                            i if i == oper1 => {
                                let _ = oper.recv(&r_end);
                                done = true;
                            }
                            _ => unreachable!(),
                        },
                    }
                    if done {
                        break;
                    }
                }
            });

            scope.spawn(|_| {
                loop {
                    if let Ok(x) = r2.try_recv() {
                        assert_eq!(x, 2);
                        break;
                    }

                    let mut done = false;
                    let mut sel = Select::new();
                    let oper1 = sel.recv(&r_end);
                    let oper = sel.try_select();
                    match oper {
                        Err(_) => {}
                        Ok(oper) => match oper.index() {
                            i if i == oper1 => {
                                let _ = oper.recv(&r_end);
                                done = true;
                            }
                            _ => unreachable!(),
                        },
                    }
                    if done {
                        break;
                    }
                }
            });

            scope.spawn(|_| {
                thread::sleep(ms(500));

                let mut sel = Select::new();
                let oper1 = sel.recv(&r1);
                let oper2 = sel.send(&s2);
                let oper = sel.select_timeout(ms(1000));
                match oper {
                    Err(_) => {}
                    Ok(oper) => match oper.index() {
                        i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(1)),
                        i if i == oper2 => assert!(oper.send(&s2, 2).is_ok()),
                        _ => unreachable!(),
                    },
                }

                drop(s_end);
            });
        }).unwrap();
    }
}

#[test]
fn cloning1() {
    scope(|scope| {
        let (s1, r1) = unbounded::<i32>();
        let (_s2, r2) = unbounded::<i32>();
        let (s3, r3) = unbounded::<()>();

        scope.spawn(move |_| {
            r3.recv().unwrap();
            drop(s1.clone());
            assert!(r3.try_recv().is_err());
            s1.send(1).unwrap();
            r3.recv().unwrap();
        });

        s3.send(()).unwrap();

        let mut sel = Select::new();
        let oper1 = sel.recv(&r1);
        let oper2 = sel.recv(&r2);
        let oper = sel.select();
        match oper.index() {
            i if i == oper1 => drop(oper.recv(&r1)),
            i if i == oper2 => drop(oper.recv(&r2)),
            _ => unreachable!(),
        }

        s3.send(()).unwrap();
    }).unwrap();
}

#[test]
fn cloning2() {
    let (s1, r1) = unbounded::<()>();
    let (s2, r2) = unbounded::<()>();
    let (_s3, _r3) = unbounded::<()>();

    scope(|scope| {
        scope.spawn(move |_| {
            let mut sel = Select::new();
            let oper1 = sel.recv(&r1);
            let oper2 = sel.recv(&r2);
            let oper = sel.select();
            match oper.index() {
                i if i == oper1 => panic!(),
                i if i == oper2 => drop(oper.recv(&r2)),
                _ => unreachable!(),
            }
        });

        thread::sleep(ms(500));
        drop(s1.clone());
        s2.send(()).unwrap();
    }).unwrap();
}

#[test]
fn preflight1() {
    let (s, r) = unbounded();
    s.send(()).unwrap();

    let mut sel = Select::new();
    let oper1 = sel.recv(&r);
    let oper = sel.select();
    match oper.index() {
        i if i == oper1 => drop(oper.recv(&r)),
        _ => unreachable!(),
    }
}

#[test]
fn preflight2() {
    let (s, r) = unbounded();
    drop(s.clone());
    s.send(()).unwrap();
    drop(s);

    let mut sel = Select::new();
    let oper1 = sel.recv(&r);
    let oper = sel.select();
    match oper.index() {
        i if i == oper1 => assert_eq!(oper.recv(&r), Ok(())),
        _ => unreachable!(),
    }

    assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
}

#[test]
fn preflight3() {
    let (s, r) = unbounded();
    drop(s.clone());
    s.send(()).unwrap();
    drop(s);
    r.recv().unwrap();

    let mut sel = Select::new();
    let oper1 = sel.recv(&r);
    let oper = sel.select();
    match oper.index() {
        i if i == oper1 => assert!(oper.recv(&r).is_err()),
        _ => unreachable!(),
    }
}

#[test]
fn duplicate_operations() {
    let (s, r) = unbounded::<i32>();
    let hit = vec![Cell::new(false); 4];

    while hit.iter().map(|h| h.get()).any(|hit| !hit) {
        let mut sel = Select::new();
        let oper0 = sel.recv(&r);
        let oper1 = sel.recv(&r);
        let oper2 = sel.send(&s);
        let oper3 = sel.send(&s);
        let oper = sel.select();
        match oper.index() {
            i if i == oper0 => {
                assert!(oper.recv(&r).is_ok());
                hit[0].set(true);
            }
            i if i == oper1 => {
                assert!(oper.recv(&r).is_ok());
                hit[1].set(true);
            }
            i if i == oper2 => {
                assert!(oper.send(&s, 0).is_ok());
                hit[2].set(true);
            }
            i if i == oper3 => {
                assert!(oper.send(&s, 0).is_ok());
                hit[3].set(true);
            }
            _ => unreachable!(),
        }
    }
}

#[test]
fn nesting() {
    let (s, r) = unbounded::<i32>();

    let mut sel = Select::new();
    let oper1 = sel.send(&s);
    let oper = sel.select();
    match oper.index() {
        i if i == oper1 => {
            assert!(oper.send(&s, 0).is_ok());

            let mut sel = Select::new();
            let oper1 = sel.recv(&r);
            let oper = sel.select();
            match oper.index() {
                i if i == oper1 => {
                    assert_eq!(oper.recv(&r), Ok(0));

                    let mut sel = Select::new();
                    let oper1 = sel.send(&s);
                    let oper = sel.select();
                    match oper.index() {
                        i if i == oper1 => {
                            assert!(oper.send(&s, 1).is_ok());

                            let mut sel = Select::new();
                            let oper1 = sel.recv(&r);
                            let oper = sel.select();
                            match oper.index() {
                                i if i == oper1 => {
                                    assert_eq!(oper.recv(&r), Ok(1));
                                }
                                _ => unreachable!(),
                            }
                        }
                        _ => unreachable!(),
                    }
                }
                _ => unreachable!(),
            }
        }
        _ => unreachable!(),
    }
}

#[test]
fn stress_recv() {
    const COUNT: usize = 10_000;

    let (s1, r1) = unbounded();
    let (s2, r2) = bounded(5);
    let (s3, r3) = bounded(100);

    scope(|scope| {
        scope.spawn(|_| {
            for i in 0..COUNT {
                s1.send(i).unwrap();
                r3.recv().unwrap();

                s2.send(i).unwrap();
                r3.recv().unwrap();
            }
        });

        for i in 0..COUNT {
            for _ in 0..2 {
                let mut sel = Select::new();
                let oper1 = sel.recv(&r1);
                let oper2 = sel.recv(&r2);
                let oper = sel.select();
                match oper.index() {
                    ix if ix == oper1 => assert_eq!(oper.recv(&r1), Ok(i)),
                    ix if ix == oper2 => assert_eq!(oper.recv(&r2), Ok(i)),
                    _ => unreachable!(),
                }

                s3.send(()).unwrap();
            }
        }
    }).unwrap();
}

#[test]
fn stress_send() {
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded(0);
    let (s2, r2) = bounded(0);
    let (s3, r3) = bounded(100);

    scope(|scope| {
        scope.spawn(|_| {
            for i in 0..COUNT {
                assert_eq!(r1.recv().unwrap(), i);
                assert_eq!(r2.recv().unwrap(), i);
                r3.recv().unwrap();
            }
        });

        for i in 0..COUNT {
            for _ in 0..2 {
                let mut sel = Select::new();
                let oper1 = sel.send(&s1);
                let oper2 = sel.send(&s2);
                let oper = sel.select();
                match oper.index() {
                    ix if ix == oper1 => assert!(oper.send(&s1, i).is_ok()),
                    ix if ix == oper2 => assert!(oper.send(&s2, i).is_ok()),
                    _ => unreachable!(),
                }
            }
            s3.send(()).unwrap();
        }
    }).unwrap();
}

#[test]
fn stress_mixed() {
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded(0);
    let (s2, r2) = bounded(0);
    let (s3, r3) = bounded(100);

    scope(|scope| {
        scope.spawn(|_| {
            for i in 0..COUNT {
                s1.send(i).unwrap();
                assert_eq!(r2.recv().unwrap(), i);
                r3.recv().unwrap();
            }
        });

        for i in 0..COUNT {
            for _ in 0..2 {
                let mut sel = Select::new();
                let oper1 = sel.recv(&r1);
                let oper2 = sel.send(&s2);
                let oper = sel.select();
                match oper.index() {
                    ix if ix == oper1 => assert_eq!(oper.recv(&r1), Ok(i)),
                    ix if ix == oper2 => assert!(oper.send(&s2, i).is_ok()),
                    _ => unreachable!(),
                }
            }
            s3.send(()).unwrap();
        }
    }).unwrap();
}

#[test]
fn stress_timeout_two_threads() {
    const COUNT: usize = 20;

    let (s, r) = bounded(2);

    scope(|scope| {
        scope.spawn(|_| {
            for i in 0..COUNT {
                if i % 2 == 0 {
                    thread::sleep(ms(500));
                }

                let mut done = false;
                while !done {
                    let mut sel = Select::new();
                    let oper1 = sel.send(&s);
                    let oper = sel.select_timeout(ms(100));
                    match oper {
                        Err(_) => {}
                        Ok(oper) => match oper.index() {
                            ix if ix == oper1 => {
                                assert!(oper.send(&s, i).is_ok());
                                break;
                            }
                            _ => unreachable!(),
                        },
                    }
                }
            }
        });

        scope.spawn(|_| {
            for i in 0..COUNT {
                if i % 2 == 0 {
                    thread::sleep(ms(500));
                }

                let mut done = false;
                while !done {
                    let mut sel = Select::new();
                    let oper1 = sel.recv(&r);
                    let oper = sel.select_timeout(ms(100));
                    match oper {
                        Err(_) => {}
                        Ok(oper) => match oper.index() {
                            ix if ix == oper1 => {
                                assert_eq!(oper.recv(&r), Ok(i));
                                done = true;
                            }
                            _ => unreachable!(),
                        },
                    }
                }
            }
        });
    }).unwrap();
}

#[test]
fn send_recv_same_channel() {
    let (s, r) = bounded::<i32>(0);
    let mut sel = Select::new();
    let oper1 = sel.send(&s);
    let oper2 = sel.recv(&r);
    let oper = sel.select_timeout(ms(100));
    match oper {
        Err(_) => {}
        Ok(oper) => match oper.index() {
            ix if ix == oper1 => panic!(),
            ix if ix == oper2 => panic!(),
            _ => unreachable!(),
        },
    }

    let (s, r) = unbounded::<i32>();
    let mut sel = Select::new();
    let oper1 = sel.send(&s);
    let oper2 = sel.recv(&r);
    let oper = sel.select_timeout(ms(100));
    match oper {
        Err(_) => panic!(),
        Ok(oper) => match oper.index() {
            ix if ix == oper1 => assert!(oper.send(&s, 0).is_ok()),
            ix if ix == oper2 => panic!(),
            _ => unreachable!(),
        },
    }
}

#[test]
fn matching() {
    const THREADS: usize = 44;

    let (s, r) = &bounded::<usize>(0);

    scope(|scope| {
        for i in 0..THREADS {
            scope.spawn(move |_| {
                let mut sel = Select::new();
                let oper1 = sel.recv(&r);
                let oper2 = sel.send(&s);
                let oper = sel.select();
                match oper.index() {
                    ix if ix == oper1 => assert_ne!(oper.recv(&r), Ok(i)),
                    ix if ix == oper2 => assert!(oper.send(&s, i).is_ok()),
                    _ => unreachable!(),
                }
            });
        }
    }).unwrap();

    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}

#[test]
fn matching_with_leftover() {
    const THREADS: usize = 55;

    let (s, r) = &bounded::<usize>(0);

    scope(|scope| {
        for i in 0..THREADS {
            scope.spawn(move |_| {
                let mut sel = Select::new();
                let oper1 = sel.recv(&r);
                let oper2 = sel.send(&s);
                let oper = sel.select();
                match oper.index() {
                    ix if ix == oper1 => assert_ne!(oper.recv(&r), Ok(i)),
                    ix if ix == oper2 => assert!(oper.send(&s, i).is_ok()),
                    _ => unreachable!(),
                }
            });
        }
        s.send(!0).unwrap();
    }).unwrap();

    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}

#[test]
fn channel_through_channel() {
    const COUNT: usize = 1000;

    type T = Box<Any + Send>;

    for cap in 0..3 {
        let (s, r) = bounded::<T>(cap);

        scope(|scope| {
            scope.spawn(move |_| {
                let mut s = s;

                for _ in 0..COUNT {
                    let (new_s, new_r) = bounded(cap);
                    let mut new_r: T = Box::new(Some(new_r));

                    {
                        let mut sel = Select::new();
                        let oper1 = sel.send(&s);
                        let oper = sel.select();
                        match oper.index() {
                            ix if ix == oper1 => assert!(oper.send(&s, new_r).is_ok()),
                            _ => unreachable!(),
                        }
                    }

                    s = new_s;
                }
            });

            scope.spawn(move |_| {
                let mut r = r;

                for _ in 0..COUNT {
                    let new = {
                        let mut sel = Select::new();
                        let oper1 = sel.recv(&r);
                        let oper = sel.select();
                        match oper.index() {
                            ix if ix == oper1 => oper
                                .recv(&r)
                                .unwrap()
                                .downcast_mut::<Option<Receiver<T>>>()
                                .unwrap()
                                .take()
                                .unwrap(),
                            _ => unreachable!(),
                        }
                    };
                    r = new;
                }
            });
        }).unwrap();
    }
}

#[test]
fn linearizable_try() {
    const COUNT: usize = 100_000;

    for step in 0..2 {
        let (start_s, start_r) = bounded::<()>(0);
        let (end_s, end_r) = bounded::<()>(0);

        let ((s1, r1), (s2, r2)) = if step == 0 {
            (bounded::<i32>(1), bounded::<i32>(1))
        } else {
            (unbounded::<i32>(), unbounded::<i32>())
        };

        scope(|scope| {
            scope.spawn(|_| {
                for _ in 0..COUNT {
                    start_s.send(()).unwrap();

                    s1.send(1).unwrap();

                    let mut sel = Select::new();
                    let oper1 = sel.recv(&r1);
                    let oper2 = sel.recv(&r2);
                    let oper = sel.try_select();
                    match oper {
                        Err(_) => unreachable!(),
                        Ok(oper) => match oper.index() {
                            ix if ix == oper1 => assert!(oper.recv(&r1).is_ok()),
                            ix if ix == oper2 => assert!(oper.recv(&r2).is_ok()),
                            _ => unreachable!(),
                        },
                    }

                    end_s.send(()).unwrap();
                    let _ = r2.try_recv();
                }
            });

            for _ in 0..COUNT {
                start_r.recv().unwrap();

                s2.send(1).unwrap();
                let _ = r1.try_recv();

                end_r.recv().unwrap();
            }
        }).unwrap();
    }
}

#[test]
fn linearizable_timeout() {
    const COUNT: usize = 100_000;

    for step in 0..2 {
        let (start_s, start_r) = bounded::<()>(0);
        let (end_s, end_r) = bounded::<()>(0);

        let ((s1, r1), (s2, r2)) = if step == 0 {
            (bounded::<i32>(1), bounded::<i32>(1))
        } else {
            (unbounded::<i32>(), unbounded::<i32>())
        };

        scope(|scope| {
            scope.spawn(|_| {
                for _ in 0..COUNT {
                    start_s.send(()).unwrap();

                    s1.send(1).unwrap();

                    let mut sel = Select::new();
                    let oper1 = sel.recv(&r1);
                    let oper2 = sel.recv(&r2);
                    let oper = sel.select_timeout(ms(0));
                    match oper {
                        Err(_) => unreachable!(),
                        Ok(oper) => match oper.index() {
                            ix if ix == oper1 => assert!(oper.recv(&r1).is_ok()),
                            ix if ix == oper2 => assert!(oper.recv(&r2).is_ok()),
                            _ => unreachable!(),
                        },
                    }

                    end_s.send(()).unwrap();
                    let _ = r2.try_recv();
                }
            });

            for _ in 0..COUNT {
                start_r.recv().unwrap();

                s2.send(1).unwrap();
                let _ = r1.try_recv();

                end_r.recv().unwrap();
            }
        }).unwrap();
    }
}

#[test]
fn fairness1() {
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded::<()>(COUNT);
    let (s2, r2) = unbounded::<()>();

    for _ in 0..COUNT {
        s1.send(()).unwrap();
        s2.send(()).unwrap();
    }

    let hits = vec![Cell::new(0usize); 4];
    for _ in 0..COUNT {
        let after = after(ms(0));
        let tick = tick(ms(0));

        let mut sel = Select::new();
        let oper1 = sel.recv(&r1);
        let oper2 = sel.recv(&r2);
        let oper3 = sel.recv(&after);
        let oper4 = sel.recv(&tick);
        let oper = sel.select();
        match oper.index() {
            i if i == oper1 => {
                oper.recv(&r1).unwrap();
                hits[0].set(hits[0].get() + 1);
            }
            i if i == oper2 => {
                oper.recv(&r2).unwrap();
                hits[1].set(hits[1].get() + 1);
            }
            i if i == oper3 => {
                oper.recv(&after).unwrap();
                hits[2].set(hits[2].get() + 1);
            }
            i if i == oper4 => {
                oper.recv(&tick).unwrap();
                hits[3].set(hits[3].get() + 1);
            }
            _ => unreachable!(),
        }
    }
    assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 2));
}

#[test]
fn fairness2() {
    const COUNT: usize = 10_000;

    let (s1, r1) = unbounded::<()>();
    let (s2, r2) = bounded::<()>(1);
    let (s3, r3) = bounded::<()>(0);

    scope(|scope| {
        scope.spawn(|_| {
            for _ in 0..COUNT {
                let mut sel = Select::new();
                let mut oper1 = None;
                let mut oper2 = None;
                if s1.is_empty() {
                    oper1 = Some(sel.send(&s1));
                }
                if s2.is_empty() {
                    oper2 = Some(sel.send(&s2));
                }
                let oper3 = sel.send(&s3);
                let oper = sel.select();
                match oper.index() {
                    i if Some(i) == oper1 => assert!(oper.send(&s1, ()).is_ok()),
                    i if Some(i) == oper2 => assert!(oper.send(&s2, ()).is_ok()),
                    i if i == oper3 => assert!(oper.send(&s3, ()).is_ok()),
                    _ => unreachable!(),
                }
            }
        });

        let hits = vec![Cell::new(0usize); 3];
        for _ in 0..COUNT {
            let mut sel = Select::new();
            let oper1 = sel.recv(&r1);
            let oper2 = sel.recv(&r2);
            let oper3 = sel.recv(&r3);
            let oper = sel.select();
            match oper.index() {
                i if i == oper1 => {
                    oper.recv(&r1).unwrap();
                    hits[0].set(hits[0].get() + 1);
                }
                i if i == oper2 => {
                    oper.recv(&r2).unwrap();
                    hits[1].set(hits[1].get() + 1);
                }
                i if i == oper3 => {
                    oper.recv(&r3).unwrap();
                    hits[2].set(hits[2].get() + 1);
                }
                _ => unreachable!(),
            }
        }
        assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 50));
    }).unwrap();
}

#[test]
fn sync_and_clone() {
    const THREADS: usize = 20;

    let (s, r) = &bounded::<usize>(0);

    let mut sel = Select::new();
    let oper1 = sel.recv(&r);
    let oper2 = sel.send(&s);
    let sel = &sel;

    scope(|scope| {
        for i in 0..THREADS {
            scope.spawn(move |_| {
                let mut sel = sel.clone();
                let oper = sel.select();
                match oper.index() {
                    ix if ix == oper1 => assert_ne!(oper.recv(&r), Ok(i)),
                    ix if ix == oper2 => assert!(oper.send(&s, i).is_ok()),
                    _ => unreachable!(),
                }
            });
        }
    }).unwrap();

    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}

#[test]
fn send_and_clone() {
    const THREADS: usize = 20;

    let (s, r) = &bounded::<usize>(0);

    let mut sel = Select::new();
    let oper1 = sel.recv(&r);
    let oper2 = sel.send(&s);

    scope(|scope| {
        for i in 0..THREADS {
            let mut sel = sel.clone();
            scope.spawn(move |_| {
                let oper = sel.select();
                match oper.index() {
                    ix if ix == oper1 => assert_ne!(oper.recv(&r), Ok(i)),
                    ix if ix == oper2 => assert!(oper.send(&s, i).is_ok()),
                    _ => unreachable!(),
                }
            });
        }
    }).unwrap();

    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}

#[test]
fn reuse() {
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded(0);
    let (s2, r2) = bounded(0);
    let (s3, r3) = bounded(100);

    scope(|scope| {
        scope.spawn(|_| {
            for i in 0..COUNT {
                s1.send(i).unwrap();
                assert_eq!(r2.recv().unwrap(), i);
                r3.recv().unwrap();
            }
        });

        let mut sel = Select::new();
        let oper1 = sel.recv(&r1);
        let oper2 = sel.send(&s2);

        for i in 0..COUNT {
            for _ in 0..2 {
                let oper = sel.select();
                match oper.index() {
                    ix if ix == oper1 => assert_eq!(oper.recv(&r1), Ok(i)),
                    ix if ix == oper2 => assert!(oper.send(&s2, i).is_ok()),
                    _ => unreachable!(),
                }
            }
            s3.send(()).unwrap();
        }
    }).unwrap();
}