tokio 1.30.0

An event-driven, non-blocking I/O platform for writing asynchronous I/O backed applications.
Documentation
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]

use std::pin::Pin;
use std::task::{Context, Poll};

use futures::{Stream, StreamExt};
use tokio::time::{self, Duration, Instant, Interval, MissedTickBehavior};
use tokio_test::{assert_pending, assert_ready_eq, task};

// Takes the `Interval` task, `start` variable, and optional time deltas
// For each time delta, it polls the `Interval` and asserts that the result is
// equal to `start` + the specific time delta. Then it asserts that the
// `Interval` is pending.
macro_rules! check_interval_poll {
    ($i:ident, $start:ident, $($delta:expr),*$(,)?) => {
        $(
            assert_ready_eq!(poll_next(&mut $i), $start + ms($delta));
        )*
        assert_pending!(poll_next(&mut $i));
    };
    ($i:ident, $start:ident) => {
        check_interval_poll!($i, $start,);
    };
}

#[tokio::test]
#[should_panic]
async fn interval_zero_duration() {
    let _ = time::interval_at(Instant::now(), ms(0));
}

// Expected ticks: |     1     |     2     |     3     |     4     |     5     |     6     |
// Actual ticks:   | work -----|          delay          | work | work | work -| work -----|
// Poll behavior:  |   |       |                         |      |      |       |           |
//                 |   |       |                         |      |      |       |           |
//          Ready(s)   |       |             Ready(s + 2p)      |      |       |           |
//               Pending       |                    Ready(s + 3p)      |       |           |
//                  Ready(s + p)                           Ready(s + 4p)       |           |
//                                                                 Ready(s + 5p)           |
//                                                                             Ready(s + 6p)
#[tokio::test(start_paused = true)]
async fn burst() {
    let start = Instant::now();

    // This is necessary because the timer is only so granular, and in order for
    // all our ticks to resolve, the time needs to be 1ms ahead of what we
    // expect, so that the runtime will see that it is time to resolve the timer
    time::advance(ms(1)).await;

    let mut i = task::spawn(time::interval_at(start, ms(300)));

    check_interval_poll!(i, start, 0);

    time::advance(ms(100)).await;
    check_interval_poll!(i, start);

    time::advance(ms(200)).await;
    check_interval_poll!(i, start, 300);

    time::advance(ms(650)).await;
    check_interval_poll!(i, start, 600, 900);

    time::advance(ms(200)).await;
    check_interval_poll!(i, start);

    time::advance(ms(100)).await;
    check_interval_poll!(i, start, 1200);

    time::advance(ms(250)).await;
    check_interval_poll!(i, start, 1500);

    time::advance(ms(300)).await;
    check_interval_poll!(i, start, 1800);
}

// Expected ticks: |     1     |     2     |     3     |     4     |     5     |     6     |
// Actual ticks:   | work -----|          delay          | work -----| work -----| work -----|
// Poll behavior:  |   |       |                         |   |       |           |           |
//                 |   |       |                         |   |       |           |           |
//          Ready(s)   |       |             Ready(s + 2p)   |       |           |           |
//               Pending       |                       Pending       |           |           |
//                  Ready(s + p)                     Ready(s + 2p + d)           |           |
//                                                               Ready(s + 3p + d)           |
//                                                                           Ready(s + 4p + d)
#[tokio::test(start_paused = true)]
async fn delay() {
    let start = Instant::now();

    // This is necessary because the timer is only so granular, and in order for
    // all our ticks to resolve, the time needs to be 1ms ahead of what we
    // expect, so that the runtime will see that it is time to resolve the timer
    time::advance(ms(1)).await;

    let mut i = task::spawn(time::interval_at(start, ms(300)));
    i.set_missed_tick_behavior(MissedTickBehavior::Delay);

    check_interval_poll!(i, start, 0);

    time::advance(ms(100)).await;
    check_interval_poll!(i, start);

    time::advance(ms(200)).await;
    check_interval_poll!(i, start, 300);

    time::advance(ms(650)).await;
    check_interval_poll!(i, start, 600);

    time::advance(ms(100)).await;
    check_interval_poll!(i, start);

    // We have to add one here for the same reason as is above.
    // Because `Interval` has reset its timer according to `Instant::now()`,
    // we have to go forward 1 more millisecond than is expected so that the
    // runtime realizes that it's time to resolve the timer.
    time::advance(ms(201)).await;
    // We add one because when using the `Delay` behavior, `Interval`
    // adds the `period` from `Instant::now()`, which will always be off by one
    // because we have to advance time by 1 (see above).
    check_interval_poll!(i, start, 1251);

    time::advance(ms(300)).await;
    // Again, we add one.
    check_interval_poll!(i, start, 1551);

    time::advance(ms(300)).await;
    check_interval_poll!(i, start, 1851);
}

