n0_future/
time.rs

1//! Sleep and timeout utilities that work natively (via tokio) and in the browser.
2
3#[cfg(not(wasm_browser))]
4pub use std::time::SystemTime;
5
6#[cfg(not(wasm_browser))]
7pub use tokio::time::{
8    error::Elapsed, interval, interval_at, sleep, sleep_until, timeout, Duration, Instant,
9    Interval, MissedTickBehavior, Sleep, Timeout,
10};
11#[cfg(wasm_browser)]
12pub use wasm::{
13    error::Elapsed, interval, interval_at, sleep, sleep_until, timeout, Duration, Instant,
14    Interval, MissedTickBehavior, Sleep, SystemTime, Timeout,
15};
16
17#[cfg(wasm_browser)]
18mod wasm {
19    use std::{
20        future::{Future, IntoFuture},
21        pin::Pin,
22        sync::{
23            atomic::{AtomicBool, Ordering::Relaxed},
24            Arc,
25        },
26        task::{Context, Poll},
27    };
28
29    use futures_util::task::AtomicWaker;
30    use send_wrapper::SendWrapper;
31    use wasm_bindgen::{closure::Closure, prelude::wasm_bindgen, JsCast, JsValue};
32    pub use web_time::{Duration, Instant, SystemTime};
33
    /// Future that will wake up once its deadline is reached.
    ///
    /// The wake-up is driven by a JavaScript `setTimeout` timer; dropping the
    /// `Sleep` clears that timer again.
    #[derive(Debug)]
    pub struct Sleep {
        /// Instant reported by [`Sleep::deadline`].
        deadline: Instant,
        /// Flag signaled by the timer callback. `None` for a sleep that never
        /// completes.
        triggered: Option<Flag>,
        /// Handle returned by `setTimeout`, kept so the timer can be cleared.
        /// `None` when no timer has been scheduled.
        timeout_id: Option<SendWrapper<JsValue>>,
    }
41
42    /// Sleeps for given duration
43    pub fn sleep(duration: Duration) -> Sleep {
44        // javascript can't handle setTimeout durations as big as rust, so we
45        // can't rely on `now.checked_add` to overflow.
46        if duration > Duration::from_secs(60 * 60 * 24 * 365 * 10) {
47            return sleep_forever();
48        }
49        let now = Instant::now();
50        if let Some(deadline) = now.checked_add(duration) {
51            sleep_impl(duration, deadline)
52        } else {
53            sleep_forever()
54        }
55    }
56
57    /// Sleeps until given deadline
58    pub fn sleep_until(deadline: Instant) -> Sleep {
59        let now = Instant::now();
60        let duration = deadline.duration_since(now);
61        sleep_impl(duration, deadline)
62    }
63
64    fn sleep_impl(duration: Duration, deadline: Instant) -> Sleep {
65        let triggered = Flag::new();
66
67        let closure = Closure::once({
68            let triggered = triggered.clone();
69            move || triggered.signal()
70        });
71
72        let timeout_id = Some(SendWrapper::new(
73            set_timeout(
74                closure.into_js_value().unchecked_into(),
75                duration.as_millis() as i32,
76            )
77            .expect("missing setTimeout function on globalThis"),
78        ));
79
80        Sleep {
81            deadline,
82            triggered: Some(triggered),
83            timeout_id,
84        }
85    }
86
87    fn sleep_forever() -> Sleep {
88        // fake a deadline that's far in the future (10 years)
89        let deadline = Instant::now() + Duration::from_secs(60 * 60 * 24 * 365 * 10);
90        Sleep {
91            triggered: None,
92            deadline,
93            timeout_id: None,
94        }
95    }
96
97    impl Future for Sleep {
98        type Output = ();
99
100        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
101            match &mut self.triggered {
102                Some(ref mut triggered) => Pin::new(triggered).poll_signaled(cx),
103                None => Poll::Pending,
104            }
105        }
106    }
107
108    impl Drop for Sleep {
109        fn drop(&mut self) {
110            if let Some(timeout_id) = self.timeout_id.as_ref() {
111                // If not, then in the worst case we're leaking a timeout
112                if timeout_id.valid() {
113                    clear_timeout(timeout_id.as_ref().clone()).ok();
114                }
115            }
116        }
117    }
118
119    impl Sleep {
120        /// Returns the instant at which the sleep is scheduled to wake up
121        pub fn deadline(&self) -> Instant {
122            self.deadline
123        }
124
125        /// Returns whether the sleep has reached its deadline
126        /// (and the scheduler has handled the sleep's timer).
127        pub fn is_elapsed(&self) -> bool {
128            self.triggered.as_ref().map_or(false, Flag::has_triggered)
129        }
130
131        /// Resets this sleep's deadline to given instant.
132        ///
133        /// Also works with sleeps that have already reached their deadline
134        /// in the past.
135        pub fn reset(mut self: Pin<&mut Self>, deadline: Instant) {
136            let duration = deadline.duration_since(Instant::now());
137            let triggered = Flag::new();
138
139            let closure = Closure::once({
140                let triggered = triggered.clone();
141                move || triggered.signal()
142            });
143
144            let timeout_id = SendWrapper::new(
145                set_timeout(
146                    closure.into_js_value().unchecked_into(),
147                    duration.as_millis() as i32,
148                )
149                .expect("missing setTimeout function on globalThis"),
150            );
151
152            let mut this = self.as_mut();
153            this.deadline = deadline;
154            this.triggered = Some(triggered);
155            let old_timeout_id = std::mem::replace(&mut this.timeout_id, Some(timeout_id));
156            if let Some(timeout_id) = old_timeout_id {
157                // If not valid, then in the worst case we're leaking a timeout task
158                if timeout_id.valid() {
159                    clear_timeout(timeout_id.as_ref().clone()).ok();
160                }
161            }
162        }
163
164        /// Resets this sleep to never wake up again (unless reset to a different timeout).
165        fn reset_forever(mut self: Pin<&mut Self>) {
166            let mut this = self.as_mut();
167            this.deadline = Instant::now() + Duration::from_secs(60 * 60 * 24 * 365 * 10);
168            this.triggered = None;
169            let old_timeout_id = std::mem::replace(&mut this.timeout_id, None);
170            if let Some(timeout_id) = old_timeout_id {
171                // If not valid, then in the worst case we're leaking a timeout task
172                if timeout_id.valid() {
173                    clear_timeout(timeout_id.as_ref().clone()).ok();
174                }
175            }
176        }
177    }
178
    /// Future that resolves to `Err` of [`error::Elapsed`] if the timeout
    /// is hit first. Otherwise, it resolves to `Ok` of the wrapped future's
    /// output.
    #[derive(Debug)]
    #[pin_project::pin_project]
    pub struct Timeout<T> {
        /// The wrapped future.
        #[pin]
        future: T,
        /// Timer that bounds how long `future` may take.
        #[pin]
        sleep: Sleep,
    }
