tokio_hrtime/
lib.rs

//! High-resolution (hires) timers for tokio.
//!
//! This is a drop-in replacement for [`tokio::time`](https://docs.rs/tokio/1.40.0/tokio/time/index.html).
//! The API is a 1:1 replication; please refer to the documentation there.
//!
//! Timers with the highest possible resolution offered by the operating system
//! are used. The `time` feature of tokio is neither used nor required.
//! Sub-millisecond granularity is achieved with:
//! - `timerfd` on Linux (and Android).
//! - `kqueue` with `EVFILT_TIMER` on *BSD and Apple's Darwin;
//!   specifically, `NOTE_MACHTIME` is used on Darwin to obtain a resolution similar to that of GCD.
//! - `CreateWaitableTimerEx` with `CREATE_WAITABLE_TIMER_HIGH_RESOLUTION` on Windows.
13
14#![warn(clippy::pedantic)]
15#![allow(clippy::must_use_candidate)]
16#![allow(clippy::missing_panics_doc)]
17
18#[doc(no_inline)]
19pub use std::time::{Duration, Instant};
20
21use std::future::{Future, IntoFuture};
22use std::pin::Pin;
23use std::task::{Context, Poll};
24
25use pin_project_lite::pin_project;
26
27mod utils;
28
29cfg_if::cfg_if! {
30    if #[cfg(any(target_os = "linux", target_os = "android"))] {
31        mod timerfd;
32        use timerfd::Timer;
33    } else if #[cfg(any(target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", target_os = "dragonfly", target_vendor = "apple"))] {
34        mod kqueue;
35        use kqueue::Timer;
36    } else if #[cfg(windows)] {
37        mod waitable;
38        use waitable::Timer;
39    } else {
40        compile_error!("unsupported platform");
41    }
42}
43
44#[cfg(not(feature = "rt"))]
45fn poll_timer(timer: &mut Timer, cx: &mut Context) -> Poll<u64> {
46    timer.poll_expired(cx)
47}
48
/// Polls the platform timer for expiration (build with the `rt` feature).
///
/// The call to `Timer::poll_expired` is wrapped in
/// `tokio::task::unconstrained`, which (per the Tokio documentation) opts the
/// wrapped future out of Tokio's cooperative scheduling budget.
///
/// The `u64` carried by `Poll::Ready` is what `Interval::poll_tick` adds to
/// its count of owed ticks; `Sleep` only cares whether the result is ready.
#[cfg(feature = "rt")]
fn poll_timer(timer: &mut Timer, cx: &mut Context<'_>) -> Poll<u64> {
    // Adapt the poll-style method into a future so it can be wrapped.
    let expired = std::future::poll_fn(|inner_cx| timer.poll_expired(inner_cx));
    let unconstrained = tokio::task::unconstrained(expired);

    // The wrapper lives only for this call, so pinning it on the stack is enough.
    std::pin::pin!(unconstrained).poll(cx)
}
56
57#[must_use]
58pub struct Sleep {
59    timer: Timer,
60    deadline: Instant,
61    elapsed: bool,
62}
63
64pub fn sleep_until(deadline: Instant) -> Sleep {
65    Sleep {
66        timer: Timer::new(Some(deadline), None),
67        deadline,
68        elapsed: false,
69    }
70}
71
72pub fn sleep(duration: Duration) -> Sleep {
73    sleep_until(Instant::now() + duration)
74}
75
76impl Future for Sleep {
77    type Output = ();
78
79    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
80        if self.elapsed {
81            return Poll::Ready(());
82        }
83
84        self.elapsed = matches!(poll_timer(&mut self.timer, cx), Poll::Ready(_));
85
86        if self.elapsed {
87            Poll::Ready(())
88        } else {
89            Poll::Pending
90        }
91    }
92}
93
94impl Sleep {
95    pub fn deadline(&self) -> Instant {
96        self.deadline
97    }
98
99    pub fn is_elapsed(&self) -> bool {
100        self.elapsed
101    }
102
103    pub fn reset(&mut self, deadline: Instant) {
104        self.timer.reset(Some(deadline), None);
105    }
106}
107
/// Error types produced by the timeout utilities.
pub mod error {
    use std::error::Error;
    use std::fmt::{Display, Formatter};