// Expected ticks: |     1     |     2     |     3     |     4     |     5     |     6     |
// Actual ticks:   | work -----|          delay          | work ---| work -----| work -----|
// Poll behavior:  |   |       |                         |         |           |           |
//                 |   |       |                         |         |           |           |
//          Ready(s)   |       |             Ready(s + 2p)         |           |           |
//               Pending       |                       Ready(s + 4p)           |           |
//                  Ready(s + p)                                   Ready(s + 5p)           |
//                                                                             Ready(s + 6p)
#[tokio::test(start_paused = true)]
async fn skip() {
    let start = Instant::now();

    // This is necessary because the timer is only so granular, and in order for
    // all our ticks to resolve, the time needs to be 1ms ahead of what we
    // expect, so that the runtime will see that it is time to resolve the timer
    time::advance(ms(1)).await;

    let mut i = task::spawn(time::interval_at(start, ms(300)));
    i.set_missed_tick_behavior(MissedTickBehavior::Skip);

    check_interval_poll!(i, start, 0);

    time::advance(ms(100)).await;
    check_interval_poll!(i, start);

    time::advance(ms(200)).await;
    check_interval_poll!(i, start, 300);

    time::advance(ms(650)).await;
    check_interval_poll!(i, start, 600);

    time::advance(ms(250)).await;
    check_interval_poll!(i, start, 1200);

    time::advance(ms(300)).await;
    check_interval_poll!(i, start, 1500);

    time::advance(ms(300)).await;
    check_interval_poll!(i, start, 1800);
}

#[tokio::test(start_paused = true)]
async fn reset() {
    let start = Instant::now();

    // This is necessary because the timer is only so granular, and in order for
    // all our ticks to resolve, the time needs to be 1ms ahead of what we
    // expect, so that the runtime will see that it is time to resolve the timer
    time::advance(ms(1)).await;

    let mut i = task::spawn(time::interval_at(start, ms(300)));

    check_interval_poll!(i, start, 0);

    time::advance(ms(100)).await;
    check_interval_poll!(i, start);

    time::advance(ms(200)).await;
    check_interval_poll!(i, start, 300);

    time::advance(ms(100)).await;
    check_interval_poll!(i, start);

    i.reset();

    time::advance(ms(250)).await;
    check_interval_poll!(i, start);

    time::advance(ms(50)).await;
    // We add one because when using `reset` method, `Interval` adds the
    // `period` from `Instant::now()`, which will always be off by one
    check_interval_poll!(i, start, 701);

    time::advance(ms(300)).await;
    check_interval_poll!(i, start, 1001);
}

