oneshot 0.2.1

Oneshot SPSC channel with (potentially) lock-free non-blocking send, and a receiver supporting both thread-blocking receive operations and Future-based async polling.
Documentation
#![cfg(oneshot_loom)]

use oneshot::TryRecvError;

use loom::hint;
use loom::thread;
#[cfg(feature = "async")]
use std::future::{Future, IntoFuture};
#[cfg(feature = "async")]
use std::pin::Pin;
#[cfg(feature = "async")]
use std::task::{self, Poll};
#[cfg(feature = "std")]
use std::time::Duration;

mod helpers;

// Busy-polls `try_recv` on a spawned thread until the value sent from the
// model's main thread is observed, then checks the value that came through.
#[test]
fn try_recv() {
    loom::model(|| {
        let (tx, rx) = oneshot::channel::<u128>();

        // The spawned thread's join value is the message it eventually receives.
        let poller = thread::spawn(move || loop {
            match rx.try_recv() {
                Ok(value) => break value,
                Err(TryRecvError::Disconnected) => panic!("Should not be disconnected"),
                Err(TryRecvError::Empty) => hint::spin_loop(),
            }
        });

        assert!(tx.send(19).is_ok());

        let received = poller.join().unwrap();
        assert_eq!(received, 19);
    })
}

// Sends from one spawned thread and performs a timed receive on another,
// expecting the receive to yield the sent value.
#[cfg(feature = "std")]
#[test]
fn send_recv_different_threads() {
    loom::model(|| {
        let (tx, rx) = oneshot::channel();

        // The receiving thread is spawned before the sending thread.
        let receiving = thread::spawn(move || {
            let timeout = Duration::from_millis(1);
            assert_eq!(rx.recv_timeout(timeout), Ok(9));
        });
        let sending = thread::spawn(move || {
            tx.send(9u128).unwrap();
        });

        // Join the sending thread first, then the receiving one.
        sending.join().unwrap();
        receiving.join().unwrap();
    })
}

// Drops the sender on one spawned thread while another performs a timed
// receive with a zero timeout; the receive is expected to return an error.
#[cfg(feature = "std")]
#[test]
fn recv_drop_sender_different_threads() {
    loom::model(|| {
        let (tx, rx) = oneshot::channel::<u128>();

        // The receiving thread is spawned before the dropping thread.
        let receiving = thread::spawn(move || {
            let no_wait = Duration::from_millis(0);
            let outcome = rx.recv_timeout(no_wait);
            assert!(outcome.is_err());
        });
        let dropping = thread::spawn(move || drop(tx));

        // Join the dropping thread first, then the receiving one.
        dropping.join().unwrap();
        receiving.join().unwrap();
    })
}

// Sends from a spawned thread while the model's main thread blocks on the
// receiver converted into a future, expecting the sent value back.
#[cfg(feature = "async")]
#[test]
fn async_recv() {
    loom::model(|| {
        let (tx, rx) = oneshot::channel::<u128>();

        let sending = thread::spawn(move || tx.send(987).unwrap());

        let recv_future = rx.into_future();
        let received = loom::future::block_on(recv_future);
        assert_eq!(received, Ok(987));

        sending.join().unwrap();
    })
}

// Sends before the first poll: the poll must be immediately ready with the
// value, and the waker must not have been cloned, dropped or woken.
#[cfg(feature = "async")]
#[test]
fn send_then_poll() {
    loom::model(|| {
        let (tx, mut rx) = oneshot::async_channel::<u128>();
        tx.send(1234).unwrap();

        let (waker, counts) = helpers::waker::waker();
        let mut cx = task::Context::from_waker(&waker);

        let polled = Pin::new(&mut rx).poll(&mut cx);
        assert_eq!(polled, Poll::Ready(Ok(1234)));

        // All three counters are expected to still be zero after the poll.
        assert_eq!(counts.clone_count(), 0);
        assert_eq!(counts.drop_count(), 0);
        assert_eq!(counts.wake_count(), 0);
    })
}

