// tokio-util 0.7.2
//
// Additional utilities for working with Tokio.
// Documentation
#![warn(rust_2018_idioms)]

use tokio::pin;
use tokio_util::sync::{CancellationToken, WaitForCancellationFuture};

use core::future::Future;
use core::task::{Context, Poll};
use futures_test::task::new_count_waker;

/// Cancelling a token flips `is_cancelled`, produces exactly one wake-up for
/// the waiter that was polled beforehand, and lets every `cancelled()` future
/// resolve, including one that was never polled before the cancellation.
#[test]
fn cancel_token() {
    let (waker, wake_counter) = new_count_waker();
    // One task context is reused for every poll in this test.
    let mut cx = Context::from_waker(&waker);

    let token = CancellationToken::new();
    assert!(!token.is_cancelled());

    // First waiter: polled while the token is still live.
    let first_wait = token.cancelled();
    pin!(first_wait);
    assert_eq!(Poll::Pending, first_wait.as_mut().poll(&mut cx));
    assert_eq!(wake_counter, 0);

    // Second waiter: created before the cancellation but not polled yet.
    let second_wait = token.cancelled();
    pin!(second_wait);

    token.cancel();
    // Only `first_wait` had been polled, so a single wake-up is expected.
    assert_eq!(wake_counter, 1);
    assert!(token.is_cancelled());

    assert_eq!(Poll::Ready(()), first_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Ready(()), second_wait.as_mut().poll(&mut cx));
}

/// Cancelling a parent token also cancels its child: both pending waiters are
/// woken (two wake-ups in total) and both `cancelled()` futures resolve.
#[test]
fn cancel_child_token_through_parent() {
    let (waker, wake_counter) = new_count_waker();
    // One task context is reused for every poll in this test.
    let mut cx = Context::from_waker(&waker);

    let parent = CancellationToken::new();
    let child = parent.child_token();
    assert!(!child.is_cancelled());

    let child_wait = child.cancelled();
    pin!(child_wait);
    let parent_wait = parent.cancelled();
    pin!(parent_wait);

    // Nothing is cancelled yet, so both waiters park.
    assert_eq!(Poll::Pending, child_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Pending, parent_wait.as_mut().poll(&mut cx));
    assert_eq!(wake_counter, 0);

    parent.cancel();
    // One wake-up per parked waiter.
    assert_eq!(wake_counter, 2);
    assert!(parent.is_cancelled());
    assert!(child.is_cancelled());

    assert_eq!(Poll::Ready(()), child_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Ready(()), parent_wait.as_mut().poll(&mut cx));
}

/// A grandchild token is still cancelled through the root token after the
/// intermediate token handle between them has been dropped.
#[test]
fn cancel_grandchild_token_through_parent_if_child_was_dropped() {
    let (waker, wake_counter) = new_count_waker();
    // One task context is reused for every poll in this test.
    let mut cx = Context::from_waker(&waker);

    let root = CancellationToken::new();

    // Build root -> middle -> grandchild, then discard the middle handle.
    let middle = root.child_token();
    let grandchild = middle.child_token();
    drop(middle);
    assert!(!grandchild.is_cancelled());

    let grandchild_wait = grandchild.cancelled();
    pin!(grandchild_wait);
    let root_wait = root.cancelled();
    pin!(root_wait);

    assert_eq!(Poll::Pending, grandchild_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Pending, root_wait.as_mut().poll(&mut cx));
    assert_eq!(wake_counter, 0);

    root.cancel();
    // One wake-up per parked waiter.
    assert_eq!(wake_counter, 2);
    assert!(root.is_cancelled());
    assert!(grandchild.is_cancelled());

    assert_eq!(Poll::Ready(()), grandchild_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Ready(()), root_wait.as_mut().poll(&mut cx));
}

