ntex_util/time/
wheel.rs

1//! Time wheel based timer service.
2//!
3//! Inspired by linux kernel timers system
4#![allow(arithmetic_overflow, clippy::let_underscore_future)]
5use std::time::{Duration, Instant, SystemTime};
6use std::{cell::Cell, cmp::max, future::Future, mem, pin::Pin, rc::Rc, task, task::Poll};
7
8use futures_timer::Delay;
9use slab::Slab;
10
11use crate::task::LocalWaker;
12
13// Clock divisor for the next level
14const LVL_CLK_SHIFT: u64 = 3;
15const LVL_CLK_DIV: u64 = 1 << LVL_CLK_SHIFT;
16const LVL_CLK_MASK: u64 = LVL_CLK_DIV - 1;
17
18const fn lvl_shift(n: u64) -> u64 {
19    n * LVL_CLK_SHIFT
20}
21
22const fn lvl_gran(n: u64) -> u64 {
23    1 << lvl_shift(n)
24}
25
26// Resolution:
27// 0: 1 millis
28// 4: ~17 millis
29const UNITS: u64 = 4;
30// const UNITS: u64 = 0;
31
32const fn to_units(n: u64) -> u64 {
33    n >> UNITS
34}
35
36const fn to_millis(n: u64) -> u64 {
37    n << UNITS
38}
39
40// The time start value for each level to select the bucket at enqueue time
41const fn lvl_start(lvl: u64) -> u64 {
42    (LVL_SIZE - 1) << ((lvl - 1) * LVL_CLK_SHIFT)
43}
44
45// Size of each clock level
46const LVL_BITS: u64 = 6;
47const LVL_SIZE: u64 = 1 << LVL_BITS;
48const LVL_MASK: u64 = LVL_SIZE - 1;
49
50// Level depth
51const LVL_DEPTH: u64 = 8;
52
53const fn lvl_offs(n: u64) -> u64 {
54    n * LVL_SIZE
55}
56
57// The cutoff (max. capacity of the wheel)
58const WHEEL_TIMEOUT_CUTOFF: u64 = lvl_start(LVL_DEPTH);
59const WHEEL_TIMEOUT_MAX: u64 = WHEEL_TIMEOUT_CUTOFF - (lvl_gran(LVL_DEPTH - 1));
60const WHEEL_SIZE: usize = (LVL_SIZE as usize) * (LVL_DEPTH as usize);
61
62// Low res time resolution
63const LOWRES_RESOLUTION: Duration = Duration::from_millis(5);
64
65const fn as_millis(dur: Duration) -> u64 {
66    dur.as_secs() * 1_000 + (dur.subsec_millis() as u64)
67}
68
69/// Returns an instant corresponding to “now”.
70///
71/// Resolution is 5ms
72#[inline]
73pub fn now() -> Instant {
74    TIMER.with(|t| t.with_mod(|inner| t.now(inner)))
75}
76
77/// Returns the system time corresponding to “now”.
78///
79/// Resolution is 5ms
80#[inline]
81pub fn system_time() -> SystemTime {
82    TIMER.with(|t| t.with_mod(|inner| t.system_time(inner)))
83}
84
85/// Returns the system time corresponding to “now”.
86///
87/// If low resolution system time is not set, use system time.
88/// This method does not start timer driver.
89#[inline]
90pub fn query_system_time() -> SystemTime {
91    TIMER.with(|t| t.with_mod(|inner| t.system_time(inner)))
92}
93
94#[derive(Debug)]
95pub struct TimerHandle(usize);
96
97impl TimerHandle {
98    /// Createt new timer and return handle
99    pub fn new(millis: u64) -> Self {
100        TIMER.with(|t| t.add_timer(millis))
101    }
102
103    /// Resets the `TimerHandle` instance to a new deadline.
104    pub fn reset(&self, millis: u64) {
105        TIMER.with(|t| t.update_timer(self.0, millis))
106    }
107
108    /// Resets the `TimerHandle` instance to elapsed state.
109    pub fn elapse(&self) {
110        TIMER.with(|t| t.remove_timer(self.0))
111    }
112
113    pub fn is_elapsed(&self) -> bool {
114        TIMER.with(|t| t.with_mod(|m| m.timers[self.0].bucket.is_none()))
115    }
116
117    pub fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<()> {
118        TIMER.with(|t| {
119            t.with_mod(|inner| {
120                let entry = &inner.timers[self.0];
121                if entry.bucket.is_none() {
122                    Poll::Ready(())
123                } else {
124                    entry.task.register(cx.waker());
125                    Poll::Pending
126                }
127            })
128        })
129    }
130}
131
132impl Drop for TimerHandle {
133    fn drop(&mut self) {
134        TIMER.with(|t| t.with_mod(|inner| inner.remove_timer_bucket(self.0, true)))
135    }
136}
137
138bitflags::bitflags! {
139    #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
140    pub struct Flags: u8 {
141        const DRIVER_STARTED = 0b0000_0001;
142        const DRIVER_RECALC  = 0b0000_0010;
143        const LOWRES_TIMER   = 0b0000_1000;
144        const LOWRES_DRIVER  = 0b0001_0000;
145        const RUNNING        = 0b0010_0000;
146    }
147}
148
149thread_local! {
150    static TIMER: Timer = Timer::new();
151}
152
153struct Timer(Rc<TimerInner>);
154
155struct TimerInner {
156    elapsed: Cell<u64>,
157    elapsed_time: Cell<Option<Instant>>,
158    next_expiry: Cell<u64>,
159    flags: Cell<Flags>,
160    driver: LocalWaker,
161    lowres_time: Cell<Option<Instant>>,
162    lowres_stime: Cell<Option<SystemTime>>,
163    lowres_driver: LocalWaker,
164    inner: Cell<Option<Box<TimerMod>>>,
165}
166
167struct TimerMod {
168    timers: Slab<TimerEntry>,
169    driver_sleep: Delay,
170    buckets: Vec<Bucket>,
171    /// Bit field tracking which bucket currently contain entries.
172    occupied: [u64; WHEEL_SIZE],
173    lowres_driver_sleep: Delay,
174}
175
176impl Timer {
177    fn new() -> Self {
178        Timer(Rc::new(TimerInner {
179            elapsed: Cell::new(0),
180            elapsed_time: Cell::new(None),
181            next_expiry: Cell::new(u64::MAX),
182            flags: Cell::new(Flags::empty()),
183            driver: LocalWaker::new(),
184            lowres_time: Cell::new(None),
185            lowres_stime: Cell::new(None),
186            lowres_driver: LocalWaker::new(),
187            inner: Cell::new(Some(Box::new(TimerMod {
188                buckets: Self::create_buckets(),
189                timers: Slab::default(),
190                driver_sleep: Delay::new(Duration::ZERO),
191                occupied: [0; WHEEL_SIZE],
192                lowres_driver_sleep: Delay::new(Duration::ZERO),
193            }))),
194        }))
195    }
196
197    fn with_mod<F, R>(&self, f: F) -> R
198    where
199        F: FnOnce(&mut TimerMod) -> R,
200    {
201        let mut m = self.0.inner.take().unwrap();
202        let result = f(&mut m);
203        self.0.inner.set(Some(m));
204        result
205    }
206
207    fn create_buckets() -> Vec<Bucket> {
208        let mut buckets = Vec::with_capacity(WHEEL_SIZE);
209        for idx in 0..WHEEL_SIZE {
210            let lvl = idx / (LVL_SIZE as usize);
211            let offs = idx % (LVL_SIZE as usize);
212            buckets.push(Bucket::new(lvl, offs))
213        }
214        buckets
215    }
216
217    fn now(&self, inner: &mut TimerMod) -> Instant {
218        if let Some(cur) = self.0.lowres_time.get() {
219            cur
220        } else {
221            let now = Instant::now();
222
223            let flags = self.0.flags.get();
224            if flags.contains(Flags::RUNNING) {
225                self.0.lowres_time.set(Some(now));
226
227                if flags.contains(Flags::LOWRES_DRIVER) {
228                    self.0.lowres_driver.wake();
229                } else {
230                    LowresTimerDriver::start(self.0.clone(), inner);
231                }
232            }
233            now
234        }
235    }
236
237    fn system_time(&self, inner: &mut TimerMod) -> SystemTime {
238        if let Some(cur) = self.0.lowres_stime.get() {
239            cur
240        } else {
241            let now = SystemTime::now();
242            let flags = self.0.flags.get();
243
244            if flags.contains(Flags::RUNNING) {
245                self.0.lowres_stime.set(Some(now));
246
247                if flags.contains(Flags::LOWRES_DRIVER) {
248                    self.0.lowres_driver.wake();
249                } else {
250                    LowresTimerDriver::start(self.0.clone(), inner);
251                }
252            }
253            now
254        }
255    }
256
257    /// Add the timer into the hash bucket
258    fn add_timer(&self, millis: u64) -> TimerHandle {
259        self.with_mod(|inner| {
260            if millis == 0 {
261                let entry = inner.timers.vacant_entry();
262                let no = entry.key();
263
264                entry.insert(TimerEntry {
265                    bucket_entry: 0,
266                    bucket: None,
267                    task: LocalWaker::new(),
268                });
269                return TimerHandle(no);
270            }
271
272            let mut flags = self.0.flags.get();
273            flags.insert(Flags::RUNNING);
274            self.0.flags.set(flags);
275
276            let now = self.now(inner);
277            let elapsed_time = self.0.elapsed_time();
278            let delta = if now >= elapsed_time {
279                to_units(as_millis(now - elapsed_time) + millis)
280            } else {
281                to_units(millis)
282            };
283
284            let (no, bucket_expiry) = {
285                // crate timer entry
286                let (idx, bucket_expiry) = self
287                    .0
288                    .calc_wheel_index(self.0.elapsed.get().wrapping_add(delta), delta);
289
290                let no = inner.add_entry(idx);
291                (no, bucket_expiry)
292            };
293
294            // Check whether new bucket expire earlier
295            if bucket_expiry < self.0.next_expiry.get() {
296                self.0.next_expiry.set(bucket_expiry);
297                if flags.contains(Flags::DRIVER_STARTED) {
298                    flags.insert(Flags::DRIVER_RECALC);
299                    self.0.flags.set(flags);
300                    self.0.driver.wake();
301                } else {
302                    TimerDriver::start(self.0.clone(), inner);
303                }
304            }
305
306            TimerHandle(no)
307        })
308    }
309
310    /// Remove timer and wake task
311    fn remove_timer(&self, hnd: usize) {
312        self.with_mod(|inner| {
313            inner.remove_timer_bucket(hnd, false);
314            inner.timers[hnd].complete();
315        })
316    }
317
318    /// Update existing timer
319    fn update_timer(&self, hnd: usize, millis: u64) {
320        self.with_mod(|inner| {
321            if millis == 0 {
322                inner.remove_timer_bucket(hnd, false);
323                inner.timers[hnd].bucket = None;
324                return;
325            }
326
327            let now = self.now(inner);
328            let elapsed_time = self.0.elapsed_time();
329            let delta = if now >= elapsed_time {
330                max(to_units(as_millis(now - elapsed_time) + millis), 1)
331            } else {
332                max(to_units(millis), 1)
333            };
334
335            let bucket_expiry = {
336                // calc bucket
337                let (idx, bucket_expiry) = self
338                    .0
339                    .calc_wheel_index(self.0.elapsed.get().wrapping_add(delta), delta);
340
341                inner.update_entry(hnd, idx);
342
343                bucket_expiry
344            };
345
346            // Check whether new bucket expire earlier
347            if bucket_expiry < self.0.next_expiry.get() {
348                self.0.next_expiry.set(bucket_expiry);
349                let mut flags = self.0.flags.get();
350                if flags.contains(Flags::DRIVER_STARTED) {
351                    flags.insert(Flags::DRIVER_RECALC);
352                    self.0.flags.set(flags);
353                    self.0.driver.wake();
354                } else {
355                    TimerDriver::start(self.0.clone(), inner);
356                }
357            }
358        })
359    }
360}
361
362impl TimerMod {
363    fn execute_expired_timers(&mut self, mut clk: u64) {
364        for lvl in 0..LVL_DEPTH {
365            let idx = (clk & LVL_MASK) + lvl * LVL_SIZE;
366            let b = &mut self.buckets[idx as usize];
367            if !b.entries.is_empty() {
368                self.occupied[b.lvl as usize] &= b.bit_n;
369                for no in b.entries.drain() {
370                    if let Some(timer) = self.timers.get_mut(no) {
371                        timer.complete();
372                    }
373                }
374            }
375
376            // Is it time to look at the next level?
377            if (clk & LVL_CLK_MASK) != 0 {
378                break;
379            }
380            // Shift clock for the next level granularity
381            clk >>= LVL_CLK_SHIFT;
382        }
383    }
384
385    fn remove_timer_bucket(&mut self, handle: usize, remove_handle: bool) {
386        let entry = &mut self.timers[handle];
387        if let Some(bucket) = entry.bucket {
388            let b = &mut self.buckets[bucket as usize];
389            b.entries.remove(entry.bucket_entry);
390            if b.entries.is_empty() {
391                self.occupied[b.lvl as usize] &= b.bit_n;
392            }
393        }
394
395        if remove_handle {
396            self.timers.remove(handle);
397        }
398    }
399
400    fn add_entry(&mut self, idx: usize) -> usize {
401        let entry = self.timers.vacant_entry();
402        let no = entry.key();
403        let bucket = &mut self.buckets[idx];
404        let bucket_entry = bucket.add_entry(no);
405
406        entry.insert(TimerEntry {
407            bucket_entry,
408            bucket: Some(idx as u16),
409            task: LocalWaker::new(),
410        });
411        self.occupied[bucket.lvl as usize] |= bucket.bit;
412
413        no
414    }
415
416    fn update_entry(&mut self, hnd: usize, idx: usize) {
417        let entry = &mut self.timers[hnd];
418
419        // cleanup active timer
420        if let Some(bucket) = entry.bucket {
421            // do not do anything if wheel bucket is the same
422            if idx == bucket as usize {
423                return;
424            }
425
426            // remove timer entry from current bucket
427            let b = &mut self.buckets[bucket as usize];
428            b.entries.remove(entry.bucket_entry);
429            if b.entries.is_empty() {
430                self.occupied[b.lvl as usize] &= b.bit_n;
431            }
432        }
433
434        // put timer to new bucket
435        let bucket = &mut self.buckets[idx];
436        entry.bucket = Some(idx as u16);
437        entry.bucket_entry = bucket.add_entry(hnd);
438
439        self.occupied[bucket.lvl as usize] |= bucket.bit;
440    }
441}
442
443impl TimerInner {
444    fn with_mod<F, R>(&self, f: F) -> R
445    where
446        F: FnOnce(&mut TimerMod) -> R,
447    {
448        let mut m = self.inner.take().unwrap();
449        let result = f(&mut m);
450        self.inner.set(Some(m));
451        result
452    }
453
454    fn calc_wheel_index(&self, expires: u64, delta: u64) -> (usize, u64) {
455        if delta < lvl_start(1) {
456            Self::calc_index(expires, 0)
457        } else if delta < lvl_start(2) {
458            Self::calc_index(expires, 1)
459        } else if delta < lvl_start(3) {
460            Self::calc_index(expires, 2)
461        } else if delta < lvl_start(4) {
462            Self::calc_index(expires, 3)
463        } else if delta < lvl_start(5) {
464            Self::calc_index(expires, 4)
465        } else if delta < lvl_start(6) {
466            Self::calc_index(expires, 5)
467        } else if delta < lvl_start(7) {
468            Self::calc_index(expires, 6)
469        } else if delta < lvl_start(8) {
470            Self::calc_index(expires, 7)
471        } else {
472            // Force expire obscene large timeouts to expire at the
473            // capacity limit of the wheel.
474            if delta >= WHEEL_TIMEOUT_CUTOFF {
475                Self::calc_index(
476                    self.elapsed.get().wrapping_add(WHEEL_TIMEOUT_MAX),
477                    LVL_DEPTH - 1,
478                )
479            } else {
480                Self::calc_index(expires, LVL_DEPTH - 1)
481            }
482        }
483    }
484
485    /// Helper function to calculate the bucket index and bucket expiration
486    fn calc_index(expires: u64, lvl: u64) -> (usize, u64) {
487        // The timer wheel has to guarantee that a timer does not fire
488        // early. Early expiry can happen due to:
489        // - Timer is armed at the edge of a tick
490        // - Truncation of the expiry time in the outer wheel levels
491        //
492        // Round up with level granularity to prevent this.
493
494        let expires = (expires + lvl_gran(lvl)) >> lvl_shift(lvl);
495        (
496            (lvl_offs(lvl) + (expires & LVL_MASK)) as usize,
497            expires << lvl_shift(lvl),
498        )
499    }
500
501    fn elapsed_time(&self) -> Instant {
502        if let Some(elapsed_time) = self.elapsed_time.get() {
503            elapsed_time
504        } else {
505            let elapsed_time = Instant::now();
506            self.elapsed_time.set(Some(elapsed_time));
507            elapsed_time
508        }
509    }
510
511    fn execute_expired_timers(&self, inner: &mut TimerMod) {
512        inner.execute_expired_timers(self.next_expiry.get());
513    }
514
515    /// Find next expiration bucket
516    fn next_pending_bucket(&self, inner: &mut TimerMod) -> Option<u64> {
517        let mut clk = self.elapsed.get();
518        let mut next = u64::MAX;
519
520        for lvl in 0..LVL_DEPTH {
521            let lvl_clk = clk & LVL_CLK_MASK;
522            let occupied = inner.occupied[lvl as usize];
523            let pos = if occupied == 0 {
524                -1
525            } else {
526                let zeros = occupied
527                    .rotate_right((clk & LVL_MASK) as u32)
528                    .trailing_zeros() as usize;
529                zeros as isize
530            };
531
532            if pos >= 0 {
533                let tmp = (clk + pos as u64) << lvl_shift(lvl);
534                if tmp < next {
535                    next = tmp
536                }
537
538                // If the next expiration happens before we reach
539                // the next level, no need to check further.
540                if (pos as u64) <= ((LVL_CLK_DIV - lvl_clk) & LVL_CLK_MASK) {
541                    break;
542                }
543            }
544
545            clk >>= LVL_CLK_SHIFT;
546            clk += u64::from(lvl_clk != 0);
547        }
548
549        if next < u64::MAX { Some(next) } else { None }
550    }
551
552    /// Get next expiry time in millis
553    fn next_expiry_ms(&self) -> u64 {
554        to_millis(self.next_expiry.get().saturating_sub(self.elapsed.get()))
555    }
556
557    fn stop_wheel(&self) {
558        // mark all timers as elapsed
559        if let Some(mut inner) = self.inner.take() {
560            let mut buckets = mem::take(&mut inner.buckets);
561            for b in &mut buckets {
562                for no in b.entries.drain() {
563                    inner.timers[no].bucket = None;
564                }
565            }
566
567            // cleanup info
568            self.flags.set(Flags::empty());
569            self.next_expiry.set(u64::MAX);
570            self.elapsed.set(0);
571            self.elapsed_time.set(None);
572            self.lowres_time.set(None);
573            self.lowres_stime.set(None);
574
575            inner.buckets = buckets;
576            inner.occupied = [0; WHEEL_SIZE];
577            self.inner.set(Some(inner));
578        }
579    }
580}
581
582#[derive(Debug)]
583struct Bucket {
584    lvl: u32,
585    bit: u64,
586    bit_n: u64,
587    entries: Slab<usize>,
588}
589
590impl Bucket {
591    fn add_entry(&mut self, no: usize) -> usize {
592        self.entries.insert(no)
593    }
594}
595
596impl Bucket {
597    fn new(lvl: usize, offs: usize) -> Self {
598        let bit = 1 << (offs as u64);
599        Bucket {
600            bit,
601            lvl: lvl as u32,
602            bit_n: !bit,
603            entries: Slab::default(),
604        }
605    }
606}
607
608#[derive(Debug)]
609struct TimerEntry {
610    bucket: Option<u16>,
611    bucket_entry: usize,
612    task: LocalWaker,
613}
614
615impl TimerEntry {
616    fn complete(&mut self) {
617        if self.bucket.is_some() {
618            self.bucket.take();
619            self.task.wake();
620        }
621    }
622}
623
624struct TimerDriver(Rc<TimerInner>);
625
626impl TimerDriver {
627    fn start(timer: Rc<TimerInner>, inner: &mut TimerMod) {
628        let mut flags = timer.flags.get();
629        flags.insert(Flags::DRIVER_STARTED);
630        timer.flags.set(flags);
631        inner.driver_sleep = Delay::new(Duration::from_millis(timer.next_expiry_ms()));
632
633        let _ = crate::spawn(TimerDriver(timer));
634    }
635}
636
637impl Drop for TimerDriver {
638    fn drop(&mut self) {
639        self.0.stop_wheel();
640    }
641}
642
643impl Future for TimerDriver {
644    type Output = ();
645
646    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
647        self.0.driver.register(cx.waker());
648
649        self.0.with_mod(|inner| {
650            let mut flags = self.0.flags.get();
651            if flags.contains(Flags::DRIVER_RECALC) {
652                flags.remove(Flags::DRIVER_RECALC);
653                self.0.flags.set(flags);
654
655                let now = Instant::now();
656                let deadline =
657                    if let Some(diff) = now.checked_duration_since(self.0.elapsed_time()) {
658                        Duration::from_millis(self.0.next_expiry_ms()).saturating_sub(diff)
659                    } else {
660                        Duration::from_millis(self.0.next_expiry_ms())
661                    };
662                inner.driver_sleep.reset(deadline);
663            }
664
665            loop {
666                if Pin::new(&mut inner.driver_sleep).poll(cx).is_ready() {
667                    let now = Instant::now();
668                    self.0.elapsed.set(self.0.next_expiry.get());
669                    self.0.elapsed_time.set(Some(now));
670                    self.0.execute_expired_timers(inner);
671
672                    if let Some(next_expiry) = self.0.next_pending_bucket(inner) {
673                        self.0.next_expiry.set(next_expiry);
674                        let dur = Duration::from_millis(self.0.next_expiry_ms());
675                        inner.driver_sleep.reset(dur);
676                        continue;
677                    } else {
678                        self.0.next_expiry.set(u64::MAX);
679                        self.0.elapsed_time.set(None);
680                    }
681                }
682                return Poll::Pending;
683            }
684        })
685    }
686}
687
688struct LowresTimerDriver(Rc<TimerInner>);
689
690impl LowresTimerDriver {
691    fn start(timer: Rc<TimerInner>, inner: &mut TimerMod) {
692        let mut flags = timer.flags.get();
693        flags.insert(Flags::LOWRES_DRIVER);
694        timer.flags.set(flags);
695        inner.lowres_driver_sleep = Delay::new(LOWRES_RESOLUTION);
696
697        let _ = crate::spawn(LowresTimerDriver(timer));
698    }
699}
700
701impl Drop for LowresTimerDriver {
702    fn drop(&mut self) {
703        self.0.stop_wheel();
704    }
705}
706
707impl Future for LowresTimerDriver {
708    type Output = ();
709
710    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
711        self.0.lowres_driver.register(cx.waker());
712
713        self.0.with_mod(|inner| {
714            let mut flags = self.0.flags.get();
715            if !flags.contains(Flags::LOWRES_TIMER) {
716                flags.insert(Flags::LOWRES_TIMER);
717                self.0.flags.set(flags);
718                inner.lowres_driver_sleep.reset(LOWRES_RESOLUTION);
719            }
720
721            if Pin::new(&mut inner.lowres_driver_sleep).poll(cx).is_ready() {
722                self.0.lowres_time.set(None);
723                self.0.lowres_stime.set(None);
724                flags.remove(Flags::LOWRES_TIMER);
725                self.0.flags.set(flags);
726            }
727            Poll::Pending
728        })
729    }
730}
731
732#[cfg(test)]
733mod tests {
734    use super::*;
735    use crate::time::{Millis, interval, sleep};
736
737    #[ntex_macros::rt_test2]
738    async fn test_timer() {
739        crate::spawn(async {
740            let s = interval(Millis(25));
741            loop {
742                s.tick().await;
743            }
744        });
745        let time = Instant::now();
746        let fut1 = sleep(Millis(1000));
747        let fut2 = sleep(Millis(200));
748
749        fut2.await;
750        let _elapsed = Instant::now() - time;
751        #[cfg(not(target_os = "macos"))]
752        assert!(
753            _elapsed > Duration::from_millis(200) && _elapsed < Duration::from_millis(300),
754            "elapsed: {_elapsed:?}"
755        );
756
757        fut1.await;
758        let _elapsed = Instant::now() - time;
759        #[cfg(not(target_os = "macos"))]
760        assert!(
761            _elapsed > Duration::from_millis(1000)
762                && _elapsed < Duration::from_millis(1200), // osx
763            "elapsed: {_elapsed:?}",
764        );
765
766        let time = Instant::now();
767        sleep(Millis(25)).await;
768        let _elapsed = Instant::now() - time;
769        #[cfg(not(target_os = "macos"))]
770        assert!(
771            _elapsed > Duration::from_millis(20) && _elapsed < Duration::from_millis(50),
772            "elapsed: {_elapsed:?}",
773        );
774    }
775}