1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
//! Implements a wrapper around non-async functions, allowing
//! them to be used within an async context.

use futures::{
    future::{FusedFuture, FutureExt},
    pin_mut, select,
};
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll, Waker},
    time::Duration,
};

/// Implements a future, using a caller-supplied polling function.
struct IntervalFuture<O, F>
where
    F: FnMut() -> Poll<O> + Unpin,
{
    f: F,
    completed: bool,
    waker: Option<Waker>,
}

/// Creates a new future which wraps a synchronous function `f`
/// that may be called repeatedly on `interval_period` until
/// it completes.
///
/// Generally, when using futures, an explicit waker should
/// be used to cause re-polling on a more precise basis, but that
/// isn't always available. This method allows callers to create
/// a future which will automatically be-checked with a specified
/// regularity.
///
/// # Arguments
/// - `f`: A function to be polled. Returns [`Poll::Ready`] with
/// a result to complete the future, and [`Poll::Pending`] otherwise.
/// `f` will not be re-invoked after completing.
/// - `interval_period`: A duration of time to wait before
/// re-invoking `f`. This is a *suggestion*, as the function may
/// actually be re-polled more frequently.
///
/// # Example
///
/// ```
/// use std::task::Poll;
/// use std::time::{Duration, Instant};
///
/// #[tokio::main]
/// async fn main() {
///     let time_to_complete = Duration::from_secs(1);
///     let interval = Duration::from_millis(200);
///     let timeout = Duration::from_secs(2);
///
///     let poll_start = Instant::now();
///     let f = || {
///         let elapsed = Instant::now().duration_since(poll_start);
///         if elapsed > time_to_complete {
///             println!("Ready");
///             Poll::Ready(5)
///         } else {
///             println!("Not ready after {} ms", elapsed.as_millis());
///             Poll::Pending
///         }
///     };
///
///     let fut = interval_future::to_future(f, interval);
///     let val = tokio::time::timeout(timeout, fut).await.unwrap();
///     println!("Got my value: {}", val);
/// }
/// ```
pub async fn to_future<O, F>(f: F, interval_period: Duration) -> O
where
    F: FnMut() -> Poll<O> + Unpin,
{
    let fut = IntervalFuture {
        f,
        completed: false,
        waker: None,
    };
    pin_mut!(fut);

    let mut interval_fut = tokio::time::interval(interval_period);

    // First tick completes immediately.
    interval_fut.tick().await;
    loop {
        select! {
            o = fut => return o,
            _ = interval_fut.tick().fuse() => (),
        }
        if let Some(waker) = fut.waker.take() {
            waker.wake();
        }
    }
}

impl<O, F> Future for IntervalFuture<O, F>
where
    F: FnMut() -> Poll<O> + Unpin,
{
    type Output = O;
    fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
        let fut = self.get_mut();
        match (fut.f)() {
            Poll::Ready(o) => {
                fut.completed = true;
                Poll::Ready(o)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

impl<O, F> FusedFuture for IntervalFuture<O, F>
where
    F: FnMut() -> Poll<O> + Unpin,
{
    fn is_terminated(&self) -> bool {
        self.completed
    }
}