compio_runtime/time.rs
1//! Utilities for tracking time.
2
3use std::{
4 collections::BTreeMap,
5 error::Error,
6 fmt::Display,
7 future::Future,
8 marker::PhantomData,
9 mem::replace,
10 pin::Pin,
11 task::{Context, Poll, Waker},
12 time::{Duration, Instant},
13};
14
15use compio_log::{debug, instrument};
16use futures_util::{FutureExt, select};
17
18use crate::Runtime;
19
20/// Waits until `duration` has elapsed.
21///
22/// Equivalent to [`sleep_until(Instant::now() + duration)`](sleep_until). An
23/// asynchronous analog to [`std::thread::sleep`].
24///
25/// To run something regularly on a schedule, see [`interval`].
26///
27/// # Examples
28///
29/// Wait 100ms and print "100 ms have elapsed".
30///
31/// ```
32/// use std::time::Duration;
33///
34/// use compio_runtime::time::sleep;
35///
36/// # compio_runtime::Runtime::new().unwrap().block_on(async {
37/// sleep(Duration::from_millis(100)).await;
38/// println!("100 ms have elapsed");
39/// # })
40/// ```
41pub async fn sleep(duration: Duration) {
42 sleep_until(Instant::now() + duration).await
43}
44
45/// Waits until `deadline` is reached.
46///
47/// To run something regularly on a schedule, see [`interval`].
48///
49/// # Examples
50///
51/// Wait 100ms and print "100 ms have elapsed".
52///
53/// ```
54/// use std::time::{Duration, Instant};
55///
56/// use compio_runtime::time::sleep_until;
57///
58/// # compio_runtime::Runtime::new().unwrap().block_on(async {
59/// sleep_until(Instant::now() + Duration::from_millis(100)).await;
60/// println!("100 ms have elapsed");
61/// # })
62/// ```
63pub async fn sleep_until(deadline: Instant) {
64 create_timer(deadline).await
65}
66
67async fn create_timer(instant: std::time::Instant) {
68 let key = Runtime::with_current(|r| r.timer_runtime.borrow_mut().insert(instant));
69 if let Some(key) = key {
70 TimerFuture::new(key).await;
71 }
72}
73
/// The error produced by [`timeout`] and [`timeout_at`] when the time limit is
/// hit before the wrapped future finishes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Elapsed(());

impl Display for Elapsed {
    /// Writes the fixed, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "deadline has elapsed")
    }
}