#[tokio::test(start_paused = true)]
async fn reset_immediatelly() {
    let start = Instant::now();

    // This is necessary because the timer is only so granular, and in order for
    // all our ticks to resolve, the time needs to be 1ms ahead of what we
    // expect, so that the runtime will see that it is time to resolve the timer
    time::advance(ms(1)).await;

    let mut i = task::spawn(time::interval_at(start, ms(300)));

    check_interval_poll!(i, start, 0);

    time::advance(ms(100)).await;
    check_interval_poll!(i, start);

    time::advance(ms(200)).await;
    check_interval_poll!(i, start, 300);

    time::advance(ms(100)).await;
    check_interval_poll!(i, start);

    i.reset_immediately();

    // We add one because when using `reset` method, `Interval` adds the
    // `period` from `Instant::now()`, which will always be off by one
    check_interval_poll!(i, start, 401);

    time::advance(ms(100)).await;
    check_interval_poll!(i, start);

    time::advance(ms(200)).await;
    check_interval_poll!(i, start, 701);
}

#[tokio::test(start_paused = true)]
async fn reset_after() {
    let start = Instant::now();

    // This is necessary because the timer is only so granular, and in order for
    // all our ticks to resolve, the time needs to be 1ms ahead of what we
    // expect, so that the runtime will see that it is time to resolve the timer
    time::advance(ms(1)).await;

    let mut i = task::spawn(time::interval_at(start, ms(300)));

    check_interval_poll!(i, start, 0);

    time::advance(ms(100)).await;
    check_interval_poll!(i, start);

    time::advance(ms(200)).await;
    check_interval_poll!(i, start, 300);

    time::advance(ms(100)).await;
    check_interval_poll!(i, start);

    i.reset_after(Duration::from_millis(20));

    // We add one because when using `reset` method, `Interval` adds the
    // `period` from `Instant::now()`, which will always be off by one
    time::advance(ms(20)).await;
    check_interval_poll!(i, start, 421);

    time::advance(ms(100)).await;
    check_interval_poll!(i, start);

    time::advance(ms(200)).await;
    check_interval_poll!(i, start, 721);
}

#[tokio::test(start_paused = true)]
async fn reset_at() {
    let start = Instant::now();

    // This is necessary because the timer is only so granular, and in order for
    // all our ticks to resolve, the time needs to be 1ms ahead of what we
    // expect, so that the runtime will see that it is time to resolve the timer
    time::advance(ms(1)).await;

    let mut i = task::spawn(time::interval_at(start, ms(300)));

    check_interval_poll!(i, start, 0);

    time::advance(ms(100)).await;
    check_interval_poll!(i, start);

    time::advance(ms(200)).await;
    check_interval_poll!(i, start, 300);

    time::advance(ms(100)).await;
    check_interval_poll!(i, start);

    i.reset_at(Instant::now() + Duration::from_millis(40));

    // We add one because when using `reset` method, `Interval` adds the
    // `period` from `Instant::now()`, which will always be off by one
    time::advance(ms(40)).await;
    check_interval_poll!(i, start, 441);

    time::advance(ms(100)).await;
    check_interval_poll!(i, start);

    time::advance(ms(200)).await;
    check_interval_poll!(i, start, 741);
}

#[tokio::test(start_paused = true)]
async fn reset_at_bigger_than_interval() {
    let start = Instant::now();

    // This is necessary because the timer is only so granular, and in order for
    // all our ticks to resolve, the time needs to be 1ms ahead of what we
    // expect, so that the runtime will see that it is time to resolve the timer
    time::advance(ms(1)).await;

    let mut i = task::spawn(time::interval_at(start, ms(300)));

    check_interval_poll!(i, start, 0);

    time::advance(ms(100)).await;
    check_interval_poll!(i, start);

    time::advance(ms(200)).await;
    check_interval_poll!(i, start, 300);

    time::advance(ms(100)).await;
    check_interval_poll!(i, start);

    i.reset_at(Instant::now() + Duration::from_millis(1000));

    // Validate the interval does not tick until 1000ms have passed
    time::advance(ms(300)).await;
    check_interval_poll!(i, start);
    time::advance(ms(300)).await;
    check_interval_poll!(i, start);
    time::advance(ms(300)).await;
    check_interval_poll!(i, start);

    // We add one because when using `reset` method, `Interval` adds the
    // `period` from `Instant::now()`, which will always be off by one
    time::advance(ms(100)).await;
    check_interval_poll!(i, start, 1401);

    time::advance(ms(300)).await;
    check_interval_poll!(i, start, 1701);
}

