tokio 0.2.24

An event-driven, non-blocking I/O platform for writing asynchronous I/O backed applications.
Documentation
#![allow(clippy::redundant_clone)]
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]

use tokio::sync::mpsc;
use tokio::sync::mpsc::error::{TryRecvError, TrySendError};
use tokio_test::task;
use tokio_test::{
    assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
};

use std::sync::Arc;

trait AssertSend: Send {}
impl AssertSend for mpsc::Sender<i32> {}
impl AssertSend for mpsc::Receiver<i32> {}

#[test]
fn send_recv_with_buffer() {
    let (tx, rx) = mpsc::channel::<i32>(16);
    let mut tx = task::spawn(tx);
    let mut rx = task::spawn(rx);

    // Using poll_ready / try_send
    assert_ready_ok!(tx.enter(|cx, mut tx| tx.poll_ready(cx)));
    tx.try_send(1).unwrap();

    // Without poll_ready
    tx.try_send(2).unwrap();

    drop(tx);

    let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx)));
    assert_eq!(val, Some(1));

    let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx)));
    assert_eq!(val, Some(2));

    let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx)));
    assert!(val.is_none());
}

#[test]
fn disarm() {
    let (tx, rx) = mpsc::channel::<i32>(2);
    let mut tx1 = task::spawn(tx.clone());
    let mut tx2 = task::spawn(tx.clone());
    let mut tx3 = task::spawn(tx.clone());
    let mut tx4 = task::spawn(tx);
    let mut rx = task::spawn(rx);

    // We should be able to `poll_ready` two handles without problem
    assert_ready_ok!(tx1.enter(|cx, mut tx| tx.poll_ready(cx)));
    assert_ready_ok!(tx2.enter(|cx, mut tx| tx.poll_ready(cx)));

    // But a third should not be ready
    assert_pending!(tx3.enter(|cx, mut tx| tx.poll_ready(cx)));

    // Using one of the reserved slots should allow a new handle to become ready
    tx1.try_send(1).unwrap();
    // We also need to receive for the slot to be free
    let _ = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))).unwrap();
    // Now there's a free slot!
    assert_ready_ok!(tx3.enter(|cx, mut tx| tx.poll_ready(cx)));
    assert_pending!(tx4.enter(|cx, mut tx| tx.poll_ready(cx)));

    // Dropping a ready handle should also open up a slot
    drop(tx2);
    assert_ready_ok!(tx4.enter(|cx, mut tx| tx.poll_ready(cx)));
    assert_pending!(tx1.enter(|cx, mut tx| tx.poll_ready(cx)));

    // Explicitly disarming a handle should also open a slot
    assert!(tx3.disarm());
    assert_ready_ok!(tx1.enter(|cx, mut tx| tx.poll_ready(cx)));

    // Disarming a non-armed sender does not free up a slot
    assert!(!tx3.disarm());
    assert_pending!(tx3.enter(|cx, mut tx| tx.poll_ready(cx)));
}

#[tokio::test]
async fn send_recv_stream_with_buffer() {
    use tokio::stream::StreamExt;

    let (mut tx, mut rx) = mpsc::channel::<i32>(16);

    tokio::spawn(async move {
        assert_ok!(tx.send(1).await);
        assert_ok!(tx.send(2).await);
    });

    assert_eq!(Some(1), rx.next().await);
    assert_eq!(Some(2), rx.next().await);
    assert_eq!(None, rx.next().await);
}

#[tokio::test]
async fn async_send_recv_with_buffer() {
    let (mut tx, mut rx) = mpsc::channel(16);

    tokio::spawn(async move {
        assert_ok!(tx.send(1).await);
        assert_ok!(tx.send(2).await);
    });

    assert_eq!(Some(1), rx.recv().await);
    assert_eq!(Some(2), rx.recv().await);
    assert_eq!(None, rx.recv().await);
}

#[test]
fn start_send_past_cap() {
    let mut t1 = task::spawn(());
    let mut t2 = task::spawn(());
    let mut t3 = task::spawn(());

    let (mut tx1, mut rx) = mpsc::channel(1);
    let mut tx2 = tx1.clone();

    assert_ok!(tx1.try_send(()));

    t1.enter(|cx, _| {
        assert_pending!(tx1.poll_ready(cx));
    });

    t2.enter(|cx, _| {
        assert_pending!(tx2.poll_ready(cx));
    });

    drop(tx1);

    let val = t3.enter(|cx, _| assert_ready!(rx.poll_recv(cx)));
    assert!(val.is_some());

    assert!(t2.is_woken());
    assert!(!t1.is_woken());

    drop(tx2);

    let val = t3.enter(|cx, _| assert_ready!(rx.poll_recv(cx)));
    assert!(val.is_none());
}

#[test]
#[should_panic]
fn buffer_gteq_one() {
    mpsc::channel::<i32>(0);
}

