Skip to main content

compio_runtime/
time.rs

1//! Utilities for tracking time.
2
3use std::{
4    collections::BTreeMap,
5    error::Error,
6    fmt::Display,
7    future::Future,
8    marker::PhantomData,
9    mem::replace,
10    pin::Pin,
11    task::{Context, Poll, Waker},
12    time::{Duration, Instant},
13};
14
15use futures_util::{FutureExt, select};
16
17use crate::Runtime;
18
19/// Waits until `duration` has elapsed.
20///
21/// Equivalent to [`sleep_until(Instant::now() + duration)`](sleep_until). An
22/// asynchronous analog to [`std::thread::sleep`].
23///
24/// To run something regularly on a schedule, see [`interval`].
25///
26/// # Examples
27///
28/// Wait 100ms and print "100 ms have elapsed".
29///
30/// ```
31/// use std::time::Duration;
32///
33/// use compio_runtime::time::sleep;
34///
35/// # compio_runtime::Runtime::new().unwrap().block_on(async {
36/// sleep(Duration::from_millis(100)).await;
37/// println!("100 ms have elapsed");
38/// # })
39/// ```
40pub async fn sleep(duration: Duration) {
41    sleep_until(Instant::now() + duration).await
42}
43
44/// Waits until `deadline` is reached.
45///
46/// To run something regularly on a schedule, see [`interval`].
47///
48/// # Examples
49///
50/// Wait 100ms and print "100 ms have elapsed".
51///
52/// ```
53/// use std::time::{Duration, Instant};
54///
55/// use compio_runtime::time::sleep_until;
56///
57/// # compio_runtime::Runtime::new().unwrap().block_on(async {
58/// sleep_until(Instant::now() + Duration::from_millis(100)).await;
59/// println!("100 ms have elapsed");
60/// # })
61/// ```
62pub async fn sleep_until(deadline: Instant) {
63    crate::create_timer(deadline).await
64}
65
66/// Error returned by [`timeout`] or [`timeout_at`].
67#[derive(Debug, Clone, Copy, PartialEq, Eq)]
68pub struct Elapsed(());
69
70impl Display for Elapsed {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        f.write_str("deadline has elapsed")
73    }
74}
75
76impl Error for Elapsed {}
77
78/// Require a [`Future`] to complete before the specified duration has elapsed.
79///
80/// If the future completes before the duration has elapsed, then the completed
81/// value is returned. Otherwise, an error is returned and the future is
82/// cancelled.
83pub async fn timeout<F: Future>(duration: Duration, future: F) -> Result<F::Output, Elapsed> {
84    select! {
85        res = future.fuse() => Ok(res),
86        _ = sleep(duration).fuse() => Err(Elapsed(())),
87    }
88}
89
90/// Require a [`Future`] to complete before the specified instant in time.
91///
92/// If the future completes before the instant is reached, then the completed
93/// value is returned. Otherwise, an error is returned.
94pub async fn timeout_at<F: Future>(deadline: Instant, future: F) -> Result<F::Output, Elapsed> {
95    timeout(deadline - Instant::now(), future).await
96}
97
98/// Interval returned by [`interval`] and [`interval_at`]
99///
100/// This type allows you to wait on a sequence of instants with a certain
101/// duration between each instant. Unlike calling [`sleep`] in a loop, this lets
102/// you count the time spent between the calls to [`sleep`] as well.
103#[derive(Debug)]
104pub struct Interval {
105    first_ticked: bool,
106    start: Instant,
107    period: Duration,
108}
109
110impl Interval {
111    pub(crate) fn new(start: Instant, period: Duration) -> Self {
112        Self {
113            first_ticked: false,
114            start,
115            period,
116        }
117    }
118
119    /// Completes when the next instant in the interval has been reached.
120    ///
121    /// See [`interval`] and [`interval_at`].
122    pub async fn tick(&mut self) -> Instant {
123        if !self.first_ticked {
124            sleep_until(self.start).await;
125            self.first_ticked = true;
126            self.start
127        } else {
128            let now = Instant::now();
129            let next = now + self.period
130                - Duration::from_nanos(
131                    ((now - self.start).as_nanos() % self.period.as_nanos()) as _,
132                );
133            sleep_until(next).await;
134            next
135        }
136    }
137}
138
139/// Creates new [`Interval`] that yields with interval of `period`. The first
140/// tick completes immediately.
141///
142/// An interval will tick indefinitely. At any time, the [`Interval`] value can
143/// be dropped. This cancels the interval.
144///
145/// This function is equivalent to
146/// [`interval_at(Instant::now(), period)`](interval_at).
147///
148/// # Panics
149///
150/// This function panics if `period` is zero.
151///
152/// # Examples
153///
154/// ```
155/// use std::time::Duration;
156///
157/// use compio_runtime::time::interval;
158///
159/// # compio_runtime::Runtime::new().unwrap().block_on(async {
160/// let mut interval = interval(Duration::from_millis(10));
161///
162/// interval.tick().await; // ticks immediately
163/// interval.tick().await; // ticks after 10ms
164/// interval.tick().await; // ticks after 10ms
165///
166/// // approximately 20ms have elapsed.
167/// # })
168/// ```
169///
170/// A simple example using [`interval`] to execute a task every two seconds.
171///
172/// The difference between [`interval`] and [`sleep`] is that an [`Interval`]
173/// measures the time since the last tick, which means that [`.tick().await`]
174/// may wait for a shorter time than the duration specified for the interval
175/// if some time has passed between calls to [`.tick().await`].
176///
177/// If the tick in the example below was replaced with [`sleep`], the task
178/// would only be executed once every three seconds, and not every two
179/// seconds.
180///
181/// ```no_run
182/// use std::time::Duration;
183///
184/// use compio_runtime::time::{interval, sleep};
185///
186/// async fn task_that_takes_a_second() {
187///     println!("hello");
188///     sleep(Duration::from_secs(1)).await
189/// }
190///
191/// # compio_runtime::Runtime::new().unwrap().block_on(async {
192/// let mut interval = interval(Duration::from_secs(2));
193/// for _i in 0..5 {
194///     interval.tick().await;
195///     task_that_takes_a_second().await;
196/// }
197/// # })
198/// ```
199///
200/// [`sleep`]: crate::time::sleep()
201/// [`.tick().await`]: Interval::tick
202pub fn interval(period: Duration) -> Interval {
203    interval_at(Instant::now(), period)
204}
205
206/// Creates new [`Interval`] that yields with interval of `period` with the
207/// first tick completing at `start`.
208///
209/// An interval will tick indefinitely. At any time, the [`Interval`] value can
210/// be dropped. This cancels the interval.
211///
212/// # Panics
213///
214/// This function panics if `period` is zero.
215///
216/// # Examples
217///
218/// ```
219/// use std::time::{Duration, Instant};
220///
221/// use compio_runtime::time::interval_at;
222///
223/// # compio_runtime::Runtime::new().unwrap().block_on(async {
224/// let start = Instant::now() + Duration::from_millis(50);
225/// let mut interval = interval_at(start, Duration::from_millis(10));
226///
227/// interval.tick().await; // ticks after 50ms
228/// interval.tick().await; // ticks after 10ms
229/// interval.tick().await; // ticks after 10ms
230///
231/// // approximately 70ms have elapsed.
232/// # });
233/// ```
234pub fn interval_at(start: Instant, period: Duration) -> Interval {
235    assert!(period > Duration::ZERO, "`period` must be non-zero.");
236    Interval::new(start, period)
237}
238
239#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
240pub(crate) struct TimerKey {
241    deadline: Instant,
242    key: u64,
243    _local_marker: PhantomData<*const ()>,
244}
245
246pub(crate) struct TimerRuntime {
247    key: u64,
248    wheel: BTreeMap<TimerKey, Waker>,
249}
250
251impl TimerRuntime {
252    pub fn new() -> Self {
253        Self {
254            key: 0,
255            wheel: BTreeMap::default(),
256        }
257    }
258
259    /// Return true if the timer has completed.
260    pub fn is_completed(&self, key: &TimerKey) -> bool {
261        !self.wheel.contains_key(key)
262    }
263
264    /// Insert a new timer. If the deadline is in the past, return `None`.
265    pub fn insert(&mut self, deadline: Instant) -> Option<TimerKey> {
266        if deadline <= Instant::now() {
267            return None;
268        }
269        let key = TimerKey {
270            deadline,
271            key: self.key,
272            _local_marker: PhantomData,
273        };
274        self.wheel.insert(key, Waker::noop().clone());
275
276        self.key += 1;
277
278        Some(key)
279    }
280
281    /// Update the waker for a timer.
282    pub fn update_waker(&mut self, key: &TimerKey, waker: &Waker) {
283        if let Some(w) = self.wheel.get_mut(key)
284            && !waker.will_wake(w)
285        {
286            *w = waker.clone();
287        }
288    }
289
290    /// Cancel a timer.
291    pub fn cancel(&mut self, key: &TimerKey) {
292        self.wheel.remove(key);
293    }
294
295    /// Get the minimum timeout duration for the next poll.
296    pub fn min_timeout(&self) -> Option<Duration> {
297        self.wheel.first_key_value().map(|(key, _)| {
298            let now = Instant::now();
299            key.deadline.saturating_duration_since(now)
300        })
301    }
302
303    /// Wake all the timer futures that have reached their deadline.
304    pub fn wake(&mut self) {
305        if self.wheel.is_empty() {
306            return;
307        }
308
309        let now = Instant::now();
310
311        let pending = self.wheel.split_off(&TimerKey {
312            deadline: now,
313            key: u64::MAX,
314            _local_marker: PhantomData,
315        });
316
317        let expired = replace(&mut self.wheel, pending);
318        for (_, w) in expired {
319            w.wake();
320        }
321    }
322}
323
324pub(crate) struct TimerFuture(TimerKey);
325
326impl TimerFuture {
327    pub fn new(key: TimerKey) -> Self {
328        Self(key)
329    }
330}
331
332impl Future for TimerFuture {
333    type Output = ();
334
335    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
336        Runtime::with_current(|r| r.poll_timer(cx, &self.0))
337    }
338}
339
340impl Drop for TimerFuture {
341    fn drop(&mut self) {
342        Runtime::with_current(|r| r.cancel_timer(&self.0));
343    }
344}
345
346compio_driver::assert_not_impl!(TimerFuture, Send);
347compio_driver::assert_not_impl!(TimerFuture, Sync);
348
349#[test]
350fn timer_min_timeout() {
351    let mut runtime = TimerRuntime::new();
352    assert_eq!(runtime.min_timeout(), None);
353
354    let now = Instant::now();
355    runtime.insert(now + Duration::from_secs(1));
356    runtime.insert(now + Duration::from_secs(10));
357    let min_timeout = runtime.min_timeout().unwrap().as_secs_f32();
358
359    assert!(min_timeout < 1.);
360}