fn poll_next(interval: &mut task::Spawn<time::Interval>) -> Poll<Instant> {
    interval.enter(|cx, mut interval| interval.poll_tick(cx))
}

fn ms(n: u64) -> Duration {
    Duration::from_millis(n)
}

/// Helper struct to test the [tokio::time::Interval::poll_tick()] method.
///
/// `poll_tick()` should register the waker in the context only if it returns
/// `Poll::Pending`, not when returning `Poll::Ready`. This struct contains an
/// interval timer and counts up on every tick when used as stream. When the
/// counter is a multiple of four, it yields the current counter value.
/// Depending on the value for `wake_on_pending`, it will reschedule itself when
/// it returns `Poll::Pending` or not. When used with `wake_on_pending=false`,
/// we expect that the stream stalls because the timer will **not** reschedule
/// the next wake-up itself once it returned `Poll::Ready`.
struct IntervalStreamer {
    counter: u32,
    timer: Interval,
    wake_on_pending: bool,
}

impl Stream for IntervalStreamer {
    type Item = u32;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = Pin::into_inner(self);

        if this.counter > 12 {
            return Poll::Ready(None);
        }

        match this.timer.poll_tick(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(_) => {
                this.counter += 1;
                if this.counter % 4 == 0 {
                    Poll::Ready(Some(this.counter))
                } else {
                    if this.wake_on_pending {
                        // Schedule this task for wake-up
                        cx.waker().wake_by_ref();
                    }
                    Poll::Pending
                }
            }
        }
    }
}

#[tokio::test(start_paused = true)]
async fn stream_with_interval_poll_tick_self_waking() {
    let stream = IntervalStreamer {
        counter: 0,
        timer: tokio::time::interval(tokio::time::Duration::from_millis(10)),
        wake_on_pending: true,
    };

    let (res_tx, mut res_rx) = tokio::sync::mpsc::channel(12);

    // Wrap task in timeout so that it will finish eventually even if the stream
    // stalls.
    tokio::spawn(tokio::time::timeout(
        tokio::time::Duration::from_millis(150),
        async move {
            tokio::pin!(stream);

            while let Some(item) = stream.next().await {
                res_tx.send(item).await.ok();
            }
        },
    ));

    let mut items = Vec::with_capacity(3);
    while let Some(result) = res_rx.recv().await {
        items.push(result);
    }

    // We expect the stream to yield normally and thus three items.
    assert_eq!(items, vec![4, 8, 12]);
}

#[tokio::test(start_paused = true)]
async fn stream_with_interval_poll_tick_no_waking() {
    let stream = IntervalStreamer {
        counter: 0,
        timer: tokio::time::interval(tokio::time::Duration::from_millis(10)),
        wake_on_pending: false,
    };

    let (res_tx, mut res_rx) = tokio::sync::mpsc::channel(12);

    // Wrap task in timeout so that it will finish eventually even if the stream
    // stalls.
    tokio::spawn(tokio::time::timeout(
        tokio::time::Duration::from_millis(150),
        async move {
            tokio::pin!(stream);

            while let Some(item) = stream.next().await {
                res_tx.send(item).await.ok();
            }
        },
    ));

    let mut items = Vec::with_capacity(0);
    while let Some(result) = res_rx.recv().await {
        items.push(result);
    }

    // We expect the stream to stall because it does not reschedule itself on
    // `Poll::Pending` and neither does [tokio::time::Interval] reschedule the
    // task when returning `Poll::Ready`.
    assert_eq!(items, vec![]);
}