// No underlying cause to report, so the default `Error` methods suffice.
impl Error for Elapsed {}
85
86/// Require a [`Future`] to complete before the specified duration has elapsed.
87///
88/// If the future completes before the duration has elapsed, then the completed
89/// value is returned. Otherwise, an error is returned and the future is
90/// cancelled.
91pub async fn timeout<F: Future>(duration: Duration, future: F) -> Result<F::Output, Elapsed> {
92 select! {
93 res = future.fuse() => Ok(res),
94 _ = sleep(duration).fuse() => Err(Elapsed(())),
95 }
96}
97
98/// Require a [`Future`] to complete before the specified instant in time.
99///
100/// If the future completes before the instant is reached, then the completed
101/// value is returned. Otherwise, an error is returned.
102pub async fn timeout_at<F: Future>(deadline: Instant, future: F) -> Result<F::Output, Elapsed> {
103 timeout(deadline - Instant::now(), future).await
104}
105
106/// Interval returned by [`interval`] and [`interval_at`]
107///
108/// This type allows you to wait on a sequence of instants with a certain
109/// duration between each instant. Unlike calling [`sleep`] in a loop, this lets
110/// you count the time spent between the calls to [`sleep`] as well.
111#[derive(Debug)]
112pub struct Interval {
113 first_ticked: bool,
114 start: Instant,
115 period: Duration,
116}
117
118impl Interval {
119 pub(crate) fn new(start: Instant, period: Duration) -> Self {
120 Self {
121 first_ticked: false,
122 start,
123 period,
124 }
125 }
126
127 /// Completes when the next instant in the interval has been reached.
128 ///
129 /// See [`interval`] and [`interval_at`].
130 pub async fn tick(&mut self) -> Instant {
131 if !self.first_ticked {
132 sleep_until(self.start).await;
133 self.first_ticked = true;
134 self.start
135 } else {
136 let now = Instant::now();
137 let next = now + self.period
138 - Duration::from_nanos(
139 ((now - self.start).as_nanos() % self.period.as_nanos()) as _,
140 );
141 sleep_until(next).await;
142 next
143 }
144 }
145}
146
147/// Creates new [`Interval`] that yields with interval of `period`. The first
148/// tick completes immediately.
149///
150/// An interval will tick indefinitely. At any time, the [`Interval`] value can
151/// be dropped. This cancels the interval.
152///
153/// This function is equivalent to
154/// [`interval_at(Instant::now(), period)`](interval_at).
155///
156/// # Panics
157///
158/// This function panics if `period` is zero.
159///
160/// # Examples
161///
162/// ```
163/// use std::time::Duration;
164///
165/// use compio_runtime::time::interval;
166///
167/// # compio_runtime::Runtime::new().unwrap().block_on(async {
168/// let mut interval = interval(Duration::from_millis(10));
169///
170/// interval.tick().await; // ticks immediately
171/// interval.tick().await; // ticks after 10ms
172/// interval.tick().await; // ticks after 10ms
173///
174/// // approximately 20ms have elapsed.
175/// # })
176/// ```
177///
178/// A simple example using [`interval`] to execute a task every two seconds.
179///
180/// The difference between [`interval`] and [`sleep`] is that an [`Interval`]
181/// measures the time since the last tick, which means that [`.tick().await`]
182/// may wait for a shorter time than the duration specified for the interval
183/// if some time has passed between calls to [`.tick().await`].
184///
185/// If the tick in the example below was replaced with [`sleep`], the task
186/// would only be executed once every three seconds, and not every two
187/// seconds.
188///
189/// ```no_run
190/// use std::time::Duration;
191///
192/// use compio_runtime::time::{interval, sleep};
193///
194/// async fn task_that_takes_a_second() {
195/// println!("hello");
196/// sleep(Duration::from_secs(1)).await
197/// }
198///
199/// # compio_runtime::Runtime::new().unwrap().block_on(async {
200/// let mut interval = interval(Duration::from_secs(2));
201/// for _i in 0..5 {
202/// interval.tick().await;
203/// task_that_takes_a_second().await;
204/// }
205/// # })
206/// ```
207///
208/// [`sleep`]: crate::time::sleep()
209/// [`.tick().await`]: Interval::tick
210pub fn interval(period: Duration) -> Interval {
211 interval_at(Instant::now(), period)
212}
213
214/// Creates new [`Interval`] that yields with interval of `period` with the
215/// first tick completing at `start`.
216///
217/// An interval will tick indefinitely. At any time, the [`Interval`] value can
218/// be dropped. This cancels the interval.
219///
220/// # Panics
221///
222/// This function panics if `period` is zero.
223///
224/// # Examples
225///
226/// ```
227/// use std::time::{Duration, Instant};
228///
229/// use compio_runtime::time::interval_at;
230///
231/// # compio_runtime::Runtime::new().unwrap().block_on(async {
232/// let start = Instant::now() + Duration::from_millis(50);
233/// let mut interval = interval_at(start, Duration::from_millis(10));
234///
235/// interval.tick().await; // ticks after 50ms
236/// interval.tick().await; // ticks after 10ms
237/// interval.tick().await; // ticks after 10ms
238///
239/// // approximately 70ms have elapsed.
240/// # });
241/// ```
242pub fn interval_at(start: Instant, period: Duration) -> Interval {
243 assert!(period > Duration::ZERO, "`period` must be non-zero.");
244 Interval::new(start, period)
245}
246
247#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
248pub(crate) struct TimerKey {
249 deadline: Instant,
250 key: u64,
251 _local_marker: PhantomData<*const ()>,
252}
253
254pub(crate) struct TimerRuntime {
255 key: u64,
256 wheel: BTreeMap<TimerKey, Waker>,
257}
258
259impl TimerRuntime {
260 pub fn new() -> Self {
261 Self {
262 key: 0,
263 wheel: BTreeMap::default(),
264 }
265 }
266
267 /// Return true if the timer has completed.
268 pub fn is_completed(&self, key: &TimerKey) -> bool {
269 !self.wheel.contains_key(key)
270 }
271
272 /// Insert a new timer. If the deadline is in the past, return `None`.
273 pub fn insert(&mut self, deadline: Instant) -> Option<TimerKey> {
274 if deadline <= Instant::now() {
275 return None;
276 }
277 let key = TimerKey {
278 deadline,
279 key: self.key,
280 _local_marker: PhantomData,
281 };
282 self.wheel.insert(key, Waker::noop().clone());
283
284 self.key += 1;
285
286 Some(key)
287 }
288
289 /// Update the waker for a timer.
290 pub fn update_waker(&mut self, key: &TimerKey, waker: &Waker) {
291 if let Some(w) = self.wheel.get_mut(key)
292 && !waker.will_wake(w)
293 {
294 *w = waker.clone();
295 }
296 }
297
298 /// Cancel a timer.
299 pub fn cancel(&mut self, key: &TimerKey) {
300 self.wheel.remove(key);
301 }
302
303 /// Get the minimum timeout duration for the next poll.
304 pub fn min_timeout(&self) -> Option<Duration> {
305 self.wheel.first_key_value().map(|(key, _)| {
306 let now = Instant::now();
307 key.deadline.saturating_duration_since(now)
308 })
309 }
310
311 /// Wake all the timer futures that have reached their deadline.
312 pub fn wake(&mut self) {
313 if self.wheel.is_empty() {
314 return;
315 }
316
317 let now = Instant::now();
318
319 let pending = self.wheel.split_off(&TimerKey {
320 deadline: now,
321 key: u64::MAX,
322 _local_marker: PhantomData,
323 });
324
325 let expired = replace(&mut self.wheel, pending);
326 for (_, w) in expired {
327 w.wake();
328 }
329 }
330
331 pub fn poll_timer(&mut self, cx: &mut Context<'_>, key: &TimerKey) -> Poll<()> {
332 instrument!(compio_log::Level::DEBUG, "poll_timer", ?cx, ?key);
333 if self.is_completed(key) {
334 debug!("ready");
335 Poll::Ready(())
336 } else {
337 debug!("pending");
338 self.update_waker(key, cx.waker());
339 Poll::Pending
340 }
341 }
342}
343
344pub(crate) struct TimerFuture(TimerKey);
345
346impl TimerFuture {
347 pub fn new(key: TimerKey) -> Self {
348 Self(key)
349 }
350}
351
352impl Future for TimerFuture {
353 type Output = ();
354
355 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
356 Runtime::with_current(|r| r.timer_runtime.borrow_mut().poll_timer(cx, &self.0))
357 }
358}
359
360impl Drop for TimerFuture {
361 fn drop(&mut self) {
362 Runtime::with_current(|r| r.timer_runtime.borrow_mut().cancel(&self.0));
363 }
364}
365
366compio_driver::assert_not_impl!(TimerFuture, Send);
367compio_driver::assert_not_impl!(TimerFuture, Sync);
368
// `min_timeout` must be `None` for an empty table and otherwise report the
// time left until the earliest of the registered deadlines.
#[test]
fn timer_min_timeout() {
    let mut timers = TimerRuntime::new();
    assert_eq!(timers.min_timeout(), None);

    let base = Instant::now();
    for secs in [1, 10] {
        timers.insert(base + Duration::from_secs(secs));
    }

    // Some time has passed since `base`, so strictly less than the 1s of the
    // earliest timer remains.
    let remaining = timers.min_timeout().unwrap().as_secs_f32();
    assert!(remaining < 1.);
}