async-watch2 0.1.0

Async watch channel
Documentation
use async_watch2::channel;
use tokio_test::task::spawn;
use tokio_test::{assert_pending, assert_ready};

// Tests are copied and adapted from [`tokio/tests/sync_watch.rs`](https://github.com/tokio-rs/tokio/blob/4c645866ef4ea5b0ef8c7852281a09b2f96d969b/tokio/tests/sync_watch.rs).

#[test]
fn single_rx_recv() {
    let (tx, mut rx) = channel("one");

    {
        let mut t = spawn(rx.recv());
        let v = assert_ready!(t.poll()).unwrap();
        assert_eq!(v, "one");
    }

    {
        let mut t = spawn(rx.recv());

        assert_pending!(t.poll());

        tx.broadcast("two").unwrap();

        assert!(t.is_woken());

        let v = assert_ready!(t.poll()).unwrap();
        assert_eq!(v, "two");
    }

    {
        let mut t = spawn(rx.recv());

        assert_pending!(t.poll());

        drop(tx);

        let res = assert_ready!(t.poll());
        assert!(res.is_none());
    }
}

#[test]
fn multi_rx() {
    let (tx, mut rx1) = channel("one");
    let mut rx2 = rx1.clone();

    {
        let mut t1 = spawn(rx1.recv());
        let mut t2 = spawn(rx2.recv());

        let res = assert_ready!(t1.poll());
        assert_eq!(res.unwrap(), "one");

        let res = assert_ready!(t2.poll());
        assert_eq!(res.unwrap(), "one");
    }

    let mut t2 = spawn(rx2.recv());

    {
        let mut t1 = spawn(rx1.recv());

        assert_pending!(t1.poll());
        assert_pending!(t2.poll());

        tx.broadcast("two").unwrap();

        assert!(t1.is_woken());
        assert!(t2.is_woken());

        let res = assert_ready!(t1.poll());
        assert_eq!(res.unwrap(), "two");
    }

    {
        let mut t1 = spawn(rx1.recv());

        assert_pending!(t1.poll());

        tx.broadcast("three").unwrap();

        assert!(t1.is_woken());
        assert!(t2.is_woken());

        let res = assert_ready!(t1.poll());
        assert_eq!(res.unwrap(), "three");

        let res = assert_ready!(t2.poll());
        assert_eq!(res.unwrap(), "three");
    }

    drop(t2);

    {
        let mut t1 = spawn(rx1.recv());
        let mut t2 = spawn(rx2.recv());

        assert_pending!(t1.poll());
        assert_pending!(t2.poll());

        tx.broadcast("four").unwrap();

        let res = assert_ready!(t1.poll());
        assert_eq!(res.unwrap(), "four");
        drop(t1);

        let mut t1 = spawn(rx1.recv());
        assert_pending!(t1.poll());

        drop(tx);

        assert!(t1.is_woken());
        let res = assert_ready!(t1.poll());
        assert!(res.is_none());

        let res = assert_ready!(t2.poll());
        assert_eq!(res.unwrap(), "four");

        drop(t2);
        let mut t2 = spawn(rx2.recv());
        let res = assert_ready!(t2.poll());
        assert!(res.is_none());
    }
}

#[test]
fn rx_observes_final_value() {
    // Initial value

    let (tx, mut rx) = channel("one");
    drop(tx);

    {
        let mut t1 = spawn(rx.recv());
        let res = assert_ready!(t1.poll());
        assert_eq!(res.unwrap(), "one");
    }

    {
        let mut t1 = spawn(rx.recv());
        let res = assert_ready!(t1.poll());
        assert!(res.is_none());
    }

    // Sending a value

    let (tx, mut rx) = channel("one");

    tx.broadcast("two").unwrap();

    {
        let mut t1 = spawn(rx.recv());
        let res = assert_ready!(t1.poll());
        assert_eq!(res.unwrap(), "two");
    }

    {
        let mut t1 = spawn(rx.recv());
        assert_pending!(t1.poll());

        tx.broadcast("three").unwrap();
        drop(tx);

        assert!(t1.is_woken());

        let res = assert_ready!(t1.poll());
        assert_eq!(res.unwrap(), "three");
    }

    {
        let mut t1 = spawn(rx.recv());
        let res = assert_ready!(t1.poll());
        assert!(res.is_none());
    }
}

#[test]
fn poll_close() {
    let (mut tx, rx) = channel("one");

    {
        let mut t = spawn(tx.closed());
        assert_pending!(t.poll());

        drop(rx);

        assert!(t.is_woken());
        assert_ready!(t.poll());
    }

    assert!(tx.broadcast("two").is_err());
}