Skip to main content

ntex_util/time/
mod.rs

1//! Utilities for tracking time.
2#![allow(
3    clippy::cast_possible_truncation,
4    clippy::cast_sign_loss,
5    clippy::cast_possible_wrap
6)]
7use std::{cmp, future::Future, future::poll_fn, pin::Pin, task, task::Poll};
8
9mod types;
10mod wheel;
11
12pub use self::types::{Millis, Seconds};
13pub use self::wheel::{TimerHandle, now, query_system_time, system_time};
14
15/// Waits until `duration` has elapsed.
16///
17/// No work is performed while awaiting on the sleep future to complete. `Sleep`
18/// operates at 16 millisecond granularity and should not be used for tasks that
19/// require high-resolution timers. `Sleep` sleeps at least one tick (16 millis)
20/// even if 0 millis duration is used.
21#[inline]
22pub fn sleep<T: Into<Millis>>(dur: T) -> Sleep {
23    Sleep::new(dur.into())
24}
25
26/// Waits until `duration` has elapsed.
27///
28/// This is similar to `sleep` future, but in case of `0` duration deadline future
29/// never completes.
30#[inline]
31pub fn deadline<T: Into<Millis>>(dur: T) -> Deadline {
32    Deadline::new(dur.into())
33}
34
35/// Creates new [`Interval`] that yields with interval of `period`.
36///
37/// An interval will tick indefinitely. At any time, the [`Interval`] value can
38/// be dropped. This cancels the interval.
39#[inline]
40pub fn interval<T: Into<Millis>>(period: T) -> Interval {
41    Interval::new(period.into())
42}
43
44/// Require a `Future` to complete before the specified duration has elapsed.
45///
46/// If the future completes before the duration has elapsed, then the completed
47/// value is returned. Otherwise, an error is returned and the future is
48/// canceled.
49#[inline]
50pub fn timeout<T, U>(dur: U, future: T) -> Timeout<T>
51where
52    T: Future,
53    U: Into<Millis>,
54{
55    Timeout::new_with_delay(future, Sleep::new(dur.into()))
56}
57
58/// Require a `Future` to complete before the specified duration has elapsed.
59///
60/// If the future completes before the duration has elapsed, then the completed
61/// value is returned. Otherwise, an error is returned and the future is
62/// canceled. If duration value is zero then timeout is disabled.
63#[inline]
64pub fn timeout_checked<T, U>(dur: U, future: T) -> TimeoutChecked<T>
65where
66    T: Future,
67    U: Into<Millis>,
68{
69    TimeoutChecked::new_with_delay(future, dur.into())
70}
71
72/// Future returned by [`sleep`].
73///
74/// # Examples
75///
76/// Wait 100ms and print "100 ms have elapsed".
77///
78/// ```
79/// use ntex::time::{sleep, Millis};
80///
81/// #[ntex::main]
82/// async fn main() {
83///     sleep(Millis(100)).await;
84///     println!("100 ms have elapsed");
85/// }
86/// ```
87#[derive(Debug)]
88#[must_use = "futures do nothing unless you `.await` or poll them"]
89pub struct Sleep {
90    // The link between the `Sleep` instance and the timer that drives it.
91    hnd: TimerHandle,
92}
93
94impl Sleep {
95    /// Create new sleep future
96    #[inline]
97    pub fn new(duration: Millis) -> Sleep {
98        Sleep {
99            hnd: TimerHandle::new(u64::from(cmp::max(duration.0, 1))),
100        }
101    }
102
103    /// Returns `true` if `Sleep` has elapsed.
104    #[inline]
105    pub fn is_elapsed(&self) -> bool {
106        self.hnd.is_elapsed()
107    }
108
109    /// Complete sleep timer.
110    #[inline]
111    pub fn elapse(&self) {
112        self.hnd.elapse();
113    }
114
115    /// Resets the `Sleep` instance to a new deadline.
116    ///
117    /// Calling this function allows changing the instant at which the `Sleep`
118    /// future completes without having to create new associated state.
119    ///
120    /// This function can be called both before and after the future has
121    /// completed.
122    pub fn reset<T: Into<Millis>>(&self, millis: T) {
123        self.hnd.reset(u64::from(millis.into().0));
124    }
125
126    #[inline]
127    /// Wait when `Sleep` instance get elapsed.
128    pub async fn wait(&self) {
129        poll_fn(|cx| self.hnd.poll_elapsed(cx)).await;
130    }
131
132    #[inline]
133    pub fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<()> {
134        self.hnd.poll_elapsed(cx)
135    }
136}
137
138impl Future for Sleep {
139    type Output = ();
140
141    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
142        self.hnd.poll_elapsed(cx)
143    }
144}
145
146/// Future returned by [`deadline`].
147///
148/// # Examples
149///
150/// Wait 100ms and print "100 ms have elapsed".
151///
152/// ```
153/// use ntex::time::{deadline, Millis};
154///
155/// #[ntex::main]
156/// async fn main() {
157///     deadline(Millis(100)).await;
158///     println!("100 ms have elapsed");
159/// }
160/// ```
161#[derive(Debug)]
162#[must_use = "futures do nothing unless you `.await` or poll them"]
163pub struct Deadline {
164    hnd: Option<TimerHandle>,
165}
166
167impl Deadline {
168    /// Create new deadline future
169    #[inline]
170    pub fn new(duration: Millis) -> Deadline {
171        if duration.0 != 0 {
172            Deadline {
173                hnd: Some(TimerHandle::new(u64::from(duration.0))),
174            }
175        } else {
176            Deadline { hnd: None }
177        }
178    }
179
180    #[inline]
181    /// Wait when `Sleep` instance get elapsed.
182    pub async fn wait(&self) {
183        poll_fn(|cx| self.poll_elapsed(cx)).await;
184    }
185
186    /// Resets the `Deadline` instance to a new deadline.
187    ///
188    /// Calling this function allows changing the instant at which the `Deadline`
189    /// future completes without having to create new associated state.
190    ///
191    /// This function can be called both before and after the future has
192    /// completed.
193    pub fn reset<T: Into<Millis>>(&mut self, millis: T) {
194        let millis = millis.into();
195        if millis.0 != 0 {
196            if let Some(ref mut hnd) = self.hnd {
197                hnd.reset(u64::from(millis.0));
198            } else {
199                self.hnd = Some(TimerHandle::new(u64::from(millis.0)));
200            }
201        } else {
202            let _ = self.hnd.take();
203        }
204    }
205
206    /// Returns `true` if `Deadline` has elapsed.
207    #[inline]
208    pub fn is_elapsed(&self) -> bool {
209        self.hnd.as_ref().is_none_or(TimerHandle::is_elapsed)
210    }
211
212    #[inline]
213    pub fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<()> {
214        self.hnd
215            .as_ref()
216            .map_or(Poll::Pending, |t| t.poll_elapsed(cx))
217    }
218}
219
220impl Future for Deadline {
221    type Output = ();
222
223    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
224        self.poll_elapsed(cx)
225    }
226}
227
228pin_project_lite::pin_project! {
229    /// Future returned by [`timeout`](timeout).
230    #[must_use = "futures do nothing unless you `.await` or poll them"]
231    #[derive(Debug)]
232    pub struct Timeout<T> {
233        #[pin]
234        value: T,
235        delay: Sleep,
236    }
237}
238
239impl<T> Timeout<T> {
240    pub(crate) fn new_with_delay(value: T, delay: Sleep) -> Timeout<T> {
241        Timeout { value, delay }
242    }
243}
244
245impl<T> Future for Timeout<T>
246where
247    T: Future,
248{
249    type Output = Result<T::Output, ()>;
250
251    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
252        let this = self.project();
253
254        // First, try polling the future
255        if let Poll::Ready(v) = this.value.poll(cx) {
256            return Poll::Ready(Ok(v));
257        }
258
259        // Now check the timer
260        match this.delay.poll_elapsed(cx) {
261            Poll::Ready(()) => Poll::Ready(Err(())),
262            Poll::Pending => Poll::Pending,
263        }
264    }
265}
266
267pin_project_lite::pin_project! {
268    /// Future returned by [`timeout_checked`](timeout_checked).
269    #[must_use = "futures do nothing unless you `.await` or poll them"]
270    pub struct TimeoutChecked<T> {
271        #[pin]
272        state: TimeoutCheckedState<T>,
273    }
274}
275
276pin_project_lite::pin_project! {
277    #[project = TimeoutCheckedStateProject]
278    enum TimeoutCheckedState<T> {
279        Timeout{ #[pin] fut: Timeout<T> },
280        NoTimeout{ #[pin] fut: T },
281    }
282}
283
284impl<T> TimeoutChecked<T> {
285    pub(crate) fn new_with_delay(value: T, delay: Millis) -> TimeoutChecked<T> {
286        if delay.is_zero() {
287            TimeoutChecked {
288                state: TimeoutCheckedState::NoTimeout { fut: value },
289            }
290        } else {
291            TimeoutChecked {
292                state: TimeoutCheckedState::Timeout {
293                    fut: Timeout::new_with_delay(value, sleep(delay)),
294                },
295            }
296        }
297    }
298}
299
300impl<T> Future for TimeoutChecked<T>
301where
302    T: Future,
303{
304    type Output = Result<T::Output, ()>;
305
306    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
307        match self.project().state.as_mut().project() {
308            TimeoutCheckedStateProject::Timeout { fut } => fut.poll(cx),
309            TimeoutCheckedStateProject::NoTimeout { fut } => fut.poll(cx).map(Result::Ok),
310        }
311    }
312}
313
314/// Interval returned by [`interval`]
315///
316/// This type allows you to wait on a sequence of instants with a certain
317/// duration between each instant.
318#[must_use = "futures do nothing unless you `.await` or poll them"]
319#[derive(Debug)]
320pub struct Interval {
321    hnd: TimerHandle,
322    period: u32,
323}
324
325impl Interval {
326    /// Create new sleep future
327    #[inline]
328    pub fn new(period: Millis) -> Interval {
329        Interval {
330            hnd: TimerHandle::new(u64::from(period.0)),
331            period: period.0,
332        }
333    }
334
335    #[inline]
336    pub async fn tick(&self) {
337        poll_fn(|cx| self.poll_tick(cx)).await;
338    }
339
340    #[inline]
341    pub fn poll_tick(&self, cx: &mut task::Context<'_>) -> Poll<()> {
342        if self.hnd.poll_elapsed(cx).is_ready() {
343            self.hnd.reset(u64::from(self.period));
344            Poll::Ready(())
345        } else {
346            Poll::Pending
347        }
348    }
349}
350
351impl crate::Stream for Interval {
352    type Item = ();
353
354    #[inline]
355    fn poll_next(
356        self: Pin<&mut Self>,
357        cx: &mut task::Context<'_>,
358    ) -> Poll<Option<Self::Item>> {
359        self.poll_tick(cx).map(|()| Some(()))
360    }
361}
362
#[cfg(test)]
mod tests {
    use futures_util::StreamExt;
    use std::{future::poll_fn, rc::Rc, time};

    use super::*;
    use crate::future::lazy;

    /// State Under Test: Two calls of `now()` return the same value if they are done within resolution interval.
    ///
    /// Expected Behavior: Two back-to-back calls of `now()` return the same value.
    #[ntex::test]
    async fn lowres_time_does_not_immediately_change() {
        sleep(Millis(25)).await;

        assert_eq!(now(), now());
    }

    /// State Under Test: `now()` advances while the task sleeps.
    ///
    /// Expected Behavior: Two calls of `now()` separated by a 25ms sleep return different values
    /// and the second value is greater than the first one by at least the slept duration.
    #[ntex::test]
    async fn lowres_time_updates_after_resolution_interval() {
        sleep(Millis(50)).await;

        let first_time = now();

        sleep(Millis(25)).await;

        let second_time = now();
        assert!(second_time - first_time >= time::Duration::from_millis(25));
    }

    /// State Under Test: Two calls of `system_time()` return the same value if they are done back-to-back.
    ///
    /// Expected Behavior: Two back-to-back calls of `system_time()` return the same value, and
    /// `query_system_time()` agrees with it.
    #[ntex::test]
    async fn system_time_service_time_does_not_immediately_change() {
        sleep(Seconds(1)).await;

        assert_eq!(system_time(), system_time());
        assert_eq!(system_time(), query_system_time());
    }

    /// State Under Test: `system_time()` advances while the task sleeps.
    ///
    /// Expected Behavior: Two calls of `system_time()` separated by a sleep return different values
    /// and the second value is greater than the first one by at least the slept duration.
    #[ntex::test]
    async fn system_time_service_time_updates_after_resolution_interval() {
        sleep(Millis(100)).await;

        let wait_time = 300;

        let first_time = system_time()
            .duration_since(time::SystemTime::UNIX_EPOCH)
            .unwrap();

        sleep(Millis(wait_time)).await;

        let second_time = system_time()
            .duration_since(time::SystemTime::UNIX_EPOCH)
            .unwrap();

        assert!(
            second_time.checked_sub(first_time).unwrap()
                >= time::Duration::from_millis(u64::from(wait_time))
        );
    }

    /// Covers zero and near-zero sleeps, `reset` to zero, a raw zero timer
    /// handle and completing a sleep early through `elapse`.
    #[ntex::test]
    async fn test_sleep_0() {
        sleep(Seconds(1)).await;

        // `sleep(0)` still takes at least one millisecond.
        let first_time = now();
        sleep(Millis(0)).await;
        let second_time = now();
        assert!(second_time - first_time >= time::Duration::from_millis(1));

        let first_time = now();
        sleep(Millis(1)).await;
        let second_time = now();
        assert!(second_time - first_time >= time::Duration::from_millis(1));

        // Resetting to zero completes a long sleep immediately.
        let first_time = now();
        let fut = sleep(Millis(10000));
        assert!(!fut.is_elapsed());
        fut.reset(Millis::ZERO);
        fut.await;
        let second_time = now();
        assert!(second_time - first_time < time::Duration::from_millis(1));

        // A timer handle created with zero is elapsed from the start.
        let first_time = now();
        let fut = Sleep {
            hnd: TimerHandle::new(0),
        };
        assert!(fut.is_elapsed());
        fut.await;
        let second_time = now();
        assert!(second_time - first_time < time::Duration::from_millis(1));

        // `elapse` called from another task completes the sleep early.
        let first_time = now();
        let fut = Rc::new(sleep(Millis(100_000)));
        let s = fut.clone();
        ntex::rt::spawn(async move {
            s.elapse();
        });
        poll_fn(|cx| fut.poll_elapsed(cx)).await;
        assert!(fut.is_elapsed());
        let second_time = now();
        assert!(second_time - first_time < time::Duration::from_millis(1));
    }

    /// Covers regular, disabled (zero) and reset deadlines.
    #[ntex::test]
    async fn test_deadline() {
        sleep(Seconds(1)).await;

        let first_time = now();
        let dl = deadline(Millis(1));
        dl.await;
        let second_time = now();
        assert!(second_time - first_time >= time::Duration::from_millis(1));
        // A zero deadline never completes, so the surrounding timeout fires.
        assert!(timeout(Millis(100), deadline(Millis(0))).await.is_err());

        let mut dl = deadline(Millis(1));
        dl.reset(Millis::ZERO);
        assert!(lazy(|cx| dl.poll_elapsed(cx)).await.is_pending());

        let mut dl = deadline(Millis(1));
        dl.reset(Millis(100));
        let first_time = now();
        dl.await;
        let second_time = now();
        assert!(second_time - first_time >= time::Duration::from_millis(100));

        // A disabled deadline reports elapsed but can be re-armed by `reset`.
        let mut dl = deadline(Millis(0));
        assert!(dl.is_elapsed());
        dl.reset(Millis(1));
        assert!(lazy(|cx| dl.poll_elapsed(cx)).await.is_pending());

        assert!(format!("{dl:?}").contains("Deadline"));
    }

    /// Checks tick spacing through both `tick()` and the `Stream` interface.
    #[ntex::test]
    async fn test_interval() {
        let mut int = interval(Millis(250));

        let time = time::Instant::now();
        int.tick().await;
        let elapsed = time.elapsed();
        assert!(
            elapsed > time::Duration::from_millis(200)
                && elapsed < time::Duration::from_millis(450),
            "elapsed: {elapsed:?}"
        );

        let time = time::Instant::now();
        int.next().await;
        let elapsed = time.elapsed();
        assert!(
            elapsed > time::Duration::from_millis(200)
                && elapsed < time::Duration::from_millis(450),
            "elapsed: {elapsed:?}"
        );
    }

    /// Checks that consecutive one-second ticks stay within the expected window.
    #[ntex::test]
    async fn test_interval_one_sec() {
        let int = interval(Millis::ONE_SEC);

        for _i in 0..3 {
            let time = time::Instant::now();
            int.tick().await;
            let elapsed = time.elapsed();
            assert!(
                elapsed > time::Duration::from_secs(1)
                    && elapsed < time::Duration::from_millis(1300),
                "elapsed: {elapsed:?}"
            );
        }
    }

    /// Covers the in-time, timed-out and disabled (zero) cases of `timeout_checked`.
    #[ntex::test]
    async fn test_timeout_checked() {
        let result = timeout_checked(Millis(200), sleep(Millis(100))).await;
        assert!(result.is_ok());

        let result = timeout_checked(Millis(5), sleep(Millis(100))).await;
        assert!(result.is_err());

        let result = timeout_checked(Millis(0), sleep(Millis(100))).await;
        assert!(result.is_ok());
    }
}
553
554        let result = timeout_checked(Millis(0), sleep(Millis(100))).await;
555        assert!(result.is_ok());
556    }
557}