/// Cancelling a child token leaves its parent uncancelled and the parent's
/// waiter pending; a later cancellation of the parent then cancels a second,
/// freshly created child as well.
#[test]
fn cancel_child_token_without_parent() {
    let (waker, wake_counter) = new_count_waker();
    // One task context is reused for every poll in this test.
    let mut cx = Context::from_waker(&waker);

    let parent = CancellationToken::new();
    let first_child = parent.child_token();

    let first_wait = first_child.cancelled();
    pin!(first_wait);
    let parent_wait = parent.cancelled();
    pin!(parent_wait);

    assert_eq!(Poll::Pending, first_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Pending, parent_wait.as_mut().poll(&mut cx));
    assert_eq!(wake_counter, 0);

    // Phase 1: cancel only the child.
    first_child.cancel();
    assert_eq!(wake_counter, 1);
    assert!(!parent.is_cancelled());
    assert!(first_child.is_cancelled());

    assert_eq!(Poll::Ready(()), first_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Pending, parent_wait.as_mut().poll(&mut cx));

    // Phase 2: attach another child to the still-live parent.
    let second_child = parent.child_token();
    let second_wait = second_child.cancelled();
    pin!(second_wait);

    assert_eq!(Poll::Pending, second_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Pending, parent_wait.as_mut().poll(&mut cx));

    // Phase 3: cancel the parent; the running total is expected to reach 3
    // (one wake-up from phase 1 plus two for the waiters parked in phase 2).
    parent.cancel();
    assert_eq!(wake_counter, 3);
    assert!(parent.is_cancelled());
    assert!(second_child.is_cancelled());

    assert_eq!(Poll::Ready(()), second_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Ready(()), parent_wait.as_mut().poll(&mut cx));
}

/// A child created from an already-cancelled token starts out cancelled.
///
/// Both `cancelled()` futures resolve on their first poll without the waker
/// ever firing. Afterwards the two tokens are dropped, once child-first and
/// once parent-first (one loop iteration per order).
#[test]
fn create_child_token_after_parent_was_cancelled() {
    for drop_child_first in [true, false].iter().copied() {
        let (waker, wake_counter) = new_count_waker();
        let token = CancellationToken::new();
        token.cancel();

        let child_token = token.child_token();
        assert!(child_token.is_cancelled());

        {
            let child_fut = child_token.cancelled();
            pin!(child_fut);
            let parent_fut = token.cancelled();
            pin!(parent_fut);

            assert_eq!(
                Poll::Ready(()),
                child_fut.as_mut().poll(&mut Context::from_waker(&waker))
            );
            assert_eq!(
                Poll::Ready(()),
                parent_fut.as_mut().poll(&mut Context::from_waker(&waker))
            );
            assert_eq!(wake_counter, 0);

            // No explicit `drop` calls here: after `pin!` these names are
            // bound to `Pin<&mut _>` handles, so `drop(child_fut)` would only
            // discard the handle, not the future. The futures themselves are
            // dropped when this block ends, which is before either token is
            // dropped below.
        }

        if drop_child_first {
            drop(child_token);
            drop(token);
        } else {
            drop(token);
            drop(child_token);
        }
    }
}

/// Dropping several child tokens one at a time, in either forward or reverse
/// creation order, never cancels the parent token.
#[test]
fn drop_multiple_child_tokens() {
    for &drop_first_child_first in &[true, false] {
        let token = CancellationToken::new();
        // Three children, created in slot order 0, 1, 2.
        let mut child_tokens = [
            Some(token.child_token()),
            Some(token.child_token()),
            Some(token.child_token()),
        ];

        assert!(!token.is_cancelled());
        assert!(!child_tokens[0].as_ref().unwrap().is_cancelled());

        let count = child_tokens.len();
        for step in 0..count {
            // Walk the slots front-to-back or back-to-front.
            let slot = if drop_first_child_first {
                step
            } else {
                count - 1 - step
            };
            // Overwriting the slot drops the child token it held.
            child_tokens[slot] = None;
            assert!(!token.is_cancelled());
        }

        drop(token);
    }
}

