tokio 0.3.0

An event-driven, non-blocking I/O platform for writing asynchronous I/O backed applications.
Documentation
#![allow(clippy::cognitive_complexity)]
#![warn(rust_2018_idioms)]
#![cfg(feature = "sync")]

use tokio::sync::broadcast;
use tokio_test::task;
use tokio_test::{
    assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
};

use std::sync::Arc;

macro_rules! assert_recv {
    ($e:expr) => {
        match $e.try_recv() {
            Ok(value) => value,
            Err(e) => panic!("expected recv; got = {:?}", e),
        }
    };
}

macro_rules! assert_empty {
    ($e:expr) => {
        match $e.try_recv() {
            Ok(value) => panic!("expected empty; got = {:?}", value),
            Err(broadcast::error::TryRecvError::Empty) => {}
            Err(e) => panic!("expected empty; got = {:?}", e),
        }
    };
}

macro_rules! assert_lagged {
    ($e:expr, $n:expr) => {
        match assert_err!($e) {
            broadcast::error::TryRecvError::Lagged(n) => {
                assert_eq!(n, $n);
            }
            _ => panic!("did not lag"),
        }
    };
}

macro_rules! assert_closed {
    ($e:expr) => {
        match assert_err!($e) {
            broadcast::error::TryRecvError::Closed => {}
            _ => panic!("did not lag"),
        }
    };
}

trait AssertSend: Send + Sync {}
impl AssertSend for broadcast::Sender<i32> {}
impl AssertSend for broadcast::Receiver<i32> {}

#[test]
fn send_try_recv_bounded() {
    let (tx, mut rx) = broadcast::channel(16);

    assert_empty!(rx);

    let n = assert_ok!(tx.send("hello"));
    assert_eq!(n, 1);

    let val = assert_recv!(rx);
    assert_eq!(val, "hello");

    assert_empty!(rx);
}

#[test]
fn send_two_recv() {
    let (tx, mut rx1) = broadcast::channel(16);
    let mut rx2 = tx.subscribe();

    assert_empty!(rx1);
    assert_empty!(rx2);

    let n = assert_ok!(tx.send("hello"));
    assert_eq!(n, 2);

    let val = assert_recv!(rx1);
    assert_eq!(val, "hello");

    let val = assert_recv!(rx2);
    assert_eq!(val, "hello");

    assert_empty!(rx1);
    assert_empty!(rx2);
}

#[tokio::test]
async fn send_recv_into_stream_ready() {
    use tokio::stream::StreamExt;

    let (tx, rx) = broadcast::channel::<i32>(8);
    tokio::pin! {
        let rx = rx.into_stream();
    }

    assert_ok!(tx.send(1));
    assert_ok!(tx.send(2));

    assert_eq!(Some(Ok(1)), rx.next().await);
    assert_eq!(Some(Ok(2)), rx.next().await);

    drop(tx);

    assert_eq!(None, rx.next().await);
}

#[tokio::test]
async fn send_recv_into_stream_pending() {
    use tokio::stream::StreamExt;

    let (tx, rx) = broadcast::channel::<i32>(8);

    tokio::pin! {
        let rx = rx.into_stream();
    }

    let mut recv = task::spawn(rx.next());
    assert_pending!(recv.poll());

    assert_ok!(tx.send(1));

    assert!(recv.is_woken());
    let val = assert_ready!(recv.poll());
    assert_eq!(val, Some(Ok(1)));
}

#[test]
fn send_recv_bounded() {
    let (tx, mut rx) = broadcast::channel(16);

    let mut recv = task::spawn(rx.recv());

    assert_pending!(recv.poll());

    assert_ok!(tx.send("hello"));

    assert!(recv.is_woken());
    let val = assert_ready_ok!(recv.poll());
    assert_eq!(val, "hello");
}

#[test]
fn send_two_recv_bounded() {
    let (tx, mut rx1) = broadcast::channel(16);
    let mut rx2 = tx.subscribe();

    let mut recv1 = task::spawn(rx1.recv());
    let mut recv2 = task::spawn(rx2.recv());

    assert_pending!(recv1.poll());
    assert_pending!(recv2.poll());

    assert_ok!(tx.send("hello"));

    assert!(recv1.is_woken());
    assert!(recv2.is_woken());

    let val1 = assert_ready_ok!(recv1.poll());
    let val2 = assert_ready_ok!(recv2.poll());
    assert_eq!(val1, "hello");
    assert_eq!(val2, "hello");

    drop((recv1, recv2));

    let mut recv1 = task::spawn(rx1.recv());
    let mut recv2 = task::spawn(rx2.recv());

    assert_pending!(recv1.poll());

    assert_ok!(tx.send("world"));

    assert!(recv1.is_woken());
    assert!(!recv2.is_woken());

    let val1 = assert_ready_ok!(recv1.poll());
    let val2 = assert_ready_ok!(recv2.poll());
    assert_eq!(val1, "world");
    assert_eq!(val2, "world");
}

