// compio_runtime/time.rs

1//! Utilities for tracking time.
2
3use std::{
4    collections::BTreeMap,
5    error::Error,
6    fmt::Display,
7    future::Future,
8    marker::PhantomData,
9    mem::replace,
10    pin::Pin,
11    task::{Context, Poll, Waker},
12    time::{Duration, Instant},
13};
14
15use compio_log::{debug, instrument};
16use futures_util::{FutureExt, select};
17
18use crate::Runtime;
19
20/// Waits until `duration` has elapsed.
21///
22/// Equivalent to [`sleep_until(Instant::now() + duration)`](sleep_until). An
23/// asynchronous analog to [`std::thread::sleep`].
24///
25/// To run something regularly on a schedule, see [`interval`].
26///
27/// # Examples
28///
29/// Wait 100ms and print "100 ms have elapsed".
30///
31/// ```
32/// use std::time::Duration;
33///
34/// use compio_runtime::time::sleep;
35///
36/// # compio_runtime::Runtime::new().unwrap().block_on(async {
37/// sleep(Duration::from_millis(100)).await;
38/// println!("100 ms have elapsed");
39/// # })
40/// ```
41pub async fn sleep(duration: Duration) {
42    sleep_until(Instant::now() + duration).await
43}
44
45/// Waits until `deadline` is reached.
46///
47/// To run something regularly on a schedule, see [`interval`].
48///
49/// # Examples
50///
51/// Wait 100ms and print "100 ms have elapsed".
52///
53/// ```
54/// use std::time::{Duration, Instant};
55///
56/// use compio_runtime::time::sleep_until;
57///
58/// # compio_runtime::Runtime::new().unwrap().block_on(async {
59/// sleep_until(Instant::now() + Duration::from_millis(100)).await;
60/// println!("100 ms have elapsed");
61/// # })
62/// ```
63pub async fn sleep_until(deadline: Instant) {
64    create_timer(deadline).await
65}
66
67async fn create_timer(instant: std::time::Instant) {
68    let key = Runtime::with_current(|r| r.timer_runtime.borrow_mut().insert(instant));
69    if let Some(key) = key {
70        TimerFuture::new(key).await;
71    }
72}
73
/// The error produced by [`timeout`] and [`timeout_at`] when the deadline
/// passes before the wrapped future has finished.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Elapsed(());

impl Error for Elapsed {}