189
    /// Error structs for time utilities (wasm mirror for `tokio::time::error`).
    pub mod error {
        /// Error returned when a timeout elapses before the wrapped future
        /// completes.
        ///
        /// Displays as "deadline has elapsed".
        #[derive(Debug, derive_more::Display)]
        #[display("deadline has elapsed")]
        pub struct Elapsed;

        impl std::error::Error for Elapsed {}
    }
199
200    /// Timeout of a function in wasm.
201    pub fn timeout<F>(duration: Duration, future: F) -> Timeout<F::IntoFuture>
202    where
203        F: IntoFuture,
204    {
205        Timeout {
206            future: future.into_future(),
207            sleep: sleep(duration),
208        }
209    }
210
211    impl<T: Future> Future for Timeout<T> {
212        type Output = Result<T::Output, error::Elapsed>;
213
214        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
215            let this = self.project();
216
217            if let Poll::Ready(result) = this.future.poll(cx) {
218                return Poll::Ready(Ok(result));
219            }
220
221            if let Poll::Ready(()) = this.sleep.poll(cx) {
222                return Poll::Ready(Err(error::Elapsed));
223            }
224
225            Poll::Pending
226        }
227    }
228
229    impl<T> Timeout<T> {
230        /// Returns a reference of the wrapped future.
231        pub fn get_ref(&self) -> &T {
232            &self.future
233        }
234
235        /// Returns a mutable reference to the wrapped future.
236        pub fn get_mut(&mut self) -> &mut T {
237            &mut self.future
238        }
239
240        /// Returns the wrapped future and throws away and cancels the
241        /// associated timeout.
242        pub fn into_inner(self) -> T {
243            self.future
244        }
245    }
246
    /// Defines the behavior of an [`Interval`] when it misses a tick.
    ///
    /// The default is [`Burst`](MissedTickBehavior::Burst).
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
    pub enum MissedTickBehavior {
        /// Ticks as fast as possible until caught up: the next tick is scheduled
        /// one `period` after the tick that was missed.
        #[default]
        Burst,

        /// Ticks at multiples of `period` from when [`Interval::tick`] was called, rather than
        /// from `start`.
        Delay,

        /// Skips missed ticks and ticks on the next multiple of `period` from
        /// `start`.
        Skip,
    }