#[test]
fn change_tasks() {
    let (tx, mut rx) = broadcast::channel(1);

    let mut recv = Box::pin(rx.recv());

    let mut task1 = task::spawn(&mut recv);
    assert_pending!(task1.poll());

    let mut task2 = task::spawn(&mut recv);
    assert_pending!(task2.poll());

    tx.send("hello").unwrap();

    assert!(task2.is_woken());
}

#[test]
fn send_slow_rx() {
    let (tx, mut rx1) = broadcast::channel(16);
    let mut rx2 = tx.subscribe();

    {
        let mut recv2 = task::spawn(rx2.recv());

        {
            let mut recv1 = task::spawn(rx1.recv());

            assert_pending!(recv1.poll());
            assert_pending!(recv2.poll());

            assert_ok!(tx.send("one"));

            assert!(recv1.is_woken());
            assert!(recv2.is_woken());

            assert_ok!(tx.send("two"));

            let val = assert_ready_ok!(recv1.poll());
            assert_eq!(val, "one");
        }

        let val = assert_ready_ok!(task::spawn(rx1.recv()).poll());
        assert_eq!(val, "two");

        let mut recv1 = task::spawn(rx1.recv());

        assert_pending!(recv1.poll());

        assert_ok!(tx.send("three"));

        assert!(recv1.is_woken());

        let val = assert_ready_ok!(recv1.poll());
        assert_eq!(val, "three");

        let val = assert_ready_ok!(recv2.poll());
        assert_eq!(val, "one");
    }

    let val = assert_recv!(rx2);
    assert_eq!(val, "two");

    let val = assert_recv!(rx2);
    assert_eq!(val, "three");
}

#[test]
fn drop_rx_while_values_remain() {
    let (tx, mut rx1) = broadcast::channel(16);
    let mut rx2 = tx.subscribe();

    assert_ok!(tx.send("one"));
    assert_ok!(tx.send("two"));

    assert_recv!(rx1);
    assert_recv!(rx2);

    drop(rx2);
    drop(rx1);
}

#[test]
fn lagging_rx() {
    let (tx, mut rx1) = broadcast::channel(2);
    let mut rx2 = tx.subscribe();

    assert_ok!(tx.send("one"));
    assert_ok!(tx.send("two"));

    assert_eq!("one", assert_recv!(rx1));

    assert_ok!(tx.send("three"));

    // Lagged too far
    let x = dbg!(rx2.try_recv());
    assert_lagged!(x, 1);

    // Calling again gets the next value
    assert_eq!("two", assert_recv!(rx2));

    assert_eq!("two", assert_recv!(rx1));
    assert_eq!("three", assert_recv!(rx1));

    assert_ok!(tx.send("four"));
    assert_ok!(tx.send("five"));

    assert_lagged!(rx2.try_recv(), 1);

    assert_ok!(tx.send("six"));

    assert_lagged!(rx2.try_recv(), 1);
}

#[test]
fn send_no_rx() {
    let (tx, _) = broadcast::channel(16);

    assert_err!(tx.send("hello"));

    let mut rx = tx.subscribe();

    assert_ok!(tx.send("world"));

    let val = assert_recv!(rx);
    assert_eq!("world", val);
}

#[test]
#[should_panic]
fn zero_capacity() {
    broadcast::channel::<()>(0);
}

#[test]
#[should_panic]
fn capacity_too_big() {
    use std::usize;

    broadcast::channel::<()>(1 + (usize::MAX >> 1));
}