    /// Error returned by `Timeout` when the deadline passes before the
    /// wrapped future completes.
    ///
    /// The inner unit field is crate-private, so values can only be created
    /// inside this crate.
    #[derive(Debug, PartialEq, Eq)]
    pub struct Elapsed(pub(crate) ());

    impl Display for Elapsed {
        fn fmt(&self, fmt: &mut Formatter<'_>) -> std::fmt::Result {
            // `pad` honours width/fill/precision flags, exactly like
            // formatting the string slice itself would.
            fmt.pad("deadline has elapsed")
        }
    }

    impl Error for Elapsed {}
}
122
123pin_project! {
124    #[must_use]
125    pub struct Timeout<F> {
126        #[pin]
127        future: F,
128        sleep: Sleep,
129    }
130}
131
132pub fn timeout_at<F: IntoFuture>(deadline: Instant, future: F) -> Timeout<F::IntoFuture> {
133    Timeout {
134        future: future.into_future(),
135        sleep: sleep_until(deadline),
136    }
137}
138
139pub fn timeout<F: IntoFuture>(duration: Duration, future: F) -> Timeout<F::IntoFuture> {
140    timeout_at(Instant::now() + duration, future)
141}
142
143impl<F: Future> Future for Timeout<F> {
144    type Output = Result<F::Output, error::Elapsed>;
145
146    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
147        let mut this = self.project();
148        #[expect(clippy::same_functions_in_if_condition)]
149        if let Poll::Ready(()) = Pin::new(&mut this.sleep).poll(cx) {
150            Poll::Ready(Err(error::Elapsed(())))
151        } else if let Poll::Ready(output) = this.future.poll(cx) {
152            Poll::Ready(Ok(output))
153        } else if let Poll::Ready(()) = Pin::new(&mut this.sleep).poll(cx) {
154            Poll::Ready(Err(error::Elapsed(())))
155        } else {
156            Poll::Pending
157        }
158    }
159}
160
161impl<F> Timeout<F> {
162    pub fn get_ref(&self) -> &F {
163        &self.future
164    }
165
166    pub fn get_mut(&mut self) -> &mut F {
167        &mut self.future
168    }
169
170    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut F> {
171        self.project().future
172    }
173
174    pub fn into_inner(self) -> F {
175        self.future
176    }
177}
178
/// Strategy an [`Interval`] applies when ticks were missed, i.e. when the
/// timer expired more than once between two calls to `tick`.
///
/// The default is [`MissedTickBehavior::Burst`], which is also what
/// [`interval`] and [`interval_at`] start with.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum MissedTickBehavior {
    /// Every missed expiration is delivered: each one makes a subsequent
    /// `tick` complete immediately until the backlog is used up.
    #[default]
    Burst,
    /// Not implemented yet; `Interval::set_missed_tick_behavior` panics when
    /// given this variant.
    Delay,
    /// All expirations accumulated so far are collapsed into a single tick.
    Skip,
}
185
186#[must_use]
187pub struct Interval {
188    timer: Timer,
189    period: Duration,
190    expirations: u64,
191    behavior: MissedTickBehavior,
192}
193
194pub fn interval_at(start: Instant, period: Duration) -> Interval {
195    Interval {
196        timer: Timer::new(Some(start), Some(period)),
197        period,
198        expirations: 0,
199        behavior: MissedTickBehavior::Burst,
200    }
201}
202
203pub fn interval(period: Duration) -> Interval {
204    Interval {
205        timer: Timer::new(None, Some(period)),
206        period,
207        expirations: 1,
208        behavior: MissedTickBehavior::Burst,
209    }
210}
211
212struct Tick<'a>(&'a mut Interval);
213
214impl<'a> Future for Tick<'a> {
215    type Output = Instant;
216
217    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
218        self.0.poll_tick(cx)
219    }
220}
221
222impl Interval {
223    pub fn tick(&mut self) -> impl Future<Output = Instant> + '_ {
224        Tick(self)
225    }
226
227    pub fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll<Instant> {
228        if let Poll::Ready(exp) = poll_timer(&mut self.timer, cx) {
229            self.expirations += exp;
230        };
231        if self.expirations != 0 {
232            self.expirations = match self.behavior {
233                MissedTickBehavior::Burst => self.expirations - 1,
234                MissedTickBehavior::Skip => 0,
235                MissedTickBehavior::Delay => unreachable!(),
236            };
237            Poll::Ready(Instant::now())
238        } else {
239            Poll::Pending
240        }
241    }
242
243    pub fn reset(&mut self) {
244        self.timer.reset(None, Some(self.period));
245    }
246
247    pub fn reset_immediately(&mut self) {
248        self.expirations = 1;
249        self.reset();
250    }
251
252    pub fn reset_after(&mut self, after: Duration) {
253        self.reset_at(Instant::now() + after);
254    }
255
256    pub fn reset_at(&mut self, deadline: Instant) {
257        let now = Instant::now();
258        if now < deadline {
259            self.expirations = 0;
260            self.timer.reset(Some(deadline), Some(self.period));
261        } else {
262            let past = u64::try_from((now - deadline).as_nanos()).unwrap();
263            let divider = u64::try_from(self.period.as_nanos()).unwrap();
264            self.expirations = past / divider + 1;
265            #[expect(clippy::unchecked_duration_subtraction)]
266            self.timer.reset(
267                Some(now - Duration::from_nanos(past % divider) + self.period),
268                Some(self.period),
269            );
270        }
271    }
272
273    pub fn missed_tick_behavior(&self) -> MissedTickBehavior {
274        self.behavior
275    }
276
277    pub fn set_missed_tick_behavior(&mut self, behavior: MissedTickBehavior) {
278        if behavior == MissedTickBehavior::Delay {
279            unimplemented!("MissedTickBehavior::Delay is not implemented yet");
280        }
281        self.behavior = behavior;
282    }
283
284    pub fn period(&self) -> Duration {
285        self.period
286    }
287}
288
#[cfg(test)]
mod tests {
    use crate::*;
    use std::sync::LazyLock;