// Checks that dropping an already-polled receiver is fine while a send runs
// concurrently on another thread.
#[cfg(feature = "async")]
#[test]
fn poll_then_drop_receiver_during_send() {
    loom::model(|| {
        let (tx, mut rx) = oneshot::async_channel::<u128>();

        let (waker, _counts) = helpers::waker::waker();
        let mut cx = task::Context::from_waker(&waker);

        // A first poll with nothing sent yet must report `Pending`.
        let first_poll = Pin::new(&mut rx).poll(&mut cx);
        assert_eq!(first_poll, Poll::Pending);

        // Send from another thread; the result is deliberately discarded,
        // this test only requires that the send does not panic.
        let sending = thread::spawn(move || drop(tx.send(1234)));

        // Drop the receiver on this thread while the send may be in flight.
        // Loom explores the interleavings between the drop and the send.
        drop(rx);

        // Joining fails the test if the sending thread panicked.
        sending.join().unwrap();
    })
}

// Polls first (pending), then sends, then polls again (ready), checking the
// waker's clone/drop/wake counters after every step.
#[cfg(feature = "async")]
#[test]
fn poll_then_send() {
    loom::model(|| {
        let (tx, mut rx) = oneshot::async_channel::<u128>();

        let (waker, counts) = helpers::waker::waker();
        let mut cx = task::Context::from_waker(&waker);

        // Step 1: poll with nothing sent. Expect `Pending` and a single clone.
        let first_poll = Pin::new(&mut rx).poll(&mut cx);
        assert_eq!(first_poll, Poll::Pending);
        assert_eq!(counts.clone_count(), 1);
        assert_eq!(counts.drop_count(), 0);
        assert_eq!(counts.wake_count(), 0);

        // Step 2: send. Expect one wake and one drop, and no further clone.
        tx.send(1234).unwrap();
        assert_eq!(counts.clone_count(), 1);
        assert_eq!(counts.drop_count(), 1);
        assert_eq!(counts.wake_count(), 1);

        // Step 3: poll again. Expect the value, with all counters unchanged.
        let second_poll = Pin::new(&mut rx).poll(&mut cx);
        assert_eq!(second_poll, Poll::Ready(Ok(1234)));
        assert_eq!(counts.clone_count(), 1);
        assert_eq!(counts.drop_count(), 1);
        assert_eq!(counts.wake_count(), 1);
    })
}

// Polls the same receiver with two different wakers in turn and checks that
// only the waker from the most recent poll is woken by the send.
#[cfg(feature = "async")]
#[test]
fn poll_with_different_wakers() {
    loom::model(|| {
        let (tx, mut rx) = oneshot::async_channel::<u128>();

        // First poll, using the first waker: pending, one clone of it.
        let (first_waker, first_counts) = helpers::waker::waker();
        let mut first_cx = task::Context::from_waker(&first_waker);

        let first_poll = Pin::new(&mut rx).poll(&mut first_cx);
        assert_eq!(first_poll, Poll::Pending);
        assert_eq!(first_counts.clone_count(), 1);
        assert_eq!(first_counts.drop_count(), 0);
        assert_eq!(first_counts.wake_count(), 0);

        // Second poll, using a fresh waker: still pending. The first waker's
        // clone is expected to have been dropped without being woken.
        let (second_waker, second_counts) = helpers::waker::waker();
        let mut second_cx = task::Context::from_waker(&second_waker);

        let second_poll = Pin::new(&mut rx).poll(&mut second_cx);
        assert_eq!(second_poll, Poll::Pending);
        assert_eq!(first_counts.clone_count(), 1);
        assert_eq!(first_counts.drop_count(), 1);
        assert_eq!(first_counts.wake_count(), 0);

        assert_eq!(second_counts.clone_count(), 1);
        assert_eq!(second_counts.drop_count(), 0);
        assert_eq!(second_counts.wake_count(), 0);

        // The send must wake the waker from the latest poll only; the first
        // waker's counters stay exactly as they were.
        tx.send(1234).unwrap();
        assert_eq!(first_counts.clone_count(), 1);
        assert_eq!(first_counts.drop_count(), 1);
        assert_eq!(first_counts.wake_count(), 0);

        assert_eq!(second_counts.clone_count(), 1);
        assert_eq!(second_counts.drop_count(), 1);
        assert_eq!(second_counts.wake_count(), 1);
    })
}