may 0.3.36

Rust Stackful Coroutine Library
Documentation
//! Channel API compatible with `std::sync::mpsc`, except that it works from both
//! threads and coroutines.
//! Please refer to the documentation of `std::sync::mpsc` for details.
use std::fmt;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{RecvError, RecvTimeoutError, SendError, TryRecvError};
use std::sync::Arc;
use std::time::{Duration, Instant};

use super::{AtomicOption, Blocker};
use crate::likely::{likely, unlikely};

use may_queue::mpsc::Queue;

// TODO: SyncSender
// /////////////////////////////////////////////////////////////////////////////
// InnerQueue
// /////////////////////////////////////////////////////////////////////////////

/// State shared (through an `Arc`) by every `Sender` and the `Receiver` of one channel.
struct InnerQueue<T> {
    // pending messages: pushed by `send`, popped by `try_recv`
    queue: Queue<T>,
    // thread/coroutine for wake up
    to_wake: AtomicOption<Arc<Blocker>>,
    // The number of tx channels which are currently using this queue.
    channels: AtomicUsize,
    // if rx is dropped
    port_dropped: AtomicBool,
}

impl<T> InnerQueue<T> {
    /// Creates an empty queue with a sender count of one, no registered waiter and a
    /// receiver that is not yet marked as dropped.
    pub fn new() -> InnerQueue<T> {
        InnerQueue {
            queue: Queue::new(),
            to_wake: AtomicOption::none(),
            channels: AtomicUsize::new(1),
            port_dropped: AtomicBool::new(false),
        }
    }

    /// Pushes `t` onto the queue and unparks the registered waiter, if there is one.
    ///
    /// # Errors
    /// Hands the value back as `Err(t)` when the receiver has already been dropped.
    pub fn send(&self, t: T) -> Result<(), T> {
        if unlikely(self.port_dropped.load(Ordering::Acquire)) {
            return Err(t);
        }
        self.queue.push(t);
        if let Some(w) = self.to_wake.take() {
            w.unpark();
        }
        Ok(())
    }

    /// Registers the current thread/coroutine as the waiter and parks it when the queue
    /// is empty, then tries to pop once more after waking up.
    ///
    /// `dur` is forwarded to `Blocker::park`; presumably `None` means "no time limit" —
    /// confirm against `Blocker`. The result of the park call is ignored, so callers may
    /// still see `TryRecvError::Empty` and have to decide whether to wait again.
    pub fn recv(&self, dur: Option<Duration>) -> Result<T, TryRecvError> {
        // match self.try_recv() {
        //     Err(TryRecvError::Empty) => {}
        //     data => return data,
        // }

        let cur = Blocker::current();
        // register the waiter
        self.to_wake.store(cur.clone());
        // re-check the queue
        match self.try_recv() {
            Err(TryRecvError::Empty) => {
                cur.park(dur).ok();
            }
            data => {
                // no need to park, contention with send
                self.to_wake.clear();
                return data;
            }
        }

        // after come back try recv again
        self.try_recv()
    }

    /// Pops one value without blocking.
    ///
    /// # Errors
    /// `Empty` while at least one sender is still counted; `Disconnected` when the sender
    /// count is zero and a second pop still finds nothing.
    #[inline]
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        match self.queue.pop() {
            Some(data) => Ok(data),
            None => {
                if likely(self.channels.load(Ordering::Acquire) > 0) {
                    Err(TryRecvError::Empty)
                } else {
                    // there is no sender any more, should re-check
                    self.queue.pop().ok_or(TryRecvError::Disconnected)
                }
            }
        }
    }

    /// Counts one more sender.
    pub fn clone_chan(&self) {
        self.channels.fetch_add(1, Ordering::AcqRel);
    }

    /// Counts one sender less; when that was the last one, unparks the registered waiter.
    ///
    /// # Panics
    /// Panics if the previous sender count was already zero.
    pub fn drop_chan(&self) {
        match self.channels.fetch_sub(1, Ordering::AcqRel) {
            // the last sender is gone: wake the waiter so it can re-check the queue
            1 => {
                if let Some(w) = self.to_wake.take() {
                    w.unpark();
                }
            }
            n if n > 1 => {}
            n => panic!("bad number of channels left {n}"),
        }
    }

    /// Marks the receiver as dropped and discards everything still queued.
    pub fn drop_port(&self) {
        self.port_dropped.store(true, Ordering::Release);
        // clear all the data
        while self.queue.pop().is_some() {}
    }
}

/// Sanity checks run when the shared state is finally released.
impl<T> Drop for InnerQueue<T> {
    fn drop(&mut self) {
        // every sender must already have been counted out via `drop_chan`
        assert_eq!(self.channels.load(Ordering::Acquire), 0);
        // no waiter may still be registered at this point
        assert!(self.to_wake.take().is_none());
    }
}

/// The receiving half of a channel; holds a reference to the shared `InnerQueue`.
pub struct Receiver<T> {
    inner: Arc<InnerQueue<T>>,
}

// SAFETY: NOTE(review): asserts that a `Receiver<T>` may be moved to another thread when
// `T: Send`. Soundness rests on the internals of `InnerQueue` (queue, blocker), which are
// not visible here — confirm.
unsafe impl<T: Send> Send for Receiver<T> {}
// NOTE(review): the negative impl below is only a comment, so whether `Receiver` is `Sync`
// is decided by the auto traits of its fields — verify that this is intended.
// impl<T> !Sync for Receiver<T> {}

/// Borrowing iterator over a `Receiver`; each step calls `Receiver::recv`.
pub struct Iter<'a, T: 'a> {
    rx: &'a Receiver<T>,
}

/// Borrowing iterator over a `Receiver`; each step calls `Receiver::try_recv`.
pub struct TryIter<'a, T: 'a> {
    rx: &'a Receiver<T>,
}

/// Iterator that owns its `Receiver`; each step calls `Receiver::recv`.
pub struct IntoIter<T> {
    rx: Receiver<T>,
}

/// The sending half of a channel; cloning it registers another sender on the shared queue.
pub struct Sender<T> {
    inner: Arc<InnerQueue<T>>,
}

// SAFETY: NOTE(review): asserts that a `Sender<T>` may be moved to another thread when
// `T: Send`. Soundness rests on the internals of `InnerQueue`, which are not visible
// here — confirm.
unsafe impl<T: Send> Send for Sender<T> {}
// impl<T> !Sync for Sender<T> {}
// Explicitly mark senders as safe to hold across an unwind boundary.
impl<T: Send> UnwindSafe for Sender<T> {}
impl<T: Send> RefUnwindSafe for Sender<T> {}

/// Creates a new channel and returns its `(Sender, Receiver)` pair; both halves refer to
/// the same freshly created `InnerQueue`.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let shared = Arc::new(InnerQueue::new());
    let tx = Sender::new(Arc::clone(&shared));
    let rx = Receiver::new(shared);
    (tx, rx)
}

// /////////////////////////////////////////////////////////////////////////////
// Sender
// /////////////////////////////////////////////////////////////////////////////

impl<T> Sender<T> {
    /// Wraps the shared queue as a sending half.
    fn new(inner: Arc<InnerQueue<T>>) -> Sender<T> {
        Sender { inner }
    }

    /// Hands `t` to the shared queue.
    ///
    /// # Errors
    /// Returns the value wrapped in `SendError` when the queue refuses it.
    pub fn send(&self, t: T) -> Result<(), SendError<T>> {
        match self.inner.send(t) {
            Ok(()) => Ok(()),
            Err(rejected) => Err(SendError(rejected)),
        }
    }
}