262
263    impl MissedTickBehavior {
264        /// If a tick is missed, this method is called to determine when the next tick should happen.
265        fn next_timeout(&self, timeout: Instant, now: Instant, period: Duration) -> Instant {
266            match self {
267                Self::Burst => timeout + period,
268                Self::Delay => now + period,
269                Self::Skip => {
270                    now + period
271                    - Duration::from_nanos(
272                        ((now - timeout).as_nanos() % period.as_nanos())
273                            .try_into()
274                            // This operation is practically guaranteed not to
275                            // fail, as in order for it to fail, `period` would
276                            // have to be longer than `now - timeout`, and both
277                            // would have to be longer than 584 years.
278                            //
279                            // If it did fail, there's not a good way to pass
280                            // the error along to the user, so we just panic.
281                            .expect(
282                                "too much time has elapsed since the interval was supposed to tick",
283                            ),
284                    )
285                }
286            }
287        }
288    }
289
    /// Interval returned by [`interval`] and [`interval_at`].
    #[derive(Debug)]
    pub struct Interval {
        /// Timer for the upcoming tick; it is reset after every tick.
        delay: Pin<Box<Sleep>>,
        /// Time between two consecutive ticks.
        period: Duration,
        /// Strategy used to schedule the next tick after one was missed.
        missed_tick_behavior: MissedTickBehavior,
    }
297
298    /// Creates new [`Interval`] that yields with interval of `period`. The first
299    /// tick completes immediately. The default [`MissedTickBehavior`] is
300    /// [`Burst`](MissedTickBehavior::Burst), but this can be configured
301    /// by calling [`set_missed_tick_behavior`](Interval::set_missed_tick_behavior).
302    ///
303    /// An interval will tick indefinitely. At any time, the [`Interval`] value can
304    /// be dropped. This cancels the interval.
305    ///
306    /// This function is equivalent to
307    /// [`interval_at(Instant::now(), period)`](interval_at).
308    pub fn interval(period: Duration) -> Interval {
309        assert!(period > Duration::new(0, 0), "`period` must be non-zero.");
310
311        interval_at(Instant::now(), period)
312    }
313
314    /// Creates new [`Interval`] that yields with interval of `period` with the
315    /// first tick completing at `start`. The default [`MissedTickBehavior`] is
316    /// [`Burst`](MissedTickBehavior::Burst), but this can be configured
317    /// by calling [`set_missed_tick_behavior`](Interval::set_missed_tick_behavior).
318    ///
319    /// An interval will tick indefinitely. At any time, the [`Interval`] value can
320    /// be dropped. This cancels the interval.
321    #[track_caller]
322    pub fn interval_at(start: Instant, period: Duration) -> Interval {
323        assert!(period > Duration::new(0, 0), "`period` must be non-zero.");
324
325        let delay = Box::pin(sleep_until(start));
326
327        Interval {
328            delay,
329            period,
330            missed_tick_behavior: MissedTickBehavior::default(),
331        }
332    }
333
334    impl Interval {
335        /// Completes when the next instant in the interval has been reached.
336        pub async fn tick(&mut self) -> Instant {
337            futures_lite::future::poll_fn(|cx| self.poll_tick(cx)).await
338        }
339
340        /// Polls for the next instant in the interval to be reached.
341        pub fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll<Instant> {
342            // Wait for the delay to be done
343            futures_lite::ready!(Pin::new(&mut self.delay).poll(cx));
344
345            // Get the time when we were scheduled to tick
346            let timeout = self.delay.deadline();
347
348            let now = Instant::now();
349
350            // If a tick was not missed, and thus we are being called before the
351            // next tick is due, just schedule the next tick normally, one `period`
352            // after `timeout`
353            //
354            // However, if a tick took excessively long and we are now behind,
355            // schedule the next tick according to how the user specified with
356            // `MissedTickBehavior`
357            let next = if now > timeout + Duration::from_millis(5) {
358                Some(
359                    self.missed_tick_behavior
360                        .next_timeout(timeout, now, self.period),
361                )
362            } else {
363                timeout.checked_add(self.period)
364            };
365
366            if let Some(next) = next {
367                self.delay.as_mut().reset(next);
368            } else {
369                self.delay.as_mut().reset_forever()
370            }
371
372            // Return the time when we were scheduled to tick
373            Poll::Ready(timeout)
374        }
375
376        /// Resets the interval to complete one period after the current time.
377        pub fn reset(&mut self) {
378            self.delay.as_mut().reset(Instant::now() + self.period);
379        }
380
381        /// Resets the interval immediately.
382        pub fn reset_immediately(&mut self) {
383            self.delay.as_mut().reset(Instant::now());
384        }
385
386        /// Resets the interval after the specified [`std::time::Duration`].
387        pub fn reset_after(&mut self, after: Duration) {
388            self.delay.as_mut().reset(Instant::now() + after);
389        }
390
391        /// Resets the interval to a [`crate::time::Instant`] deadline.
392        pub fn reset_at(&mut self, deadline: Instant) {
393            self.delay.as_mut().reset(deadline);
394        }
395
396        /// Returns the [`MissedTickBehavior`] strategy currently being used.
397        pub fn missed_tick_behavior(&self) -> MissedTickBehavior {
398            self.missed_tick_behavior
399        }
400
401        /// Sets the [`MissedTickBehavior`] strategy that should be used.
402        pub fn set_missed_tick_behavior(&mut self, behavior: MissedTickBehavior) {
403            self.missed_tick_behavior = behavior;
404        }
405
406        /// Returns the period of the interval.
407        pub fn period(&self) -> Duration {
408            self.period
409        }
410    }
411
412    // Private impls
413
    /// One-way signal shared between a [`Sleep`] and its timer callback.
    /// Clones share the same underlying state.
    #[derive(Clone, Debug)]
    struct Flag(Arc<Inner>);