    /// Maximum accepted deviation between measured and expected elapsed time:
    /// 1 ms with the `test-hires` feature, 5 ms otherwise.
    const TOLERANCE: Duration =
        Duration::from_millis(if cfg!(feature = "test-hires") { 1 } else { 5 });

    /// Returns `true` when `actual` differs from `expected` by less than
    /// [`TOLERANCE`].
    fn within_tolerance(actual: Duration, expected: Duration) -> bool {
        actual.abs_diff(expected) < TOLERANCE
    }

    /// Finishes a runtime builder; the tokio I/O driver is enabled on Unix
    /// targets only.
    fn build_runtime(mut builder: tokio::runtime::Builder) -> tokio::runtime::Runtime {
        #[cfg(unix)]
        builder.enable_io();
        builder.build().unwrap()
    }

    fn new_current_thread_runtime() -> tokio::runtime::Runtime {
        build_runtime(tokio::runtime::Builder::new_current_thread())
    }

    fn new_multi_thread_runtime() -> tokio::runtime::Runtime {
        let mut builder = tokio::runtime::Builder::new_multi_thread();
        builder.worker_threads(4);
        build_runtime(builder)
    }

    // Runtimes shared by all tests in this module.
    static CURRENT_THREAD_RUNTIME: LazyLock<tokio::runtime::Runtime> =
        LazyLock::new(new_current_thread_runtime);

    static MULTI_THREAD_RUNTIME: LazyLock<tokio::runtime::Runtime> =
        LazyLock::new(new_multi_thread_runtime);

    /// Turns each `async fn name() { .. }` into a `#[test]` that runs the body
    /// on three runtimes: the shared current-thread runtime, the shared
    /// multi-thread runtime, and a current-thread runtime private to the test.
    macro_rules! mytest {
        ($(async fn $name:ident() $body:block)*) => {
            $(
                #[test]
                fn $name() {
                    let shared_single = &*CURRENT_THREAD_RUNTIME;
                    let shared_multi = &*MULTI_THREAD_RUNTIME;
                    let private_single = new_current_thread_runtime();

                    // Spawn everywhere first, then wait for each in turn.
                    let on_shared_single = shared_single.spawn(async $body);
                    let on_shared_multi = shared_multi.spawn(async $body);
                    let on_private_single = private_single.spawn(async $body);

                    shared_single.block_on(on_shared_single).unwrap();
                    shared_multi.block_on(on_shared_multi).unwrap();
                    private_single.block_on(on_private_single).unwrap();
                }
            )*
        }
    }