impl<T> Clone for Sender<T> {
    fn clone(&self) -> Sender<T> {
        self.inner.clone_chan();
        Sender::new(self.inner.clone())
    }
}

/// Dropping a sender unregisters it from the shared queue.
impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        self.inner.drop_chan();
    }
}

/// Opaque debug representation that does not require `T: Debug`.
impl<T> fmt::Debug for Sender<T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("Sender { .. }")
    }
}

// /////////////////////////////////////////////////////////////////////////////
// Receiver
// /////////////////////////////////////////////////////////////////////////////

impl<T> Receiver<T> {
    /// Wraps the shared queue as the receiving half.
    fn new(inner: Arc<InnerQueue<T>>) -> Receiver<T> {
        Receiver { inner }
    }

    /// Attempts to take a value without blocking.
    ///
    /// # Errors
    /// Forwards the `TryRecvError` reported by the shared queue.
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        self.inner.try_recv()
    }

    /// Waits until the shared queue yields a value.
    ///
    /// # Errors
    /// Returns `RecvError` once the queue reports `Disconnected`.
    pub fn recv(&self) -> Result<T, RecvError> {
        loop {
            match self.inner.recv(None) {
                // woken up without data: wait again
                Err(TryRecvError::Empty) => {}
                data => return data.map_err(|_| RecvError),
            }
        }
    }

    /// Waits for a value, giving up once `timeout` has elapsed.
    ///
    /// # Errors
    /// `Timeout` when the deadline passes without data, `Disconnected` when the queue
    /// reports that it is disconnected.
    pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
        // Do an optimistic try_recv to avoid the performance impact of
        // Instant::now() in the full-channel case.
        match self.try_recv() {
            Ok(result) => Ok(result),
            Err(TryRecvError::Empty) => self.recv_max_until(timeout),
            Err(TryRecvError::Disconnected) => Err(RecvTimeoutError::Disconnected),
        }
    }

    /// Slow path of `recv_timeout`: waits repeatedly until data arrives, the queue
    /// disconnects, or the deadline `now + timeout` is reached.
    fn recv_max_until(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
        let deadline = match Instant::now().checked_add(timeout) {
            Some(deadline) => deadline,
            // The deadline is not representable: it is so far in the future that this is
            // practically the same as waiting without a limit.
            None => return self.recv().map_err(|_| RecvTimeoutError::Disconnected),
        };

        // The first wait uses the full timeout; every retry only waits for what is left,
        // so a wake-up without data cannot push the call past the deadline.
        let mut remaining = timeout;
        loop {
            match self.inner.recv(Some(remaining)) {
                Ok(t) => return Ok(t),
                Err(TryRecvError::Empty) => {}
                Err(TryRecvError::Disconnected) => return Err(RecvTimeoutError::Disconnected),
            }

            // If we're already passed the deadline, and we're here without
            // data, return a timeout, else try again.
            remaining = deadline.saturating_duration_since(Instant::now());
            if remaining.is_zero() {
                return Err(RecvTimeoutError::Timeout);
            }
        }
    }

    /// Returns an iterator whose `next` calls `recv`.
    pub fn iter(&self) -> Iter<'_, T> {
        Iter { rx: self }
    }

    /// Returns an iterator whose `next` calls `try_recv`.
    pub fn try_iter(&self) -> TryIter<'_, T> {
        TryIter { rx: self }
    }
}

/// Yields received values until `recv` reports an error.
impl<'a, T> Iterator for Iter<'a, T> {
    type Item = T;

    fn next(&mut self) -> Option<T> {
        match self.rx.recv() {
            Ok(item) => Some(item),
            Err(_) => None,
        }
    }
}

/// Yields values as long as `try_recv` succeeds; stops at the first error.
impl<'a, T> Iterator for TryIter<'a, T> {
    type Item = T;

    fn next(&mut self) -> Option<T> {
        match self.rx.try_recv() {
            Ok(item) => Some(item),
            Err(_) => None,
        }
    }
}

impl<'a, T> IntoIterator for &'a Receiver<T> {
    type Item = T;
    type IntoIter = Iter<'a, T>;

    fn into_iter(self) -> Iter<'a, T> {
        self.iter()
    }
}

/// Yields received values until `recv` reports an error.
impl<T> Iterator for IntoIter<T> {
    type Item = T;
    fn next(&mut self) -> Option<T> {
        match self.rx.recv() {
            Ok(item) => Some(item),
            Err(_) => None,
        }
    }
}

/// Lets `for x in rx` consume the receiver and iterate over received values.
impl<T> IntoIterator for Receiver<T> {
    type Item = T;
    type IntoIter = IntoIter<T>;

    fn into_iter(self) -> IntoIter<T> {
        // the iterator takes ownership of the receiver
        IntoIter { rx: self }
    }
}

/// Dropping the receiver marks the port as dropped on the shared queue, which also
/// discards the values still queued.
impl<T> Drop for Receiver<T> {
    fn drop(&mut self) {
        self.inner.drop_port();
    }
}

/// Opaque debug representation that does not require `T: Debug`.
impl<T> fmt::Debug for Receiver<T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("Receiver { .. }")
    }
}

#[cfg(test)]
#[allow(clippy::redundant_clone)]
mod tests {
    use super::*;
    use std::env;
    use std::sync::mpsc::{RecvTimeoutError, TryRecvError};
    use std::thread;
    use std::time::{Duration, Instant};

    /// Multiplier for stress-test iteration counts, parsed from `RUST_TEST_STRESS`;
    /// falls back to 1 when the variable cannot be read.
    pub fn stress_factor() -> usize {
        env::var("RUST_TEST_STRESS").map_or(1, |raw| raw.parse().unwrap())
    }

    // A value sent on the channel is received unchanged.
    #[test]
    fn smoke() {
        let (sender, receiver) = channel::<i32>();
        sender.send(1).unwrap();
        let got = receiver.recv().unwrap();
        assert_eq!(got, 1);
    }

    // Dropping a channel that still holds a boxed value must not misbehave.
    #[test]
    fn drop_full() {
        let (sender, _receiver) = channel::<Box<isize>>();
        let payload = Box::new(1);
        sender.send(payload).unwrap();
    }

    // Same as `drop_full`, after two sender clones have been created and dropped.
    #[test]
    fn drop_full_shared() {
        let (sender, _receiver) = channel::<Box<isize>>();
        for _ in 0..2 {
            drop(sender.clone());
        }
        sender.send(Box::new(1)).unwrap();
    }

    // Sending works through the original sender and through a clone of it.
    #[test]
    fn smoke_shared() {
        let (first_tx, rx) = channel::<i32>();
        first_tx.send(1).unwrap();
        assert_eq!(rx.recv().unwrap(), 1);
        let cloned_tx = first_tx.clone();
        cloned_tx.send(1).unwrap();
        assert_eq!(rx.recv().unwrap(), 1);
    }

    // A value sent from a spawned thread is received on the test thread.
    #[test]
    fn smoke_threads() {
        let (tx, rx) = channel::<i32>();
        let _t = thread::spawn(move || {
            tx.send(1).unwrap();
        });
        assert_eq!(rx.recv().unwrap(), 1);
    }

    // A value sent from inside `go!` (presumably a spawned coroutine) is received on the
    // test thread.
    #[test]
    fn smoke_coroutine() {
        let (tx, rx) = channel::<i32>();
        let _t = go!(move || {
            tx.send(1).unwrap();
        });
        assert_eq!(rx.recv().unwrap(), 1);
    }

    // Sending fails once the receiver has been dropped.
    #[test]
    fn smoke_port_gone() {
        let (sender, receiver) = channel::<i32>();
        drop(receiver);
        let outcome = sender.send(1);
        assert!(outcome.is_err());
    }