416
    /// Shared state behind a [`Flag`].
    #[derive(Debug)]
    struct Inner {
        /// Waker of the task waiting for the flag to be signaled.
        waker: AtomicWaker,
        /// Whether the flag has been signaled.
        set: AtomicBool,
    }
422
423    impl Flag {
424        fn new() -> Self {
425            Self(Arc::new(Inner {
426                waker: AtomicWaker::new(),
427                set: AtomicBool::new(false),
428            }))
429        }
430
431        fn has_triggered(&self) -> bool {
432            self.0.set.load(Relaxed)
433        }
434
435        fn signal(&self) {
436            self.0.set.store(true, Relaxed);
437            self.0.waker.wake();
438        }
439
440        fn poll_signaled(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
441            // quick check to avoid registration if already done.
442            if self.0.set.load(Relaxed) {
443                return Poll::Ready(());
444            }
445
446            self.0.waker.register(cx.waker());
447
448            // Need to check condition **after** `register` to avoid a race
449            // condition that would result in lost notifications.
450            if self.0.set.load(Relaxed) {
451                Poll::Ready(())
452            } else {
453                Poll::Pending
454            }
455        }
456    }
457
458    // Wasm-bindgen stuff
459
    // Bindings for the `setTimeout` / `clearTimeout` methods, declared on an
    // opaque `GlobalScope` type. The wrappers `set_timeout` and `clear_timeout`
    // call them on the value returned by `js_sys::global()`.
    #[wasm_bindgen]
    extern "C" {
        // Opaque stand-in type for the JavaScript global object.
        type GlobalScope;

        // `setTimeout(handler, timeout)`; a thrown JS exception is returned as `Err`.
        #[wasm_bindgen(catch, method, js_name = "setTimeout")]
        fn set_timeout_with_callback_and_timeout_and_arguments_0(
            this: &GlobalScope,
            handler: js_sys::Function,
            timeout: i32,
        ) -> Result<JsValue, JsValue>;

        // `clearTimeout(timeout_id)`; a thrown JS exception is returned as `Err`.
        #[wasm_bindgen(catch, method, js_name = "clearTimeout")]
        fn clear_timeout_with_handle(
            this: &GlobalScope,
            timeout_id: JsValue,
        ) -> Result<(), JsValue>;
    }
477
478    fn set_timeout(handler: js_sys::Function, timeout: i32) -> Result<JsValue, JsValue> {
479        let global_this = js_sys::global();
480        let global_scope = global_this.unchecked_ref::<GlobalScope>();
481        global_scope.set_timeout_with_callback_and_timeout_and_arguments_0(handler, timeout)
482    }
483
484    fn clear_timeout(timeout_id: JsValue) -> Result<(), JsValue> {
485        let global_this = js_sys::global();
486        let global_scope = global_this.unchecked_ref::<GlobalScope>();
487        global_scope.clear_timeout_with_handle(timeout_id)
488    }
489}
490
// Placeholder test module; it does not contain any tests yet.
#[cfg(test)]
mod tests {
    // TODO(matheus23): Write some tests for `sleep`, `sleep_until`, `timeout` and `Sleep::reset`
}