impl Display for Elapsed {
    /// Writes the fixed, human-readable description of the error.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(formatter, "deadline has elapsed")
    }
}
85
86/// Require a [`Future`] to complete before the specified duration has elapsed.
87///
88/// If the future completes before the duration has elapsed, then the completed
89/// value is returned. Otherwise, an error is returned and the future is
90/// cancelled.
91pub async fn timeout<F: Future>(duration: Duration, future: F) -> Result<F::Output, Elapsed> {
92    select! {
93        res = future.fuse() => Ok(res),
94        _ = sleep(duration).fuse() => Err(Elapsed(())),
95    }
96}
97
98/// Require a [`Future`] to complete before the specified instant in time.
99///
100/// If the future completes before the instant is reached, then the completed
101/// value is returned. Otherwise, an error is returned.
102pub async fn timeout_at<F: Future>(deadline: Instant, future: F) -> Result<F::Output, Elapsed> {
103    timeout(deadline - Instant::now(), future).await
104}
105
/// Interval returned by [`interval`] and [`interval_at`].
///
/// This type allows you to wait on a sequence of instants with a certain
/// duration between each instant. Unlike calling [`sleep`] in a loop, this lets
/// you count the time spent between the calls to [`sleep`] as well.
#[derive(Debug)]
pub struct Interval {
    /// Whether the initial tick (the one due at `start`) has already completed.
    first_ticked: bool,
    /// Instant of the first tick; later ticks land on `start + n * period`.
    start: Instant,
    /// Spacing between consecutive ticks.
    period: Duration,
}
117
118impl Interval {
119    pub(crate) fn new(start: Instant, period: Duration) -> Self {
120        Self {
121            first_ticked: false,
122            start,
123            period,
124        }
125    }
126
127    /// Completes when the next instant in the interval has been reached.
128    ///
129    /// See [`interval`] and [`interval_at`].
130    pub async fn tick(&mut self) -> Instant {
131        if !self.first_ticked {
132            sleep_until(self.start).await;
133            self.first_ticked = true;
134            self.start
135        } else {
136            let now = Instant::now();
137            let next = now + self.period
138                - Duration::from_nanos(
139                    ((now - self.start).as_nanos() % self.period.as_nanos()) as _,
140                );
141            sleep_until(next).await;
142            next
143        }
144    }
145}
146
147/// Creates new [`Interval`] that yields with interval of `period`. The first
148/// tick completes immediately.
149///
150/// An interval will tick indefinitely. At any time, the [`Interval`] value can
151/// be dropped. This cancels the interval.
152///
153/// This function is equivalent to
154/// [`interval_at(Instant::now(), period)`](interval_at).
155///
156/// # Panics
157///
158/// This function panics if `period` is zero.
159///
160/// # Examples
161///
162/// ```
163/// use std::time::Duration;
164///
165/// use compio_runtime::time::interval;
166///
167/// # compio_runtime::Runtime::new().unwrap().block_on(async {
168/// let mut interval = interval(Duration::from_millis(10));
169///
170/// interval.tick().await; // ticks immediately
171/// interval.tick().await; // ticks after 10ms
172/// interval.tick().await; // ticks after 10ms
173///
174/// // approximately 20ms have elapsed.
175/// # })
176/// ```
177///
178/// A simple example using [`interval`] to execute a task every two seconds.
179///
180/// The difference between [`interval`] and [`sleep`] is that an [`Interval`]
181/// measures the time since the last tick, which means that [`.tick().await`]
182/// may wait for a shorter time than the duration specified for the interval
183/// if some time has passed between calls to [`.tick().await`].
184///
185/// If the tick in the example below was replaced with [`sleep`], the task
186/// would only be executed once every three seconds, and not every two
187/// seconds.
188///
189/// ```no_run
190/// use std::time::Duration;
191///
192/// use compio_runtime::time::{interval, sleep};
193///
194/// async fn task_that_takes_a_second() {
195///     println!("hello");
196///     sleep(Duration::from_secs(1)).await
197/// }
198///
199/// # compio_runtime::Runtime::new().unwrap().block_on(async {
200/// let mut interval = interval(Duration::from_secs(2));
201/// for _i in 0..5 {
202///     interval.tick().await;
203///     task_that_takes_a_second().await;
204/// }
205/// # })
206/// ```
207///
208/// [`sleep`]: crate::time::sleep()
209/// [`.tick().await`]: Interval::tick
210pub fn interval(period: Duration) -> Interval {
211    interval_at(Instant::now(), period)
212}
213
214/// Creates new [`Interval`] that yields with interval of `period` with the
215/// first tick completing at `start`.
216///
217/// An interval will tick indefinitely. At any time, the [`Interval`] value can
218/// be dropped. This cancels the interval.
219///
220/// # Panics
221///
222/// This function panics if `period` is zero.
223///
224/// # Examples
225///
226/// ```
227/// use std::time::{Duration, Instant};
228///
229/// use compio_runtime::time::interval_at;
230///
231/// # compio_runtime::Runtime::new().unwrap().block_on(async {
232/// let start = Instant::now() + Duration::from_millis(50);
233/// let mut interval = interval_at(start, Duration::from_millis(10));
234///
235/// interval.tick().await; // ticks after 50ms
236/// interval.tick().await; // ticks after 10ms
237/// interval.tick().await; // ticks after 10ms
238///
239/// // approximately 70ms have elapsed.
240/// # });
241/// ```
242pub fn interval_at(start: Instant, period: Duration) -> Interval {
243    assert!(period > Duration::ZERO, "`period` must be non-zero.");
244    Interval::new(start, period)
245}
246
/// Key identifying one registered timer inside `TimerRuntime`.
///
/// The derived ordering compares fields in declaration order: `deadline`
/// first, then `key`. Timers therefore sort by expiry time, with the
/// insertion counter as a tie-breaker.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct TimerKey {
    /// Instant at which the timer expires.
    deadline: Instant,
    /// Counter value assigned at insertion; distinguishes timers that share a
    /// deadline.
    key: u64,
    /// Raw-pointer marker that keeps the key `!Send` and `!Sync`.
    _local_marker: PhantomData<*const ()>,
}
253
/// Registry of pending timers, kept sorted by deadline.
pub(crate) struct TimerRuntime {
    /// Counter used as the `key` field of the next inserted timer.
    key: u64,
    /// Pending timers mapped to the waker to notify on expiry. Despite the
    /// name this is an ordered map, so the earliest deadline is the first entry.
    wheel: BTreeMap<TimerKey, Waker>,
}
258
259impl TimerRuntime {
260    pub fn new() -> Self {
261        Self {
262            key: 0,
263            wheel: BTreeMap::default(),
264        }
265    }
266
267    /// Return true if the timer has completed.
268    pub fn is_completed(&self, key: &TimerKey) -> bool {
269        !self.wheel.contains_key(key)
270    }
271
272    /// Insert a new timer. If the deadline is in the past, return `None`.
273    pub fn insert(&mut self, deadline: Instant) -> Option<TimerKey> {
274        if deadline <= Instant::now() {
275            return None;
276        }
277        let key = TimerKey {
278            deadline,
279            key: self.key,
280            _local_marker: PhantomData,
281        };
282        self.wheel.insert(key, Waker::noop().clone());
283
284        self.key += 1;
285
286        Some(key)
287    }
288
289    /// Update the waker for a timer.
290    pub fn update_waker(&mut self, key: &TimerKey, waker: &Waker) {
291        if let Some(w) = self.wheel.get_mut(key)
292            && !waker.will_wake(w)
293        {
294            *w = waker.clone();
295        }
296    }
297
298    /// Cancel a timer.
299    pub fn cancel(&mut self, key: &TimerKey) {
300        self.wheel.remove(key);
301    }
302
303    /// Get the minimum timeout duration for the next poll.
304    pub fn min_timeout(&self) -> Option<Duration> {
305        self.wheel.first_key_value().map(|(key, _)| {
306            let now = Instant::now();
307            key.deadline.saturating_duration_since(now)
308        })
309    }
310
311    /// Wake all the timer futures that have reached their deadline.
312    pub fn wake(&mut self) {
313        if self.wheel.is_empty() {
314            return;
315        }
316
317        let now = Instant::now();
318
319        let pending = self.wheel.split_off(&TimerKey {
320            deadline: now,
321            key: u64::MAX,
322            _local_marker: PhantomData,
323        });
324
325        let expired = replace(&mut self.wheel, pending);
326        for (_, w) in expired {
327            w.wake();
328        }
329    }
330
331    pub fn poll_timer(&mut self, cx: &mut Context<'_>, key: &TimerKey) -> Poll<()> {
332        instrument!(compio_log::Level::DEBUG, "poll_timer", ?cx, ?key);
333        if self.is_completed(key) {
334            debug!("ready");
335            Poll::Ready(())
336        } else {
337            debug!("pending");
338            self.update_waker(key, cx.waker());
339            Poll::Pending
340        }
341    }
342}
343
344pub(crate) struct TimerFuture(TimerKey);
345
346impl TimerFuture {
347    pub fn new(key: TimerKey) -> Self {
348        Self(key)
349    }
350}
351
352impl Future for TimerFuture {
353    type Output = ();
354
355    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
356        Runtime::with_current(|r| r.timer_runtime.borrow_mut().poll_timer(cx, &self.0))
357    }
358}
359
360impl Drop for TimerFuture {
361    fn drop(&mut self) {
362        Runtime::with_current(|r| r.timer_runtime.borrow_mut().cancel(&self.0));
363    }
364}
365
// Compile-time guards: `TimerFuture` must implement neither `Send` nor `Sync`.
// NOTE(review): presumably because its key is only meaningful to the runtime
// reached through `Runtime::with_current` on the creating thread — confirm.
compio_driver::assert_not_impl!(TimerFuture, Send);
compio_driver::assert_not_impl!(TimerFuture, Sync);
368
/// `min_timeout` reports the time left until the earliest registered deadline.
#[test]
fn timer_min_timeout() {
    let mut runtime = TimerRuntime::new();
    assert_eq!(runtime.min_timeout(), None);

    let now = Instant::now();
    assert!(runtime.insert(now + Duration::from_secs(1)).is_some());
    assert!(runtime.insert(now + Duration::from_secs(10)).is_some());
    let min_timeout = runtime
        .min_timeout()
        .expect("two timers are registered");

    // Compare `Duration`s exactly: converting to `f32` can round a value just
    // below one second up to `1.0`, and the clock may not have advanced at all
    // since `now` was captured, so the bound must be inclusive.
    assert!(min_timeout <= Duration::from_secs(1));
}