    // A clone made after the receiver was dropped also fails to send.
    #[test]
    fn smoke_shared_port_gone2() {
        let (original, receiver) = channel::<i32>();
        drop(receiver);
        let survivor = original.clone();
        drop(original);
        let outcome = survivor.send(1);
        assert!(outcome.is_err());
    }

    // Keeps sending until the receiver, which lives in another thread and goes away
    // after one `recv`, makes `send` fail.
    #[test]
    fn port_gone_concurrent() {
        let (tx, rx) = channel::<i32>();
        let _t = thread::spawn(move || {
            rx.recv().unwrap();
        });
        while tx.send(1).is_ok() {}
    }

    // Same as `port_gone_concurrent`, with the receiver running inside `go!`.
    #[test]
    fn port_gone_concurrent1() {
        let (tx, rx) = channel::<i32>();
        let _t = go!(move || {
            rx.recv().unwrap();
        });
        while tx.send(1).is_ok() {}
    }

    // Same as `port_gone_concurrent`, alternating between two senders.
    #[test]
    fn port_gone_concurrent_shared() {
        let (tx, rx) = channel::<i32>();
        let tx2 = tx.clone();
        let _t = thread::spawn(move || {
            rx.recv().unwrap();
        });
        while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
    }

    // Receiving fails once the only sender has been dropped.
    #[test]
    fn smoke_chan_gone() {
        let (sender, receiver) = channel::<i32>();
        drop(sender);
        let outcome = receiver.recv();
        assert!(outcome.is_err());
    }

    // Receiving fails once every sender, including a clone, has been dropped.
    #[test]
    fn smoke_chan_gone_shared() {
        let (original, receiver) = channel::<()>();
        let duplicate = original.clone();
        drop(original);
        drop(duplicate);
        let outcome = receiver.recv();
        assert!(outcome.is_err());
    }

    // Receives until `recv` fails; the sender lives inside `go!` and goes away after
    // sending two values.
    #[test]
    fn chan_gone_concurrent() {
        let (tx, rx) = channel::<i32>();
        let _t = go!(move || {
            tx.send(1).unwrap();
            tx.send(1).unwrap();
        });
        while rx.recv().is_ok() {}
    }

    // 10000 values sent from one thread are all received on the test thread.
    #[test]
    fn stress() {
        let (tx, rx) = channel::<i32>();
        let t = thread::spawn(move || {
            for _ in 0..10000 {
                tx.send(1).unwrap();
            }
        });
        for _ in 0..10000 {
            assert_eq!(rx.recv().unwrap(), 1);
        }
        t.join().ok().unwrap();
    }

    // NTHREADS senders started with `go!` each send AMT values; a receiver thread expects
    // exactly AMT * NTHREADS values and nothing more.
    #[test]
    fn stress_shared() {
        const AMT: u32 = 10000;
        const NTHREADS: u32 = 8;
        let (tx, rx) = channel::<i32>();

        let t = thread::spawn(move || {
            for _ in 0..AMT * NTHREADS {
                assert_eq!(rx.recv().unwrap(), 1);
            }
            // no extra value may be left over
            if rx.try_recv().is_ok() {
                panic!();
            }
        });

        for _ in 0..NTHREADS {
            let tx = tx.clone();
            go!(move || for _ in 0..AMT {
                tx.send(1).unwrap();
            });
        }
        drop(tx);
        t.join().ok().unwrap();
    }

    // One `go!` task signals that it has started and then receives 40 values that a
    // second `go!` task sends.
    #[test]
    fn send_from_outside_runtime() {
        let (tx1, rx1) = channel::<()>();
        let (tx2, rx2) = channel::<i32>();
        let t1 = go!(move || {
            tx1.send(()).unwrap();
            for _ in 0..40 {
                assert_eq!(rx2.recv().unwrap(), 1);
            }
        });
        // wait until the receiving task is running before starting the sender
        rx1.recv().unwrap();
        let t2 = go!(move || for _ in 0..40 {
            tx2.send(1).unwrap();
        });
        t1.join().ok().unwrap();
        t2.join().ok().unwrap();
    }

    // A plain thread receives 40 values sent from the test thread.
    #[test]
    fn recv_from_outside_runtime() {
        let (tx, rx) = channel::<i32>();
        let t = thread::spawn(move || {
            for _ in 0..40 {
                assert_eq!(rx.recv().unwrap(), 1);
            }
        });
        for _ in 0..40 {
            tx.send(1).unwrap();
        }
        t.join().ok().unwrap();
    }

    // Ping-pong between a plain thread and a `go!` task over two channels.
    #[test]
    fn no_runtime() {
        let (tx1, rx1) = channel::<i32>();
        let (tx2, rx2) = channel::<i32>();
        let t1 = thread::spawn(move || {
            assert_eq!(rx1.recv().unwrap(), 1);
            tx2.send(2).unwrap();
        });
        let t2 = go!(move || {
            tx1.send(1).unwrap();
            assert_eq!(rx2.recv().unwrap(), 2);
        });
        t1.join().ok().unwrap();
        t2.join().ok().unwrap();
    }

    // Closing without sending: the receiver goes away before the sender.
    #[test]
    fn oneshot_single_thread_close_port_first() {
        let (_sender, receiver) = channel::<i32>();
        drop(receiver);
    }

    // Closing without sending: the sender goes away before the receiver.
    #[test]
    fn oneshot_single_thread_close_chan_first() {
        let (sender, _receiver) = channel::<i32>();
        drop(sender);
    }

    // Sending a boxed payload to a closed receiver reports an error.
    #[test]
    fn oneshot_single_thread_send_port_close() {
        let (sender, receiver) = channel::<Box<i32>>();
        drop(receiver);
        let outcome = sender.send(Box::new(0));
        assert!(outcome.is_err());
    }

    // Unwrapping `recv` on a channel whose sender is gone panics inside the `go!` task;
    // the panic must surface as an error from `join`.
    #[test]
    fn oneshot_single_thread_recv_chan_close() {
        // Receiving on a closed chan will panic
        let res = go!(move || {
            let (tx, rx) = channel::<i32>();
            drop(tx);
            rx.recv().unwrap();
        })
        .join();
        // What is our res?
        assert!(res.is_err());
    }

    // A boxed value sent first is received afterwards on the same thread.
    #[test]
    fn oneshot_single_thread_send_then_recv() {
        let (sender, receiver) = channel::<Box<i32>>();
        sender.send(Box::new(10)).unwrap();
        let received = receiver.recv().unwrap();
        assert!(*received == 10);
    }

    // Sending on an open channel succeeds and the value can be received.
    #[test]
    fn oneshot_single_thread_try_send_open() {
        let (sender, receiver) = channel::<i32>();
        let sent = sender.send(10);
        assert!(sent.is_ok());
        assert!(receiver.recv().unwrap() == 10);
    }

    // Sending on a channel whose receiver is gone fails.
    #[test]
    fn oneshot_single_thread_try_send_closed() {
        let (sender, receiver) = channel::<i32>();
        drop(receiver);
        let sent = sender.send(10);
        assert!(sent.is_err());
    }

    // Receiving on an open channel returns the value that was sent.
    #[test]
    fn oneshot_single_thread_try_recv_open() {
        let (sender, receiver) = channel::<i32>();
        sender.send(10).unwrap();
        let received = receiver.recv();
        assert!(received == Ok(10));
    }

    // Receiving on a channel whose sender is gone fails.
    #[test]
    fn oneshot_single_thread_try_recv_closed() {
        let (sender, receiver) = channel::<i32>();
        drop(sender);
        let received = receiver.recv();
        assert!(received.is_err());
    }