#[test]
fn send_recv_unbounded() {
    let mut t1 = task::spawn(());

    let (tx, mut rx) = mpsc::unbounded_channel::<i32>();

    // Using `try_send`
    assert_ok!(tx.send(1));
    assert_ok!(tx.send(2));

    let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
    assert_eq!(val, Some(1));

    let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
    assert_eq!(val, Some(2));

    drop(tx);

    let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
    assert!(val.is_none());
}

#[tokio::test]
async fn async_send_recv_unbounded() {
    let (tx, mut rx) = mpsc::unbounded_channel();

    tokio::spawn(async move {
        assert_ok!(tx.send(1));
        assert_ok!(tx.send(2));
    });

    assert_eq!(Some(1), rx.recv().await);
    assert_eq!(Some(2), rx.recv().await);
    assert_eq!(None, rx.recv().await);
}

#[tokio::test]
async fn send_recv_stream_unbounded() {
    use tokio::stream::StreamExt;

    let (tx, mut rx) = mpsc::unbounded_channel::<i32>();

    tokio::spawn(async move {
        assert_ok!(tx.send(1));
        assert_ok!(tx.send(2));
    });

    assert_eq!(Some(1), rx.next().await);
    assert_eq!(Some(2), rx.next().await);
    assert_eq!(None, rx.next().await);
}

#[test]
fn no_t_bounds_buffer() {
    struct NoImpls;

    let mut t1 = task::spawn(());
    let (tx, mut rx) = mpsc::channel(100);

    // sender should be Debug even though T isn't Debug
    println!("{:?}", tx);
    // same with Receiver
    println!("{:?}", rx);
    // and sender should be Clone even though T isn't Clone
    assert!(tx.clone().try_send(NoImpls).is_ok());

    let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
    assert!(val.is_some());
}

#[test]
fn no_t_bounds_unbounded() {
    struct NoImpls;

    let mut t1 = task::spawn(());
    let (tx, mut rx) = mpsc::unbounded_channel();

    // sender should be Debug even though T isn't Debug
    println!("{:?}", tx);
    // same with Receiver
    println!("{:?}", rx);
    // and sender should be Clone even though T isn't Clone
    assert!(tx.clone().send(NoImpls).is_ok());

    let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
    assert!(val.is_some());
}

#[test]
fn send_recv_buffer_limited() {
    let mut t1 = task::spawn(());
    let mut t2 = task::spawn(());

    let (mut tx, mut rx) = mpsc::channel::<i32>(1);

    // Run on a task context
    t1.enter(|cx, _| {
        assert_ready_ok!(tx.poll_ready(cx));

        // Send first message
        assert_ok!(tx.try_send(1));

        // Not ready
        assert_pending!(tx.poll_ready(cx));

        // Send second message
        assert_err!(tx.try_send(1337));
    });

    t2.enter(|cx, _| {
        // Take the value
        let val = assert_ready!(rx.poll_recv(cx));
        assert_eq!(Some(1), val);
    });

    assert!(t1.is_woken());

    t1.enter(|cx, _| {
        assert_ready_ok!(tx.poll_ready(cx));

        assert_ok!(tx.try_send(2));

        // Not ready
        assert_pending!(tx.poll_ready(cx));
    });

    t2.enter(|cx, _| {
        // Take the value
        let val = assert_ready!(rx.poll_recv(cx));
        assert_eq!(Some(2), val);
    });

    t1.enter(|cx, _| {
        assert_ready_ok!(tx.poll_ready(cx));
    });
}

#[test]
fn recv_close_gets_none_idle() {
    let mut t1 = task::spawn(());

    let (mut tx, mut rx) = mpsc::channel::<i32>(10);

    rx.close();

    t1.enter(|cx, _| {
        let val = assert_ready!(rx.poll_recv(cx));
        assert!(val.is_none());
        assert_ready_err!(tx.poll_ready(cx));
    });
}

#[test]
fn recv_close_gets_none_reserved() {
    let mut t1 = task::spawn(());
    let mut t2 = task::spawn(());
    let mut t3 = task::spawn(());

    let (mut tx1, mut rx) = mpsc::channel::<i32>(1);
    let mut tx2 = tx1.clone();

    assert_ready_ok!(t1.enter(|cx, _| tx1.poll_ready(cx)));

    t2.enter(|cx, _| {
        assert_pending!(tx2.poll_ready(cx));
    });

    rx.close();

    assert!(t2.is_woken());

    t2.enter(|cx, _| {
        assert_ready_err!(tx2.poll_ready(cx));
    });

    t3.enter(|cx, _| assert_pending!(rx.poll_recv(cx)));

    assert!(!t1.is_woken());
    assert!(!t2.is_woken());

    assert_ok!(tx1.try_send(123));

    assert!(t3.is_woken());

    t3.enter(|cx, _| {
        let v = assert_ready!(rx.poll_recv(cx));
        assert_eq!(v, Some(123));

        let v = assert_ready!(rx.poll_recv(cx));
        assert!(v.is_none());
    });
}