#[test]
fn panic_in_clone() {
    use std::panic::{self, AssertUnwindSafe};

    #[derive(Eq, PartialEq, Debug)]
    struct MyVal(usize);

    impl Clone for MyVal {
        fn clone(&self) -> MyVal {
            assert_ne!(0, self.0);
            MyVal(self.0)
        }
    }

    let (tx, mut rx) = broadcast::channel(16);

    assert_ok!(tx.send(MyVal(0)));
    assert_ok!(tx.send(MyVal(1)));

    let res = panic::catch_unwind(AssertUnwindSafe(|| {
        let _ = rx.try_recv();
    }));

    assert_err!(res);

    let val = assert_recv!(rx);
    assert_eq!(val, MyVal(1));
}

#[test]
fn dropping_tx_notifies_rx() {
    let (tx, mut rx1) = broadcast::channel::<()>(16);
    let mut rx2 = tx.subscribe();

    let tx2 = tx.clone();

    let mut recv1 = task::spawn(rx1.recv());
    let mut recv2 = task::spawn(rx2.recv());

    assert_pending!(recv1.poll());
    assert_pending!(recv2.poll());

    drop(tx);

    assert_pending!(recv1.poll());
    assert_pending!(recv2.poll());

    drop(tx2);

    assert!(recv1.is_woken());
    assert!(recv2.is_woken());

    let err = assert_ready_err!(recv1.poll());
    assert!(is_closed(err));

    let err = assert_ready_err!(recv2.poll());
    assert!(is_closed(err));
}

#[test]
fn unconsumed_messages_are_dropped() {
    let (tx, rx) = broadcast::channel(16);

    let msg = Arc::new(());

    assert_ok!(tx.send(msg.clone()));

    assert_eq!(2, Arc::strong_count(&msg));

    drop(rx);

    assert_eq!(1, Arc::strong_count(&msg));
}

#[test]
fn single_capacity_recvs() {
    let (tx, mut rx) = broadcast::channel(1);

    assert_ok!(tx.send(1));

    assert_eq!(assert_recv!(rx), 1);
    assert_empty!(rx);
}

#[test]
fn single_capacity_recvs_after_drop_1() {
    let (tx, mut rx) = broadcast::channel(1);

    assert_ok!(tx.send(1));
    drop(tx);

    assert_eq!(assert_recv!(rx), 1);
    assert_closed!(rx.try_recv());
}

#[test]
fn single_capacity_recvs_after_drop_2() {
    let (tx, mut rx) = broadcast::channel(1);

    assert_ok!(tx.send(1));
    assert_ok!(tx.send(2));
    drop(tx);

    assert_lagged!(rx.try_recv(), 1);
    assert_eq!(assert_recv!(rx), 2);
    assert_closed!(rx.try_recv());
}

#[test]
fn dropping_sender_does_not_overwrite() {
    let (tx, mut rx) = broadcast::channel(2);

    assert_ok!(tx.send(1));
    assert_ok!(tx.send(2));
    drop(tx);

    assert_eq!(assert_recv!(rx), 1);
    assert_eq!(assert_recv!(rx), 2);
    assert_closed!(rx.try_recv());
}

#[test]
fn lagging_receiver_recovers_after_wrap_closed_1() {
    let (tx, mut rx) = broadcast::channel(2);

    assert_ok!(tx.send(1));
    assert_ok!(tx.send(2));
    assert_ok!(tx.send(3));
    drop(tx);

    assert_lagged!(rx.try_recv(), 1);
    assert_eq!(assert_recv!(rx), 2);
    assert_eq!(assert_recv!(rx), 3);
    assert_closed!(rx.try_recv());
}

#[test]
fn lagging_receiver_recovers_after_wrap_closed_2() {
    let (tx, mut rx) = broadcast::channel(2);

    assert_ok!(tx.send(1));
    assert_ok!(tx.send(2));
    assert_ok!(tx.send(3));
    assert_ok!(tx.send(4));
    drop(tx);

    assert_lagged!(rx.try_recv(), 2);
    assert_eq!(assert_recv!(rx), 3);
    assert_eq!(assert_recv!(rx), 4);
    assert_closed!(rx.try_recv());
}

#[test]
fn lagging_receiver_recovers_after_wrap_open() {
    let (tx, mut rx) = broadcast::channel(2);

    assert_ok!(tx.send(1));
    assert_ok!(tx.send(2));
    assert_ok!(tx.send(3));

    assert_lagged!(rx.try_recv(), 1);
    assert_eq!(assert_recv!(rx), 2);
    assert_eq!(assert_recv!(rx), 3);
    assert_empty!(rx);
}

fn is_closed(err: broadcast::error::RecvError) -> bool {
    matches!(err, broadcast::error::RecvError::Closed)
}