/// Cancelling a token in the middle of a tree cancels that token and all of
/// its descendants, and nothing else.
///
/// Tree under test (`token` is the one that gets cancelled):
///
/// ```text
/// parent_token
/// ├── token
/// │   ├── child1_token
/// │   │   ├── grandchild_token
/// │   │   │   └── grandgrandchild_token
/// │   │   └── grandchild2_token
/// │   └── child2_token
/// └── sibling_token
/// ```
///
/// The parent and the sibling must stay uncancelled, their waiters must stay
/// pending, and exactly six wake-ups (one per waiter in the cancelled
/// subtree) are expected.
#[test]
fn cancel_only_all_descendants() {
    // ARRANGE
    let (waker, wake_counter) = new_count_waker();
    // One task context is reused for every poll in this test.
    let mut cx = Context::from_waker(&waker);

    let parent_token = CancellationToken::new();
    let token = parent_token.child_token();
    let sibling_token = parent_token.child_token();
    let child1_token = token.child_token();
    let child2_token = token.child_token();
    let grandchild_token = child1_token.child_token();
    let grandchild2_token = child1_token.child_token();
    let grandgrandchild_token = grandchild_token.child_token();

    assert!(!parent_token.is_cancelled());
    assert!(!token.is_cancelled());
    assert!(!sibling_token.is_cancelled());
    assert!(!child1_token.is_cancelled());
    assert!(!child2_token.is_cancelled());
    assert!(!grandchild_token.is_cancelled());
    assert!(!grandchild2_token.is_cancelled());
    assert!(!grandgrandchild_token.is_cancelled());

    let parent_fut = parent_token.cancelled();
    let fut = token.cancelled();
    let sibling_fut = sibling_token.cancelled();
    let child1_fut = child1_token.cancelled();
    let child2_fut = child2_token.cancelled();
    let grandchild_fut = grandchild_token.cancelled();
    let grandchild2_fut = grandchild2_token.cancelled();
    let grandgrandchild_fut = grandgrandchild_token.cancelled();

    pin!(parent_fut);
    pin!(fut);
    pin!(sibling_fut);
    pin!(child1_fut);
    pin!(child2_fut);
    pin!(grandchild_fut);
    pin!(grandchild2_fut);
    pin!(grandgrandchild_fut);

    // Park all eight waiters before anything is cancelled.
    assert_eq!(Poll::Pending, parent_fut.as_mut().poll(&mut cx));
    assert_eq!(Poll::Pending, fut.as_mut().poll(&mut cx));
    assert_eq!(Poll::Pending, sibling_fut.as_mut().poll(&mut cx));
    assert_eq!(Poll::Pending, child1_fut.as_mut().poll(&mut cx));
    assert_eq!(Poll::Pending, child2_fut.as_mut().poll(&mut cx));
    assert_eq!(Poll::Pending, grandchild_fut.as_mut().poll(&mut cx));
    assert_eq!(Poll::Pending, grandchild2_fut.as_mut().poll(&mut cx));
    assert_eq!(Poll::Pending, grandgrandchild_fut.as_mut().poll(&mut cx));
    assert_eq!(wake_counter, 0);

    // ACT
    token.cancel();

    // ASSERT
    assert_eq!(wake_counter, 6);
    assert!(!parent_token.is_cancelled());
    assert!(token.is_cancelled());
    assert!(!sibling_token.is_cancelled());
    assert!(child1_token.is_cancelled());
    assert!(child2_token.is_cancelled());
    assert!(grandchild_token.is_cancelled());
    assert!(grandchild2_token.is_cancelled());
    assert!(grandgrandchild_token.is_cancelled());

    // Every waiter inside the cancelled subtree completes.
    assert_eq!(Poll::Ready(()), fut.as_mut().poll(&mut cx));
    assert_eq!(Poll::Ready(()), child1_fut.as_mut().poll(&mut cx));
    assert_eq!(Poll::Ready(()), child2_fut.as_mut().poll(&mut cx));
    assert_eq!(Poll::Ready(()), grandchild_fut.as_mut().poll(&mut cx));
    assert_eq!(Poll::Ready(()), grandchild2_fut.as_mut().poll(&mut cx));
    assert_eq!(Poll::Ready(()), grandgrandchild_fut.as_mut().poll(&mut cx));

    // Waiters outside the cancelled subtree must still be pending.
    assert_eq!(Poll::Pending, parent_fut.as_mut().poll(&mut cx));
    assert_eq!(Poll::Pending, sibling_fut.as_mut().poll(&mut cx));

    // None of the polls above may have produced additional wake-ups.
    assert_eq!(wake_counter, 6);
}

/// Dropping the parent token while two children are still alive leaves the
/// first child uncancelled; both children are then dropped in creation order.
#[test]
fn drop_parent_before_child_tokens() {
    let parent = CancellationToken::new();
    let (first_child, second_child) = (parent.child_token(), parent.child_token());

    // The parent goes away first, while both children are still held.
    drop(parent);
    assert!(!first_child.is_cancelled());

    drop(first_child);
    drop(second_child);
}

/// Compile-time check that `CancellationToken` and
/// `WaitForCancellationFuture<'static>` are both `Send` and `Sync`.
#[test]
fn derives_send_sync() {
    // Instantiating this helper only type-checks when `T` meets both bounds.
    fn require_send_sync<T>()
    where
        T: Send + Sync,
    {
    }

    require_send_sync::<CancellationToken>();
    require_send_sync::<WaitForCancellationFuture<'static>>();
}