    mytest! {
        async fn test_sleep() {
            let begin = Instant::now();
            let duration = Duration::from_millis(100);
            sleep(duration).await;
            assert!(within_tolerance(begin.elapsed(), duration));
        }

        async fn test_sleep_until() {
            let begin = Instant::now();
            let duration = Duration::from_millis(100);
            sleep_until(begin + duration).await;
            assert!(within_tolerance(begin.elapsed(), duration));
        }

        async fn test_timeout() {
            let duration = Duration::from_millis(100);
            let large_duration = Duration::from_secs(1);
            let small_duration = Duration::from_millis(10);

            // Inner future finishes well before the timeout.
            let begin = Instant::now();
            assert!(timeout(duration, sleep(small_duration)).await.is_ok());
            assert!(within_tolerance(begin.elapsed(), small_duration));

            // Inner future outlives the timeout.
            let begin = Instant::now();
            assert!(timeout(duration, sleep(large_duration)).await.is_err());
            assert!(within_tolerance(begin.elapsed(), duration));
        }

        async fn test_timeout_at() {
            let duration = Duration::from_millis(100);
            let large_duration = Duration::from_secs(1);
            let small_duration = Duration::from_millis(10);

            // Inner future finishes well before the deadline.
            let begin = Instant::now();
            let outcome = timeout_at(begin + duration, sleep(small_duration)).await;
            assert!(outcome.is_ok());
            assert!(within_tolerance(begin.elapsed(), small_duration));

            // Inner future outlives the deadline.
            let begin = Instant::now();
            let outcome = timeout_at(begin + duration, sleep(large_duration)).await;
            assert!(outcome.is_err());
            assert!(within_tolerance(begin.elapsed(), duration));
        }

        async fn test_interval() {
            let begin = Instant::now();
            let duration = Duration::from_millis(100);
            let mut iv = interval(duration);

            // First tick is immediate, then one per period.
            for i in 0..10 {
                let _ = iv.tick().await;
                assert!(within_tolerance(begin.elapsed(), duration * i));
            }
        }

        async fn test_interval_at() {
            let begin = Instant::now();
            let duration = Duration::from_millis(100);
            let mut iv = interval_at(begin + duration, duration);

            // First tick happens one period after `begin`.
            for i in 1..=10 {
                let _ = iv.tick().await;
                assert!(within_tolerance(begin.elapsed(), duration * i));
            }
        }

        async fn test_interval_burst() {
            let begin = Instant::now();
            let duration = Duration::from_millis(100);
            let mut iv = interval(duration);

            // Miss three periods, then take five ticks in a row.
            sleep(duration * 3).await;
            for _ in 0..5 {
                let _ = iv.tick().await;
            }
            assert!(within_tolerance(begin.elapsed(), duration * 4));
        }

        async fn test_interval_skip() {
            let begin = Instant::now();
            let duration = Duration::from_millis(100);
            let mut iv = interval(duration);
            iv.set_missed_tick_behavior(MissedTickBehavior::Skip);

            sleep(Duration::from_millis(350)).await;
            let _ = iv.tick().await;
            let _ = iv.tick().await;
            let elapsed = begin.elapsed();
            assert!(
                within_tolerance(elapsed, duration * 4)
                    || within_tolerance(elapsed, duration * 5)
            );
        }

        async fn test_interval_reset() {
            let begin = Instant::now();
            let duration = Duration::from_millis(100);
            let mut iv = interval(duration);

            for _ in 0..3 {
                let _ = iv.tick().await;
            }
            assert!(within_tolerance(begin.elapsed(), duration * 2));

            iv.reset_immediately();
            for _ in 0..3 {
                let _ = iv.tick().await;
            }
            assert!(within_tolerance(begin.elapsed(), duration * 4));

            // Deadline lies in the past by now.
            iv.reset_at(begin + Duration::from_millis(250));
            for _ in 0..3 {
                let _ = iv.tick().await;
            }
            assert!(within_tolerance(begin.elapsed(), Duration::from_millis(450)));
        }
    }
}