    // `try_recv` reports Empty before a send and the value after it.
    #[test]
    fn oneshot_single_thread_peek_data() {
        let (sender, receiver) = channel::<i32>();
        let before = receiver.try_recv();
        assert_eq!(before, Err(TryRecvError::Empty));
        sender.send(10).unwrap();
        let after = receiver.try_recv();
        assert_eq!(after, Ok(10));
    }

    // `try_recv` keeps reporting Disconnected once the sender is gone.
    #[test]
    fn oneshot_single_thread_peek_close() {
        let (sender, receiver) = channel::<i32>();
        drop(sender);
        for _ in 0..2 {
            assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
        }
    }

    // `try_recv` on an open, empty channel reports Empty.
    #[test]
    fn oneshot_single_thread_peek_open() {
        let (_sender, receiver) = channel::<i32>();
        let probe = receiver.try_recv();
        assert_eq!(probe, Err(TryRecvError::Empty));
    }

    // A receiver thread is started first; the value is sent afterwards from the test thread.
    #[test]
    fn oneshot_multi_task_recv_then_send() {
        let (tx, rx) = channel::<Box<i32>>();
        let _t = thread::spawn(move || {
            assert!(*rx.recv().unwrap() == 10);
        });

        tx.send(Box::new(10)).unwrap();
    }

    // The sender is dropped in one thread without sending; the receiver thread's
    // `recv().unwrap()` must therefore panic, which shows up as a failed `join`.
    #[test]
    fn oneshot_multi_task_recv_then_close() {
        let (tx, rx) = channel::<Box<i32>>();
        let _t = thread::spawn(move || {
            drop(tx);
        });
        let res = thread::spawn(move || {
            assert!(*rx.recv().unwrap() == 10);
        })
        .join();
        assert!(res.is_err());
    }

    // Repeatedly drops the receiver in another thread while the sender is dropped here.
    #[test]
    fn oneshot_multi_thread_close_stress() {
        for _ in 0..stress_factor() {
            let (tx, rx) = channel::<i32>();
            let _t = thread::spawn(move || {
                drop(rx);
            });
            drop(tx);
        }
    }

    // Races a send against the receiver being dropped in another thread. The sending
    // thread may panic on `unwrap`, so its `join` result is deliberately ignored.
    #[test]
    fn oneshot_multi_thread_send_close_stress() {
        for _ in 0..stress_factor() {
            let (tx, rx) = channel::<i32>();
            let _t = thread::spawn(move || {
                drop(rx);
            });
            let _ = thread::spawn(move || {
                tx.send(1).unwrap();
            })
            .join();
        }
    }

    // Races a blocking `recv` against the sender being dropped from a nested thread; the
    // receive is expected to fail.
    // NOTE(review): the outer threads are never joined, so a failing `assert!` inside
    // them does not seem to fail this test — confirm whether that is intended.
    #[test]
    fn oneshot_multi_thread_recv_close_stress() {
        for _ in 0..stress_factor() {
            let (tx, rx) = channel::<i32>();
            thread::spawn(move || {
                let res = thread::spawn(move || {
                    rx.recv().unwrap();
                })
                .join();
                assert!(res.is_err());
            });
            let _t = thread::spawn(move || {
                thread::spawn(move || {
                    drop(tx);
                });
            });
        }
    }

    // Repeatedly sends one boxed value from a fresh thread and receives it here.
    #[test]
    fn oneshot_multi_thread_send_recv_stress() {
        for _ in 0..stress_factor() {
            let (tx, rx) = channel::<Box<isize>>();
            let _t = thread::spawn(move || {
                tx.send(Box::new(10)).unwrap();
            });
            assert!(*rx.recv().unwrap() == 10);
        }
    }

    // Passes the sender and the receiver down two chains of ten freshly spawned threads
    // each: step `i` sends (resp. expects) the value `i`.
    #[test]
    fn stream_send_recv_stress() {
        for _ in 0..stress_factor() {
            let (tx, rx) = channel();

            send(tx, 0);
            recv(rx, 0);

            // sends `i` from a new thread, then recurses with `i + 1` until 10 is reached
            fn send(tx: Sender<Box<i32>>, i: i32) {
                if i == 10 {
                    return;
                }

                thread::spawn(move || {
                    tx.send(Box::new(i)).unwrap();
                    send(tx, i + 1);
                });
            }

            // expects `i` in a new thread, then recurses with `i + 1` until 10 is reached
            fn recv(rx: Receiver<Box<i32>>, i: i32) {
                if i == 10 {
                    return;
                }

                thread::spawn(move || {
                    assert!(*rx.recv().unwrap() == i);
                    recv(rx, i + 1);
                });
            }
        }
    }

    // `recv_timeout` returns queued data immediately and times out on an empty channel.
    #[test]
    fn oneshot_single_thread_recv_timeout() {
        let (sender, receiver) = channel();
        let tick = Duration::from_millis(1);
        sender.send(()).unwrap();
        assert_eq!(receiver.recv_timeout(tick), Ok(()));
        assert_eq!(receiver.recv_timeout(tick), Err(RecvTimeoutError::Timeout));
        sender.send(()).unwrap();
        assert_eq!(receiver.recv_timeout(tick), Ok(()));
    }

    // The sender thread pauses for twice the timeout before every other send, so the
    // receiver sees a mix of timeouts and values; every value must still arrive.
    #[test]
    fn stress_recv_timeout_two_threads() {
        let (tx, rx) = channel();
        let stress = stress_factor() + 100;
        let timeout = Duration::from_millis(1);

        thread::spawn(move || {
            for i in 0..stress {
                if i % 2 == 0 {
                    thread::sleep(timeout * 2);
                }
                tx.send(1usize).unwrap();
            }
        });

        let mut recv_count = 0;
        loop {
            match rx.recv_timeout(timeout) {
                Ok(n) => {
                    assert_eq!(n, 1usize);
                    recv_count += 1;
                }
                Err(RecvTimeoutError::Timeout) => continue,
                Err(RecvTimeoutError::Disconnected) => break,
            }
        }

        assert_eq!(recv_count, stress);
    }

    // With a second sender alive and nothing sent, `recv_timeout` must time out, and not
    // before the timeout has actually elapsed.
    #[test]
    fn recv_timeout_upgrade() {
        let (sender, receiver) = channel::<()>();
        let timeout = Duration::from_millis(1);
        let _second_sender = sender.clone();

        let started = Instant::now();
        let outcome = receiver.recv_timeout(timeout);
        assert_eq!(outcome, Err(RecvTimeoutError::Timeout));
        let earliest_return = started + timeout;
        assert!(Instant::now() >= earliest_return);
    }

