ntex_util/time/
mod.rs

1//! Utilities for tracking time.
2use std::{cmp, future::poll_fn, future::Future, pin::Pin, task, task::Poll};
3
4mod types;
5mod wheel;
6
7pub use self::types::{Millis, Seconds};
8pub use self::wheel::{now, query_system_time, system_time, TimerHandle};
9
10/// Waits until `duration` has elapsed.
11///
12/// No work is performed while awaiting on the sleep future to complete. `Sleep`
13/// operates at 16 millisecond granularity and should not be used for tasks that
14/// require high-resolution timers. `Sleep` sleeps at least one tick (16 millis)
15/// even if 0 millis duration is used.
16#[inline]
17pub fn sleep<T: Into<Millis>>(dur: T) -> Sleep {
18    Sleep::new(dur.into())
19}
20
21/// Waits until `duration` has elapsed.
22///
23/// This is similar to `sleep` future, but in case of `0` duration deadline future
24/// never completes.
25#[inline]
26pub fn deadline<T: Into<Millis>>(dur: T) -> Deadline {
27    Deadline::new(dur.into())
28}
29
30/// Creates new [`Interval`] that yields with interval of `period`.
31///
32/// An interval will tick indefinitely. At any time, the [`Interval`] value can
33/// be dropped. This cancels the interval.
34#[inline]
35pub fn interval<T: Into<Millis>>(period: T) -> Interval {
36    Interval::new(period.into())
37}
38
39/// Require a `Future` to complete before the specified duration has elapsed.
40///
41/// If the future completes before the duration has elapsed, then the completed
42/// value is returned. Otherwise, an error is returned and the future is
43/// canceled.
44#[inline]
45pub fn timeout<T, U>(dur: U, future: T) -> Timeout<T>
46where
47    T: Future,
48    U: Into<Millis>,
49{
50    Timeout::new_with_delay(future, Sleep::new(dur.into()))
51}
52
53/// Require a `Future` to complete before the specified duration has elapsed.
54///
55/// If the future completes before the duration has elapsed, then the completed
56/// value is returned. Otherwise, an error is returned and the future is
57/// canceled. If duration value is zero then timeout is disabled.
58#[inline]
59pub fn timeout_checked<T, U>(dur: U, future: T) -> TimeoutChecked<T>
60where
61    T: Future,
62    U: Into<Millis>,
63{
64    TimeoutChecked::new_with_delay(future, dur.into())
65}
66
67/// Future returned by [`sleep`].
68///
69/// # Examples
70///
71/// Wait 100ms and print "100 ms have elapsed".
72///
73/// ```
74/// use ntex::time::{sleep, Millis};
75///
76/// #[ntex::main]
77/// async fn main() {
78///     sleep(Millis(100)).await;
79///     println!("100 ms have elapsed");
80/// }
81/// ```
82#[derive(Debug)]
83#[must_use = "futures do nothing unless you `.await` or poll them"]
84pub struct Sleep {
85    // The link between the `Sleep` instance and the timer that drives it.
86    hnd: TimerHandle,
87}
88
89impl Sleep {
90    /// Create new sleep future
91    #[inline]
92    pub fn new(duration: Millis) -> Sleep {
93        Sleep {
94            hnd: TimerHandle::new(cmp::max(duration.0, 1) as u64),
95        }
96    }
97
98    /// Returns `true` if `Sleep` has elapsed.
99    #[inline]
100    pub fn is_elapsed(&self) -> bool {
101        self.hnd.is_elapsed()
102    }
103
104    /// Complete sleep timer.
105    #[inline]
106    pub fn elapse(&self) {
107        self.hnd.elapse()
108    }
109
110    /// Resets the `Sleep` instance to a new deadline.
111    ///
112    /// Calling this function allows changing the instant at which the `Sleep`
113    /// future completes without having to create new associated state.
114    ///
115    /// This function can be called both before and after the future has
116    /// completed.
117    pub fn reset<T: Into<Millis>>(&self, millis: T) {
118        self.hnd.reset(millis.into().0 as u64);
119    }
120
121    #[inline]
122    /// Wait when `Sleep` instance get elapsed.
123    pub async fn wait(&self) {
124        poll_fn(|cx| self.hnd.poll_elapsed(cx)).await
125    }
126
127    #[inline]
128    pub fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<()> {
129        self.hnd.poll_elapsed(cx)
130    }
131}
132
133impl Future for Sleep {
134    type Output = ();
135
136    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
137        self.hnd.poll_elapsed(cx)
138    }
139}
140
141/// Future returned by [`deadline`].
142///
143/// # Examples
144///
145/// Wait 100ms and print "100 ms have elapsed".
146///
147/// ```
148/// use ntex::time::{deadline, Millis};
149///
150/// #[ntex::main]
151/// async fn main() {
152///     deadline(Millis(100)).await;
153///     println!("100 ms have elapsed");
154/// }
155/// ```
156#[derive(Debug)]
157#[must_use = "futures do nothing unless you `.await` or poll them"]
158pub struct Deadline {
159    hnd: Option<TimerHandle>,
160}
161
162impl Deadline {
163    /// Create new deadline future
164    #[inline]
165    pub fn new(duration: Millis) -> Deadline {
166        if duration.0 != 0 {
167            Deadline {
168                hnd: Some(TimerHandle::new(duration.0 as u64)),
169            }
170        } else {
171            Deadline { hnd: None }
172        }
173    }
174
175    #[inline]
176    /// Wait when `Sleep` instance get elapsed.
177    pub async fn wait(&self) {
178        poll_fn(|cx| self.poll_elapsed(cx)).await
179    }
180
181    /// Resets the `Deadline` instance to a new deadline.
182    ///
183    /// Calling this function allows changing the instant at which the `Deadline`
184    /// future completes without having to create new associated state.
185    ///
186    /// This function can be called both before and after the future has
187    /// completed.
188    pub fn reset<T: Into<Millis>>(&mut self, millis: T) {
189        let millis = millis.into();
190        if millis.0 != 0 {
191            if let Some(ref mut hnd) = self.hnd {
192                hnd.reset(millis.0 as u64);
193            } else {
194                self.hnd = Some(TimerHandle::new(millis.0 as u64));
195            }
196        } else {
197            let _ = self.hnd.take();
198        }
199    }
200
201    /// Returns `true` if `Deadline` has elapsed.
202    #[inline]
203    pub fn is_elapsed(&self) -> bool {
204        self.hnd.as_ref().map(|t| t.is_elapsed()).unwrap_or(true)
205    }
206
207    #[inline]
208    pub fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<()> {
209        self.hnd
210            .as_ref()
211            .map(|t| t.poll_elapsed(cx))
212            .unwrap_or(Poll::Pending)
213    }
214}
215
216impl Future for Deadline {
217    type Output = ();
218
219    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
220        self.poll_elapsed(cx)
221    }
222}
223
224pin_project_lite::pin_project! {
225    /// Future returned by [`timeout`](timeout).
226    #[must_use = "futures do nothing unless you `.await` or poll them"]
227    #[derive(Debug)]
228    pub struct Timeout<T> {
229        #[pin]
230        value: T,
231        delay: Sleep,
232    }
233}
234
235impl<T> Timeout<T> {
236    pub(crate) fn new_with_delay(value: T, delay: Sleep) -> Timeout<T> {
237        Timeout { value, delay }
238    }
239}
240
241impl<T> Future for Timeout<T>
242where
243    T: Future,
244{
245    type Output = Result<T::Output, ()>;
246
247    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
248        let this = self.project();
249
250        // First, try polling the future
251        if let Poll::Ready(v) = this.value.poll(cx) {
252            return Poll::Ready(Ok(v));
253        }
254
255        // Now check the timer
256        match this.delay.poll_elapsed(cx) {
257            Poll::Ready(()) => Poll::Ready(Err(())),
258            Poll::Pending => Poll::Pending,
259        }
260    }
261}
262
263pin_project_lite::pin_project! {
264    /// Future returned by [`timeout_checked`](timeout_checked).
265    #[must_use = "futures do nothing unless you `.await` or poll them"]
266    pub struct TimeoutChecked<T> {
267        #[pin]
268        state: TimeoutCheckedState<T>,
269    }
270}
271
272pin_project_lite::pin_project! {
273    #[project = TimeoutCheckedStateProject]
274    enum TimeoutCheckedState<T> {
275        Timeout{ #[pin] fut: Timeout<T> },
276        NoTimeout{ #[pin] fut: T },
277    }
278}
279
280impl<T> TimeoutChecked<T> {
281    pub(crate) fn new_with_delay(value: T, delay: Millis) -> TimeoutChecked<T> {
282        if delay.is_zero() {
283            TimeoutChecked {
284                state: TimeoutCheckedState::NoTimeout { fut: value },
285            }
286        } else {
287            TimeoutChecked {
288                state: TimeoutCheckedState::Timeout {
289                    fut: Timeout::new_with_delay(value, sleep(delay)),
290                },
291            }
292        }
293    }
294}
295
296impl<T> Future for TimeoutChecked<T>
297where
298    T: Future,
299{
300    type Output = Result<T::Output, ()>;
301
302    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
303        match self.project().state.as_mut().project() {
304            TimeoutCheckedStateProject::Timeout { fut } => fut.poll(cx),
305            TimeoutCheckedStateProject::NoTimeout { fut } => fut.poll(cx).map(Result::Ok),
306        }
307    }
308}
309
310/// Interval returned by [`interval`]
311///
312/// This type allows you to wait on a sequence of instants with a certain
313/// duration between each instant.
314#[must_use = "futures do nothing unless you `.await` or poll them"]
315#[derive(Debug)]
316pub struct Interval {
317    hnd: TimerHandle,
318    period: u32,
319}
320
321impl Interval {
322    /// Create new sleep future
323    #[inline]
324    pub fn new(period: Millis) -> Interval {
325        Interval {
326            hnd: TimerHandle::new(period.0 as u64),
327            period: period.0,
328        }
329    }
330
331    #[inline]
332    pub async fn tick(&self) {
333        poll_fn(|cx| self.poll_tick(cx)).await;
334    }
335
336    #[inline]
337    pub fn poll_tick(&self, cx: &mut task::Context<'_>) -> Poll<()> {
338        if self.hnd.poll_elapsed(cx).is_ready() {
339            self.hnd.reset(self.period as u64);
340            Poll::Ready(())
341        } else {
342            Poll::Pending
343        }
344    }
345}
346
347impl crate::Stream for Interval {
348    type Item = ();
349
350    #[inline]
351    fn poll_next(
352        self: Pin<&mut Self>,
353        cx: &mut task::Context<'_>,
354    ) -> Poll<Option<Self::Item>> {
355        self.poll_tick(cx).map(|_| Some(()))
356    }
357}
358
359#[cfg(test)]
360#[allow(clippy::let_underscore_future)]
361mod tests {
362    use futures_util::StreamExt;
363    use std::{future::poll_fn, rc::Rc, time};
364
365    use super::*;
366    use crate::future::lazy;
367
368    /// State Under Test: Two calls of `now()` return the same value if they are done within resolution interval.
369    ///
370    /// Expected Behavior: Two back-to-back calls of `now()` return the same value.
371    #[ntex_macros::rt_test2]
372    async fn lowres_time_does_not_immediately_change() {
373        let _ = sleep(Seconds(1));
374
375        assert_eq!(now(), now())
376    }
377
378    /// State Under Test: `now()` updates returned value every ~1ms period.
379    ///
380    /// Expected Behavior: Two calls of `now()` made in subsequent resolution interval return different values
381    /// and second value is greater than the first one at least by a 1ms interval.
382    #[ntex_macros::rt_test2]
383    async fn lowres_time_updates_after_resolution_interval() {
384        let _ = sleep(Seconds(1));
385
386        let first_time = now();
387
388        sleep(Millis(25)).await;
389
390        let second_time = now();
391        assert!(second_time - first_time >= time::Duration::from_millis(25));
392    }
393
394    /// State Under Test: Two calls of `system_time()` return the same value if they are done within 1ms interval.
395    ///
396    /// Expected Behavior: Two back-to-back calls of `now()` return the same value.
397    #[ntex_macros::rt_test2]
398    async fn system_time_service_time_does_not_immediately_change() {
399        let _ = sleep(Seconds(1));
400
401        assert_eq!(system_time(), system_time());
402        assert_eq!(system_time(), query_system_time());
403    }
404
405    /// State Under Test: `system_time()` updates returned value every 1ms period.
406    ///
407    /// Expected Behavior: Two calls of `system_time()` made in subsequent resolution interval return different values
408    /// and second value is greater than the first one at least by a resolution interval.
409    #[ntex_macros::rt_test2]
410    async fn system_time_service_time_updates_after_resolution_interval() {
411        let _ = sleep(Seconds(1));
412
413        let wait_time = 300;
414
415        let first_time = system_time()
416            .duration_since(time::SystemTime::UNIX_EPOCH)
417            .unwrap();
418
419        sleep(Millis(wait_time)).await;
420
421        let second_time = system_time()
422            .duration_since(time::SystemTime::UNIX_EPOCH)
423            .unwrap();
424
425        assert!(second_time - first_time >= time::Duration::from_millis(wait_time as u64));
426    }
427
428    #[ntex_macros::rt_test2]
429    async fn test_sleep_0() {
430        let _ = sleep(Seconds(1));
431
432        let first_time = now();
433        sleep(Millis(0)).await;
434        let second_time = now();
435        assert!(second_time - first_time >= time::Duration::from_millis(1));
436
437        let first_time = now();
438        sleep(Millis(1)).await;
439        let second_time = now();
440        assert!(second_time - first_time >= time::Duration::from_millis(1));
441
442        let first_time = now();
443        let fut = sleep(Millis(10000));
444        assert!(!fut.is_elapsed());
445        fut.reset(Millis::ZERO);
446        fut.await;
447        let second_time = now();
448        assert!(second_time - first_time < time::Duration::from_millis(1));
449
450        let first_time = now();
451        let fut = Sleep {
452            hnd: TimerHandle::new(0),
453        };
454        assert!(fut.is_elapsed());
455        fut.await;
456        let second_time = now();
457        assert!(second_time - first_time < time::Duration::from_millis(1));
458
459        let first_time = now();
460        let fut = Rc::new(sleep(Millis(100000)));
461        let s = fut.clone();
462        ntex::rt::spawn(async move {
463            s.elapse();
464        });
465        poll_fn(|cx| fut.poll_elapsed(cx)).await;
466        assert!(fut.is_elapsed());
467        let second_time = now();
468        assert!(second_time - first_time < time::Duration::from_millis(1));
469    }
470
471    #[ntex_macros::rt_test2]
472    async fn test_deadline() {
473        let _ = sleep(Seconds(1));
474
475        let first_time = now();
476        let dl = deadline(Millis(1));
477        dl.await;
478        let second_time = now();
479        assert!(second_time - first_time >= time::Duration::from_millis(1));
480        assert!(timeout(Millis(100), deadline(Millis(0))).await.is_err());
481
482        let mut dl = deadline(Millis(1));
483        dl.reset(Millis::ZERO);
484        assert!(lazy(|cx| dl.poll_elapsed(cx)).await.is_pending());
485
486        let mut dl = deadline(Millis(1));
487        dl.reset(Millis(100));
488        let first_time = now();
489        dl.await;
490        let second_time = now();
491        assert!(second_time - first_time >= time::Duration::from_millis(100));
492
493        let mut dl = deadline(Millis(0));
494        assert!(dl.is_elapsed());
495        dl.reset(Millis(1));
496        assert!(lazy(|cx| dl.poll_elapsed(cx)).await.is_pending());
497
498        assert!(format!("{dl:?}").contains("Deadline"));
499    }
500
501    #[ntex_macros::rt_test2]
502    async fn test_interval() {
503        let mut int = interval(Millis(250));
504
505        let time = time::Instant::now();
506        int.tick().await;
507        let elapsed = time::Instant::now() - time;
508        assert!(
509            elapsed > time::Duration::from_millis(200)
510                && elapsed < time::Duration::from_millis(450),
511            "elapsed: {elapsed:?}"
512        );
513
514        let time = time::Instant::now();
515        int.next().await;
516        let elapsed = time::Instant::now() - time;
517        assert!(
518            elapsed > time::Duration::from_millis(200)
519                && elapsed < time::Duration::from_millis(450),
520            "elapsed: {elapsed:?}"
521        );
522    }
523
524    #[ntex_macros::rt_test2]
525    async fn test_interval_one_sec() {
526        let int = interval(Millis::ONE_SEC);
527
528        for _i in 0..3 {
529            let time = time::Instant::now();
530            int.tick().await;
531            let elapsed = time::Instant::now() - time;
532            assert!(
533                elapsed > time::Duration::from_millis(1000)
534                    && elapsed < time::Duration::from_millis(1300),
535                "elapsed: {elapsed:?}"
536            );
537        }
538    }
539
540    #[ntex_macros::rt_test2]
541    async fn test_timeout_checked() {
542        let result = timeout_checked(Millis(200), sleep(Millis(100))).await;
543        assert!(result.is_ok());
544
545        let result = timeout_checked(Millis(5), sleep(Millis(100))).await;
546        assert!(result.is_err());
547
548        let result = timeout_checked(Millis(0), sleep(Millis(100))).await;
549        assert!(result.is_ok());
550    }
551}