fluvio_future/
timer.rs

1pub use inner::*;
2
3#[cfg(not(target_arch = "wasm32"))]
4mod inner {
5
6    use std::pin::Pin;
7    use std::task::{Context, Poll};
8    use std::time::Duration;
9
10    use async_io::Timer;
11    use futures_lite::future::Future;
12
13    use pin_project::pin_project;
14
15    /// same as `after` but return () to make it compatible as previous
16    pub fn sleep(duration: Duration) -> Sleeper {
17        Sleeper(after(duration))
18    }
19
20    #[pin_project]
21    pub struct Sleeper(#[pin] Timer);
22
23    impl Future for Sleeper {
24        type Output = ();
25
26        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
27            let this = self.project();
28            if this.0.poll(cx).is_ready() {
29                Poll::Ready(())
30            } else {
31                Poll::Pending
32            }
33        }
34    }
35
36    /// wait for until `duration` has elapsed.
37    ///
38    /// this effectively give back control to async execution engine until duration is finished
39    ///
40    /// # Examples
41    ///
42    /// ```
43    /// use fluvio_future::timer::after;
44    /// use std::time::{Duration, Instant};
45    ///
46    /// fluvio_future::task::run(async {
47    ///     after(Duration::from_secs(1)).await;
48    /// });
49    /// ```
50    pub fn after(duration: Duration) -> Timer {
51        Timer::after(duration)
52    }
53}
54#[cfg(target_arch = "wasm32")]
55mod inner {
56    use std::time::Duration;
57    // TODO: when https://github.com/tomaka/wasm-timer/pull/13 is merged, move this back to
58    // wasm-timer
59    use fluvio_wasm_timer::Delay;
60    pub fn sleep(duration: Duration) -> Delay {
61        Delay::new(duration)
62    }
63}
64
65#[cfg(test)]
66#[cfg(not(target_arch = "wasm32"))]
67mod test {
68
69    use std::time::Duration;
70    use std::time::Instant;
71
72    use tokio::select;
73    use tracing::debug;
74
75    use crate::timer::sleep;
76
77    /// test timer loop
78    #[fluvio_future::test]
79    async fn test_sleep() {
80        let mut times_fired: u16 = 0;
81        let mut times_not_fired: u16 = 0;
82        let time_now = Instant::now();
83
84        let mut sleep_ft = sleep(Duration::from_millis(10));
85
86        for _ in 0u16..10u16 {
87            select! {
88                _ = &mut sleep_ft => {
89                    // fire everytime but won't make cause more delay than initial 10 ms
90                    times_fired += 1;
91                    debug!("timer fired");
92                }
93
94                _ = sleep(Duration::from_millis(40)) => {
95                    times_not_fired += 1;
96                }
97            }
98        }
99
100        let elapsed = time_now.elapsed();
101
102        debug!("total time elaspsed: {:#?}", elapsed);
103
104        assert!(elapsed < Duration::from_millis(1000)); // make this generous to handle slow CI
105        assert!(elapsed > Duration::from_millis(10));
106        assert_eq!(times_fired, 1);
107        assert_eq!(times_not_fired, 9);
108    }
109}