#[test]
fn tx_close_gets_none() {
    let mut t1 = task::spawn(());

    let (_, mut rx) = mpsc::channel::<i32>(10);

    // Run on a task context
    t1.enter(|cx, _| {
        let v = assert_ready!(rx.poll_recv(cx));
        assert!(v.is_none());
    });
}

#[test]
fn try_send_fail() {
    let mut t1 = task::spawn(());

    let (mut tx, mut rx) = mpsc::channel(1);

    tx.try_send("hello").unwrap();

    // This should fail
    match assert_err!(tx.try_send("fail")) {
        TrySendError::Full(..) => {}
        _ => panic!(),
    }

    let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
    assert_eq!(val, Some("hello"));

    assert_ok!(tx.try_send("goodbye"));
    drop(tx);

    let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
    assert_eq!(val, Some("goodbye"));

    let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
    assert!(val.is_none());
}

#[test]
fn drop_tx_with_permit_releases_permit() {
    let mut t1 = task::spawn(());
    let mut t2 = task::spawn(());

    // poll_ready reserves capacity, ensure that the capacity is released if tx
    // is dropped w/o sending a value.
    let (mut tx1, _rx) = mpsc::channel::<i32>(1);
    let mut tx2 = tx1.clone();

    assert_ready_ok!(t1.enter(|cx, _| tx1.poll_ready(cx)));

    t2.enter(|cx, _| {
        assert_pending!(tx2.poll_ready(cx));
    });

    drop(tx1);

    assert!(t2.is_woken());

    assert_ready_ok!(t2.enter(|cx, _| tx2.poll_ready(cx)));
}

#[test]
fn dropping_rx_closes_channel() {
    let mut t1 = task::spawn(());

    let (mut tx, rx) = mpsc::channel(100);

    let msg = Arc::new(());
    assert_ok!(tx.try_send(msg.clone()));

    drop(rx);
    assert_ready_err!(t1.enter(|cx, _| tx.poll_ready(cx)));

    assert_eq!(1, Arc::strong_count(&msg));
}

#[test]
fn dropping_rx_closes_channel_for_try() {
    let (mut tx, rx) = mpsc::channel(100);

    let msg = Arc::new(());
    tx.try_send(msg.clone()).unwrap();

    drop(rx);

    {
        let err = assert_err!(tx.try_send(msg.clone()));
        match err {
            TrySendError::Closed(..) => {}
            _ => panic!(),
        }
    }

    assert_eq!(1, Arc::strong_count(&msg));
}

#[test]
fn unconsumed_messages_are_dropped() {
    let msg = Arc::new(());

    let (mut tx, rx) = mpsc::channel(100);

    tx.try_send(msg.clone()).unwrap();

    assert_eq!(2, Arc::strong_count(&msg));

    drop((tx, rx));

    assert_eq!(1, Arc::strong_count(&msg));
}

#[test]
fn try_recv() {
    let (mut tx, mut rx) = mpsc::channel(1);
    match rx.try_recv() {
        Err(TryRecvError::Empty) => {}
        _ => panic!(),
    }
    tx.try_send(42).unwrap();
    match rx.try_recv() {
        Ok(42) => {}
        _ => panic!(),
    }
    drop(tx);
    match rx.try_recv() {
        Err(TryRecvError::Closed) => {}
        _ => panic!(),
    }
}

#[test]
fn try_recv_unbounded() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    match rx.try_recv() {
        Err(TryRecvError::Empty) => {}
        _ => panic!(),
    }
    tx.send(42).unwrap();
    match rx.try_recv() {
        Ok(42) => {}
        _ => panic!(),
    }
    drop(tx);
    match rx.try_recv() {
        Err(TryRecvError::Closed) => {}
        _ => panic!(),
    }
}

#[test]
fn ready_close_cancel_bounded() {
    use futures::future::poll_fn;

    let (mut tx, mut rx) = mpsc::channel::<()>(100);
    let _tx2 = tx.clone();

    {
        let mut ready = task::spawn(async { poll_fn(|cx| tx.poll_ready(cx)).await });
        assert_ready_ok!(ready.poll());
    }

    rx.close();

    let mut recv = task::spawn(async { rx.recv().await });
    assert_pending!(recv.poll());

    drop(tx);

    assert!(recv.is_woken());
}

#[tokio::test]
async fn permit_available_not_acquired_close() {
    use futures::future::poll_fn;

    let (mut tx1, mut rx) = mpsc::channel::<()>(1);
    let mut tx2 = tx1.clone();

    {
        let mut ready = task::spawn(poll_fn(|cx| tx1.poll_ready(cx)));
        assert_ready_ok!(ready.poll());
    }

    let mut ready = task::spawn(poll_fn(|cx| tx2.poll_ready(cx)));
    assert_pending!(ready.poll());

    rx.close();

    drop(tx1);
    assert!(ready.is_woken());

    drop(tx2);
    assert!(rx.recv().await.is_none());
}