    // Many sender threads each send one value after a staggered sleep; the receiver polls
    // with a 30 ms timeout until the channel disconnects and must see every value.
    #[test]
    fn stress_recv_timeout_shared() {
        let (tx, rx) = channel();
        let stress = stress_factor() + 100;

        for i in 0..stress {
            let tx = tx.clone();
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(i as u64 * 10));
                tx.send(1usize).unwrap();
            });
        }

        // drop the original sender so the loop below can observe the disconnect
        drop(tx);

        let mut recv_count = 0;
        loop {
            match rx.recv_timeout(Duration::from_millis(30)) {
                Ok(n) => {
                    assert_eq!(n, 1usize);
                    recv_count += 1;
                }
                Err(RecvTimeoutError::Timeout) => continue,
                Err(RecvTimeoutError::Disconnected) => break,
            }
        }

        assert_eq!(recv_count, stress);
    }

    // Regression test that we don't run out of stack in scheduler context:
    // queue 10000 values, then receive all of them.
    #[test]
    fn recv_a_lot() {
        const COUNT: usize = 10000;
        let (sender, receiver) = channel();
        (0..COUNT).for_each(|_| sender.send(()).unwrap());
        (0..COUNT).for_each(|_| receiver.recv().unwrap());
    }

    // After draining the values sent by five sender threads, `recv_timeout` times out on
    // the empty channel and succeeds again after another send.
    #[test]
    fn shared_recv_timeout() {
        let (tx, rx) = channel();
        let total = 5;
        for _ in 0..total {
            let tx = tx.clone();
            thread::spawn(move || {
                tx.send(()).unwrap();
            });
        }

        for _ in 0..total {
            rx.recv().unwrap();
        }

        assert_eq!(
            rx.recv_timeout(Duration::from_millis(1)),
            Err(RecvTimeoutError::Timeout)
        );
        tx.send(()).unwrap();
        assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
    }

    // One value from each of many short-lived sender threads; all must be received.
    #[test]
    fn shared_chan_stress() {
        let (tx, rx) = channel();
        let total = stress_factor() + 100;
        for _ in 0..total {
            let tx = tx.clone();
            thread::spawn(move || {
                tx.send(()).unwrap();
            });
        }

        for _ in 0..total {
            rx.recv().unwrap();
        }
    }

    // A thread sums everything yielded by `rx.iter()` (which ends once the sender is
    // dropped) and reports the total over a second channel.
    #[test]
    fn test_nested_recv_iter() {
        let (tx, rx) = channel::<i32>();
        let (total_tx, total_rx) = channel::<i32>();

        let _t = thread::spawn(move || {
            let mut acc = 0;
            for x in rx.iter() {
                acc += x;
            }
            total_tx.send(acc).unwrap();
        });

        tx.send(3).unwrap();
        tx.send(1).unwrap();
        tx.send(2).unwrap();
        drop(tx);
        assert_eq!(total_rx.recv().unwrap(), 6);
    }

    // The receiving thread breaks out of `rx.iter()` once its running total reaches 3 or
    // more, which happens after two values of 2, so the reported total is 4.
    #[test]
    fn test_recv_iter_break() {
        let (tx, rx) = channel::<i32>();
        let (count_tx, count_rx) = channel();

        let _t = thread::spawn(move || {
            let mut count = 0;
            for x in rx.iter() {
                if count >= 3 {
                    break;
                } else {
                    count += x;
                }
            }
            count_tx.send(count).unwrap();
        });

        tx.send(2).unwrap();
        tx.send(2).unwrap();
        tx.send(2).unwrap();
        // the receiver may already be gone here, so the result is ignored
        let _ = tx.send(2);
        drop(tx);
        assert_eq!(count_rx.recv().unwrap(), 4);
    }

    // Request/response loop: the worker drains `try_iter` and asks for more whenever it
    // runs dry; the test thread answers each request with a 2 until the worker exits.
    #[test]
    fn test_recv_try_iter() {
        let (request_tx, request_rx) = channel();
        let (response_tx, response_rx) = channel();

        // Request `x`s until we have `6`.
        let t = thread::spawn(move || {
            let mut count = 0;
            loop {
                for x in response_rx.try_iter() {
                    count += x;
                    if count == 6 {
                        return count;
                    }
                }
                request_tx.send(()).unwrap();
            }
        });

        for _ in request_rx.iter() {
            // a failed send means the worker has finished and dropped its receiver
            if response_tx.send(2).is_err() {
                break;
            }
        }

        assert_eq!(t.join().unwrap(), 6);
    }

    // The owning iterator yields the queued values and then ends, because the sender was
    // dropped at the end of the inner scope.
    #[test]
    fn test_recv_into_iter_owned() {
        let mut values = {
            let (sender, receiver) = channel::<i32>();
            for n in 1..=2 {
                sender.send(n).unwrap();
            }

            receiver.into_iter()
        };
        assert_eq!(values.next().unwrap(), 1);
        assert_eq!(values.next().unwrap(), 2);
        assert!(values.next().is_none());
    }

    // The borrowing iterator yields the queued values and ends once the sender is gone.
    #[test]
    fn test_recv_into_iter_borrowed() {
        let (sender, receiver) = channel::<i32>();
        for n in 1..=2 {
            sender.send(n).unwrap();
        }
        drop(sender);
        let mut values = (&receiver).into_iter();
        assert_eq!(values.next().unwrap(), 1);
        assert_eq!(values.next().unwrap(), 2);
        assert!(values.next().is_none());
    }

    // Walks `try_recv` through Empty -> Ok -> Empty -> Disconnected. Channels 2 and 3 are
    // used only to step the helper thread through its actions in lock-step.
    #[test]
    fn try_recv_states() {
        let (tx1, rx1) = channel::<i32>();
        let (tx2, rx2) = channel::<()>();
        let (tx3, rx3) = channel::<()>();
        let _t = thread::spawn(move || {
            rx2.recv().unwrap();
            tx1.send(1).unwrap();
            tx3.send(()).unwrap();
            rx2.recv().unwrap();
            drop(tx1);
            tx3.send(()).unwrap();
        });

        assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
        tx2.send(()).unwrap();
        rx3.recv().unwrap();
        assert_eq!(rx1.try_recv(), Ok(1));
        assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
        tx2.send(()).unwrap();
        rx3.recv().unwrap();
        assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
    }

    // This bug used to end up in a livelock inside of the Receiver destructor
    // because the internal state of the Shared packet was corrupted
    //
    // The receiver is dropped in a helper thread right after its first `recv`, while a
    // cloned sender is still alive on the test thread.
    #[test]
    fn destroy_upgraded_shared_port_when_sender_still_active() {
        let (tx, rx) = channel();
        let (tx2, rx2) = channel();
        let _t = thread::spawn(move || {
            rx.recv().unwrap(); // wait on a oneshot
            drop(rx); // destroy a shared
            tx2.send(()).unwrap();
        });
        // make sure the other thread has gone to sleep
        for _ in 0..5000 {
            thread::yield_now();
        }

        // upgrade to a shared chan and send a message
        let t = tx.clone();
        drop(tx);
        t.send(()).unwrap();

        // wait for the child thread to exit before we exit
        rx2.recv().unwrap();
    } /*
      }

      #[cfg(test)]
      mod sync_tests {
          use prelude::v1::*;

          use env;
          use thread;
          use super::*;
          use time::Duration;

          pub fn stress_factor() -> usize {
              match env::var("RUST_TEST_STRESS") {
                  Ok(val) => val.parse().unwrap(),
                  Err(..) => 1,
              }
          }

          #[test]
          fn smoke() {
              let (tx, rx) = sync_channel::<i32>(1);
              tx.send(1).unwrap();
              assert_eq!(rx.recv().unwrap(), 1);
          }

          #[test]
          fn drop_full() {
              let (tx, _rx) = sync_channel::<Box<isize>>(1);
              tx.send(box 1).unwrap();
          }

          #[test]
          fn smoke_shared() {
              let (tx, rx) = sync_channel::<i32>(1);
              tx.send(1).unwrap();
              assert_eq!(rx.recv().unwrap(), 1);
              let tx = tx.clone();
              tx.send(1).unwrap();
              assert_eq!(rx.recv().unwrap(), 1);
          }

          #[test]
          fn recv_timeout() {
              let (tx, rx) = sync_channel::<i32>(1);
              assert_eq!(rx.recv_timeout(Duration::from_millis(1)),
                         Err(RecvTimeoutError::Timeout));
              tx.send(1).unwrap();
              assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
          }

          #[test]
          fn smoke_threads() {
              let (tx, rx) = sync_channel::<i32>(0);
              let _t = thread::spawn(move || {
                  tx.send(1).unwrap();
              });
              assert_eq!(rx.recv().unwrap(), 1);
          }

          #[test]
          fn smoke_port_gone() {
              let (tx, rx) = sync_channel::<i32>(0);
              drop(rx);
              assert!(tx.send(1).is_err());
          }

          #[test]
          fn smoke_shared_port_gone2() {
              let (tx, rx) = sync_channel::<i32>(0);
              drop(rx);
              let tx2 = tx.clone();
              drop(tx);
              assert!(tx2.send(1).is_err());
          }

          #[test]
          fn port_gone_concurrent() {
              let (tx, rx) = sync_channel::<i32>(0);
              let _t = thread::spawn(move || {
                  rx.recv().unwrap();
              });
              while tx.send(1).is_ok() {}
          }

          #[test]
          fn port_gone_concurrent_shared() {
              let (tx, rx) = sync_channel::<i32>(0);
              let tx2 = tx.clone();
              let _t = thread::spawn(move || {
                  rx.recv().unwrap();
              });
              while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
          }

          #[test]
          fn smoke_chan_gone() {
              let (tx, rx) = sync_channel::<i32>(0);
              drop(tx);
              assert!(rx.recv().is_err());
          }

          #[test]
          fn smoke_chan_gone_shared() {
              let (tx, rx) = sync_channel::<()>(0);
              let tx2 = tx.clone();
              drop(tx);
              drop(tx2);
              assert!(rx.recv().is_err());
          }

          #[test]
          fn chan_gone_concurrent() {
              let (tx, rx) = sync_channel::<i32>(0);
              thread::spawn(move || {
                  tx.send(1).unwrap();
                  tx.send(1).unwrap();
              });
              while rx.recv().is_ok() {}
          }

          #[test]
          fn stress() {
              let (tx, rx) = sync_channel::<i32>(0);
              thread::spawn(move || {
                  for _ in 0..10000 {
                      tx.send(1).unwrap();
                  }
              });
              for _ in 0..10000 {
                  assert_eq!(rx.recv().unwrap(), 1);
              }
          }

          #[test]
          fn stress_recv_timeout_two_threads() {
              let (tx, rx) = sync_channel::<i32>(0);

              thread::spawn(move || {
                  for _ in 0..10000 {
                      tx.send(1).unwrap();
                  }
              });

              let mut recv_count = 0;
              loop {
                  match rx.recv_timeout(Duration::from_millis(1)) {
                      Ok(v) => {
                          assert_eq!(v, 1);
                          recv_count += 1;
                      }
                      Err(RecvTimeoutError::Timeout) => continue,
                      Err(RecvTimeoutError::Disconnected) => break,
                  }
              }

              assert_eq!(recv_count, 10000);
          }

          /// `NTHREADS` producer threads each send `AMT` items; one consumer thread
          /// counts them using 10 ms `recv_timeout` polls and then signals the caller.
          #[test]
          fn stress_recv_timeout_shared() {
              const AMT: u32 = 1000;
              const NTHREADS: u32 = 8;
              let (tx, rx) = sync_channel::<i32>(0);
              let (done_tx, done_rx) = sync_channel::<()>(0);

              thread::spawn(move || {
                  let poll = Duration::from_millis(10);
                  let mut received = 0;
                  loop {
                      match rx.recv_timeout(poll) {
                          Err(RecvTimeoutError::Disconnected) => break,
                          // A quiet poll window is not an error; keep waiting.
                          Err(RecvTimeoutError::Timeout) => {}
                          Ok(item) => {
                              assert_eq!(item, 1);
                              received += 1;
                          }
                      }
                  }

                  assert_eq!(received, AMT * NTHREADS);
                  assert!(rx.try_recv().is_err());

                  done_tx.send(()).unwrap();
              });

              for _ in 0..NTHREADS {
                  let producer = tx.clone();
                  thread::spawn(move || (0..AMT).for_each(|_| producer.send(1).unwrap()));
              }

              // Release the caller's own sender; the consumer loop above only ends
              // on `Disconnected`.
              drop(tx);

              done_rx.recv().unwrap();
          }

          /// `NTHREADS` producer threads each send `AMT` items; one consumer thread
          /// receives exactly `AMT * NTHREADS` of them with blocking `recv`.
          #[test]
          fn stress_shared() {
              const AMT: u32 = 1000;
              const NTHREADS: u32 = 8;
              let (tx, rx) = sync_channel::<i32>(0);
              let (done_tx, done_rx) = sync_channel::<()>(0);

              thread::spawn(move || {
                  (0..AMT * NTHREADS).for_each(|_| assert_eq!(rx.recv().unwrap(), 1));
                  // Any further item would be one more than the producers sent.
                  if rx.try_recv().is_ok() {
                      panic!();
                  }
                  done_tx.send(()).unwrap();
              });

              for _ in 0..NTHREADS {
                  let producer = tx.clone();
                  thread::spawn(move || (0..AMT).for_each(|_| producer.send(1).unwrap()));
              }
              drop(tx);
              done_rx.recv().unwrap();
          }

          /// Drops the receiver first, without anything ever being sent.
          #[test]
          fn oneshot_single_thread_close_port_first() {
              let endpoints = sync_channel::<i32>(0);
              let (_sender, receiver) = endpoints;
              // The receiver goes away here; `_sender` lives until the end of scope.
              drop(receiver);
          }

          /// Drops the sender first, without anything ever being sent.
          #[test]
          fn oneshot_single_thread_close_chan_first() {
              let endpoints = sync_channel::<i32>(0);
              let (sender, _receiver) = endpoints;
              // The sender goes away here; `_receiver` lives until the end of scope.
              drop(sender);
          }

          /// Sending a boxed payload after the receiver has been dropped must fail.
          ///
          /// The payload is built with `Box::new`: the old `box` expression syntax
          /// is no longer accepted by the compiler.
          #[test]
          fn oneshot_single_thread_send_port_close() {
              // Testing that the sender cleans up the payload if receiver is closed
              let (tx, rx) = sync_channel::<Box<i32>>(0);
              drop(rx);
              assert!(tx.send(Box::new(0)).is_err());
          }

          /// Unwrapping a `recv` on a channel whose sender is already dropped
          /// panics, which the caller observes through `join`.
          #[test]
          fn oneshot_single_thread_recv_chan_close() {
              let worker = thread::spawn(|| {
                  let (sender, receiver) = sync_channel::<i32>(0);
                  drop(sender);
                  // Expected to panic inside the worker thread.
                  receiver.recv().unwrap();
              });
              // `join` yields `Err` when the worker panicked.
              let outcome = worker.join();
              assert!(outcome.is_err());
          }

          /// A boxed value sent into a channel with room for one item is received
          /// unchanged on the same thread.
          ///
          /// Payloads are built with `Box::new`: the old `box` expression syntax
          /// is no longer accepted by the compiler.
          #[test]
          fn oneshot_single_thread_send_then_recv() {
              let (tx, rx) = sync_channel::<Box<i32>>(1);
              tx.send(Box::new(10)).unwrap();
              assert!(rx.recv().unwrap() == Box::new(10));
          }

          /// `try_send` succeeds on an open channel with room for one item, and the
          /// value can then be received.
          #[test]
          fn oneshot_single_thread_try_send_open() {
              let (sender, receiver) = sync_channel::<i32>(1);
              let sent = sender.try_send(10);
              assert_eq!(sent, Ok(()));
              let got = receiver.recv().unwrap();
              assert!(got == 10);
          }

          // #[test]
          // fn oneshot_single_thread_try_send_closed() {
          //     let (tx, rx) = sync_channel::<i32>(0);
          //     drop(rx);
          //     assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
          // }

          // #[test]
          // fn oneshot_single_thread_try_send_closed2() {
          //     let (tx, _rx) = sync_channel::<i32>(0);
          //     assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
          // }

          /// A value sent on an open channel comes back as `Ok` from `recv`.
          #[test]
          fn oneshot_single_thread_try_recv_open() {
              let (sender, receiver) = sync_channel::<i32>(1);
              sender.send(10).unwrap();
              let got = receiver.recv();
              assert!(got == Ok(10));
          }

          /// `recv` reports an error once the only sender has been dropped.
          #[test]
          fn oneshot_single_thread_try_recv_closed() {
              let (sender, receiver) = sync_channel::<i32>(0);
              drop(sender);
              let got = receiver.recv();
              assert!(got.is_err());
          }

          /// An item buffered before the sender was dropped is still delivered;
          /// only the following `try_recv` reports `Disconnected`.
          #[test]
          fn oneshot_single_thread_try_recv_closed_with_data() {
              let (sender, receiver) = sync_channel::<i32>(1);
              sender.send(10).unwrap();
              drop(sender);

              let buffered = receiver.try_recv();
              assert_eq!(buffered, Ok(10));

              let afterwards = receiver.try_recv();
              assert_eq!(afterwards, Err(TryRecvError::Disconnected));
          }

          /// `try_recv` is `Empty` before a send and yields the value after it.
          #[test]
          fn oneshot_single_thread_peek_data() {
              let (sender, receiver) = sync_channel::<i32>(1);

              let before = receiver.try_recv();
              assert_eq!(before, Err(TryRecvError::Empty));

              sender.send(10).unwrap();

              let after = receiver.try_recv();
              assert_eq!(after, Ok(10));
          }

          /// After the sender is dropped, `try_recv` keeps answering `Disconnected`.
          #[test]
          fn oneshot_single_thread_peek_close() {
              let (sender, receiver) = sync_channel::<i32>(0);
              drop(sender);
              // Checked twice: the answer must not change on a repeated call.
              for _ in 0..2 {
                  assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
              }
          }

          /// With the sender still alive and nothing sent, `try_recv` is `Empty`.
          #[test]
          fn oneshot_single_thread_peek_open() {
              let (_sender, receiver) = sync_channel::<i32>(0);
              let polled = receiver.try_recv();
              assert_eq!(polled, Err(TryRecvError::Empty));
          }

          /// A spawned thread receives the boxed value that the caller sends.
          ///
          /// Payloads are built with `Box::new`: the old `box` expression syntax
          /// is no longer accepted by the compiler.
          #[test]
          fn oneshot_multi_task_recv_then_send() {
              let (tx, rx) = sync_channel::<Box<i32>>(0);
              let _t = thread::spawn(move || {
                  assert!(rx.recv().unwrap() == Box::new(10));
              });

              tx.send(Box::new(10)).unwrap();
          }

          /// One thread drops the sender while another unwraps a `recv`; the
          /// receiving thread is expected to panic, observed through `join`.
          ///
          /// The comparison value is built with `Box::new`: the old `box`
          /// expression syntax is no longer accepted by the compiler.
          #[test]
          fn oneshot_multi_task_recv_then_close() {
              let (tx, rx) = sync_channel::<Box<i32>>(0);
              let _t = thread::spawn(move || {
                  drop(tx);
              });
              let res = thread::spawn(move || {
                      // Nothing is ever sent, so the `unwrap` is what should fail.
                      assert!(rx.recv().unwrap() == Box::new(10));
                  })
                  .join();
              assert!(res.is_err());
          }

          /// Repeatedly drops the receiver on a spawned thread while the caller
          /// drops the sender, `stress_factor()` times.
          #[test]
          fn oneshot_multi_thread_close_stress() {
              let rounds = stress_factor();
              for _ in 0..rounds {
                  let (sender, receiver) = sync_channel::<i32>(0);
                  let _closer = thread::spawn(move || drop(receiver));
                  drop(sender);
              }
          }

          /// Repeatedly lets one thread drop the receiver while another thread
          /// sends; the sending thread's result is deliberately ignored.
          #[test]
          fn oneshot_multi_thread_send_close_stress() {
              let rounds = stress_factor();
              for _ in 0..rounds {
                  let (sender, receiver) = sync_channel::<i32>(0);
                  let _closer = thread::spawn(move || drop(receiver));
                  let sending = thread::spawn(move || {
                      sender.send(1).unwrap();
                  });
                  // Only wait for the sender thread; whether it panicked is not checked.
                  let _ = sending.join();
              }
          }

          /// Repeatedly races a nested thread that unwraps `recv` against a nested
          /// thread that drops the sender; the receiving thread must panic.
          #[test]
          fn oneshot_multi_thread_recv_close_stress() {
              let rounds = stress_factor();
              for _ in 0..rounds {
                  let (sender, receiver) = sync_channel::<i32>(0);

                  let _watcher = thread::spawn(move || {
                      let receiving = thread::spawn(move || {
                          receiver.recv().unwrap();
                      });
                      // No value is ever sent, so the inner thread should have panicked.
                      let outcome = receiving.join();
                      assert!(outcome.is_err());
                  });

                  let _closer = thread::spawn(move || {
                      thread::spawn(move || drop(sender));
                  });
              }
          }

          /// Repeatedly sends one boxed value from a spawned thread and receives
          /// it on the caller, `stress_factor()` times.
          ///
          /// Payloads are built with `Box::new`: the old `box` expression syntax
          /// is no longer accepted by the compiler.
          #[test]
          fn oneshot_multi_thread_send_recv_stress() {
              for _ in 0..stress_factor() {
                  let (tx, rx) = sync_channel::<Box<i32>>(0);
                  let _t = thread::spawn(move || {
                      tx.send(Box::new(10)).unwrap();
                  });
                  assert!(rx.recv().unwrap() == Box::new(10));
              }
          }

          // #[test]
          // fn stream_send_recv_stress() {
          //     for _ in 0..stress_factor() {
          //         let (tx, rx) = sync_channel::<Box<i32>>(0);
          //
          //         send(tx, 0);
          //         recv(rx, 0);
          //
          //         fn send(tx: SyncSender<Box<i32>>, i: i32) {
          //             if i == 10 {
          //                 return;
          //             }
          //
          //             thread::spawn(move || {
          //                 tx.send(box i).unwrap();
          //                 send(tx, i + 1);
          //             });
          //         }
          //
          //         fn recv(rx: Receiver<Box<i32>>, i: i32) {
          //             if i == 10 {
          //                 return;
          //             }
          //
          //             thread::spawn(move || {
          //                 assert!(rx.recv().unwrap() == box i);
          //                 recv(rx, i + 1);
          //             });
          //         }
          //     }
          // }

          /// Regression test that we don't run out of stack in scheduler context:
          /// fills a channel with 10000 items, then drains all of them.
          #[test]
          fn recv_a_lot() {
              let (sender, receiver) = sync_channel(10000);
              (0..10000).for_each(|_| sender.send(()).unwrap());
              (0..10000).for_each(|_| receiver.recv().unwrap());
          }

          /// Spawns `stress_factor() + 100` threads that each send one unit value
          /// through a cloned sender, then receives that many values.
          #[test]
          fn shared_chan_stress() {
              let (tx, rx) = sync_channel(0);
              let senders = stress_factor() + 100;

              for _ in 0..senders {
                  let handle = tx.clone();
                  thread::spawn(move || handle.send(()).unwrap());
              }

              (0..senders).for_each(|_| rx.recv().unwrap());
          }

          /// A worker thread sums everything yielded by `rx.iter()` and reports the
          /// total on a second channel; the caller sends 3, 1 and 2 and expects 6.
          #[test]
          fn test_nested_recv_iter() {
              let (tx, rx) = sync_channel::<i32>(0);
              let (sum_tx, sum_rx) = sync_channel::<i32>(0);

              let _worker = thread::spawn(move || {
                  let total = rx.iter().fold(0, |running, item| running + item);
                  sum_tx.send(total).unwrap();
              });

              for item in &[3, 1, 2] {
                  tx.send(*item).unwrap();
              }
              // The worker's iteration can only end once the sender is gone.
              drop(tx);
              assert_eq!(sum_rx.recv().unwrap(), 6);
          }

          /// A worker thread accumulates items from `rx.iter()` and breaks out once
          /// the running total has reached 3; the caller expects a total of 4.
          #[test]
          fn test_recv_iter_break() {
              let (tx, rx) = sync_channel::<i32>(0);
              let (tally_tx, tally_rx) = sync_channel(0);

              let _worker = thread::spawn(move || {
                  let mut tally = 0;
                  for item in rx.iter() {
                      // The check runs after an item was taken, so that item is dropped.
                      if tally >= 3 {
                          break;
                      }
                      tally += item;
                  }
                  tally_tx.send(tally).unwrap();
              });

              (0..3).for_each(|_| tx.send(2).unwrap());
              // Best effort only: the result of this extra send is ignored.
              let _ = tx.try_send(2);
              drop(tx);
              assert_eq!(tally_rx.recv().unwrap(), 4);
          }

          /// Walks `try_recv` through `Empty`, `Ok`, `Empty` and `Disconnected`,
          /// using two helper channels to run a worker thread in lock-step.
          #[test]
          fn try_recv_states() {
              let (data_tx, data_rx) = sync_channel::<i32>(1);
              let (go_tx, go_rx) = sync_channel::<()>(1);
              let (ack_tx, ack_rx) = sync_channel::<()>(1);

              let _worker = thread::spawn(move || {
                  // Step 1: publish one value.
                  go_rx.recv().unwrap();
                  data_tx.send(1).unwrap();
                  ack_tx.send(()).unwrap();
                  // Step 2: drop the data sender.
                  go_rx.recv().unwrap();
                  drop(data_tx);
                  ack_tx.send(()).unwrap();
              });

              // Tell the worker to run its next step and wait for its acknowledgement.
              let advance = || {
                  go_tx.send(()).unwrap();
                  ack_rx.recv().unwrap();
              };

              assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));
              advance();
              assert_eq!(data_rx.try_recv(), Ok(1));
              assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));
              advance();
              assert_eq!(data_rx.try_recv(), Err(TryRecvError::Disconnected));
          }

          // Regression test: this scenario used to livelock inside the Receiver
          // destructor because the internal state of the Shared packet was corrupted.
          #[test]
          fn destroy_upgraded_shared_port_when_sender_still_active() {
              let (first_tx, first_rx) = sync_channel::<()>(0);
              let (exit_tx, exit_rx) = sync_channel::<()>(0);
              let _child = thread::spawn(move || {
                  // Block until the single message arrives, then destroy the receiver
                  // while a sender is still alive.
                  first_rx.recv().unwrap();
                  drop(first_rx);
                  exit_tx.send(()).unwrap();
              });

              // Give the child thread ample opportunity to go to sleep in `recv`.
              (0..5000).for_each(|_| thread::yield_now());

              // Replace the original sender with a clone and send through the clone.
              let cloned_tx = first_tx.clone();
              drop(first_tx);
              cloned_tx.send(()).unwrap();

              // Do not return before the child thread has finished.
              exit_rx.recv().unwrap();
          }

          /// `send` returns `Ok(())` when another thread receives the value.
          #[test]
          fn send1() {
              let (sender, receiver) = sync_channel::<i32>(0);
              let _consumer = thread::spawn(move || {
                  receiver.recv().unwrap();
              });
              let sent = sender.send(1);
              assert_eq!(sent, Ok(()));
          }

          /// `send` fails when another thread drops the receiver without receiving.
          #[test]
          fn send2() {
              let (sender, receiver) = sync_channel::<i32>(0);
              let _closer = thread::spawn(move || drop(receiver));
              let sent = sender.send(1);
              assert!(sent.is_err());
          }

          /// The first send into a one-slot channel succeeds; a second send fails
          /// once another thread has dropped the receiver.
          #[test]
          fn send3() {
              let (sender, receiver) = sync_channel::<i32>(1);

              let first = sender.send(1);
              assert_eq!(first, Ok(()));

              let _closer = thread::spawn(move || drop(receiver));

              let second = sender.send(1);
              assert!(second.is_err());
          }

          /// Two threads send through clones of the same sender; after the caller
          /// drops the receiver both sends must fail, and both threads report back.
          #[test]
          fn send4() {
              let (first_tx, rx) = sync_channel::<i32>(0);
              let second_tx = first_tx.clone();
              let (first_done, done_rx) = channel();
              let second_done = first_done.clone();

              let _first = thread::spawn(move || {
                  let failed = first_tx.send(1).is_err();
                  assert!(failed);
                  first_done.send(()).unwrap();
              });
              let _second = thread::spawn(move || {
                  let failed = second_tx.send(2).is_err();
                  assert!(failed);
                  second_done.send(()).unwrap();
              });

              drop(rx);
              // One completion signal is expected from each sender thread.
              for _ in 0..2 {
                  done_rx.recv().unwrap();
              }
          }

          // #[test]
          // fn try_send1() {
          //     let (tx, _rx) = sync_channel::<i32>(0);
          //     assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
          // }

          // #[test]
          // fn try_send2() {
          //     let (tx, _rx) = sync_channel::<i32>(1);
          //     assert_eq!(tx.try_send(1), Ok(()));
          //     assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
          // }

          // #[test]
          // fn try_send3() {
          //     let (tx, rx) = sync_channel::<i32>(1);
          //     assert_eq!(tx.try_send(1), Ok(()));
          //     drop(rx);
          //     assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
          // }

          /// Runs 100 rounds of a two-channel exchange: the caller `try_send`s to a
          /// worker, which answers with a `try_send` of its own.
          #[test]
          fn issue_15761() {
              // One round trip between the caller and a freshly spawned worker.
              fn round_trip() {
                  let (request_tx, request_rx) = sync_channel::<()>(3);
                  let (reply_tx, reply_rx) = sync_channel::<()>(3);

                  let _worker = thread::spawn(move || {
                      request_rx.recv().unwrap();
                      reply_tx.try_send(()).unwrap();
                  });

                  request_tx.try_send(()).unwrap();
                  reply_rx.recv().unwrap();
              }

              (0..100).for_each(|_| round_trip());
          }

          /// The `Debug` rendering of a sender is exactly `Sender { .. }`.
          #[test]
          fn fmt_debug_sender() {
              // Only the sending half is kept; the receiving half is dropped at once.
              let sender = channel::<i32>().0;
              let rendered = format!("{:?}", sender);
              assert_eq!(rendered, "Sender { .. }");
          }

          /// The `Debug` rendering of a receiver is exactly `Receiver { .. }`.
          #[test]
          fn fmt_debug_recv() {
              // Only the receiving half is kept; the sending half is dropped at once.
              let receiver = channel::<i32>().1;
              let rendered = format!("{:?}", receiver);
              assert_eq!(rendered, "Receiver { .. }");
          }

          // #[test]
          // fn fmt_debug_sync_sender() {
          //     let (tx, _) = sync_channel::<i32>(1);
          //     assert_eq!(format!("{:?}", tx), "SyncSender { .. }");
          // }*/
}