ntex_util/time/
wheel.rs

1//! Time wheel based timer service.
2//!
3//! Inspired by linux kernel timers system
4#![allow(arithmetic_overflow, clippy::let_underscore_future)]
5use std::time::{Duration, Instant, SystemTime};
6use std::{cell::Cell, cmp::max, future::Future, mem, pin::Pin, rc::Rc, task, task::Poll};
7
8use futures_timer::Delay;
9use slab::Slab;
10
11use crate::task::LocalWaker;
12
// Every wheel level ticks `LVL_CLK_DIV` (= 2^LVL_CLK_SHIFT) times slower
// than the level directly below it.
const LVL_CLK_SHIFT: u64 = 3;
const LVL_CLK_DIV: u64 = 1 << LVL_CLK_SHIFT;
const LVL_CLK_MASK: u64 = LVL_CLK_DIV - 1;

/// Number of bits the level-0 clock is shifted right to get the clock of level `n`.
const fn lvl_shift(n: u64) -> u64 {
    LVL_CLK_SHIFT * n
}

/// Granularity of level `n`, expressed in level-0 ticks.
const fn lvl_gran(n: u64) -> u64 {
    let shift = lvl_shift(n);
    1 << shift
}
25
// Wheel time unit, as a power-of-two number of milliseconds:
// 0: 1 millis
// 4: 16 millis (1 << 4)
const UNITS: u64 = 4;
// Alternative: `const UNITS: u64 = 0;` gives 1 ms resolution.

/// Convert milliseconds into wheel units, dropping the sub-unit remainder.
const fn to_units(n: u64) -> u64 {
    n >> UNITS
}

/// Convert wheel units back into milliseconds.
///
/// This is a plain left shift, so high bits of very large values are discarded.
const fn to_millis(n: u64) -> u64 {
    n << UNITS
}
39
40// The time start value for each level to select the bucket at enqueue time
41const fn lvl_start(lvl: u64) -> u64 {
42    (LVL_SIZE - 1) << ((lvl - 1) * LVL_CLK_SHIFT)
43}
44
45// Size of each clock level
46const LVL_BITS: u64 = 6;
47const LVL_SIZE: u64 = 1 << LVL_BITS;
48const LVL_MASK: u64 = LVL_SIZE - 1;
49
50// Level depth
51const LVL_DEPTH: u64 = 8;
52
53const fn lvl_offs(n: u64) -> u64 {
54    n * LVL_SIZE
55}
56
57// The cutoff (max. capacity of the wheel)
58const WHEEL_TIMEOUT_CUTOFF: u64 = lvl_start(LVL_DEPTH);
59const WHEEL_TIMEOUT_MAX: u64 = WHEEL_TIMEOUT_CUTOFF - (lvl_gran(LVL_DEPTH - 1));
60const WHEEL_SIZE: usize = (LVL_SIZE as usize) * (LVL_DEPTH as usize);
61
// Refresh interval of the cached low-resolution clock
const LOWRES_RESOLUTION: Duration = Duration::from_millis(5);

/// Whole milliseconds contained in `dur`, as a `u64`.
const fn as_millis(dur: Duration) -> u64 {
    let from_secs = dur.as_secs() * 1_000;
    let from_subsec = dur.subsec_millis() as u64;
    from_secs + from_subsec
}
68
69/// Returns an instant corresponding to “now”.
70///
71/// Resolution is 5ms
72#[inline]
73pub fn now() -> Instant {
74    TIMER.with(|t| t.with_mod(|inner| t.now(inner)))
75}
76
77/// Returns the system time corresponding to “now”.
78///
79/// Resolution is 5ms
80#[inline]
81pub fn system_time() -> SystemTime {
82    TIMER.with(|t| t.with_mod(|inner| t.system_time(inner)))
83}
84
85/// Returns the system time corresponding to “now”.
86///
87/// If low resolution system time is not set, use system time.
88/// This method does not start timer driver.
89#[inline]
90#[doc(hidden)]
91pub fn query_system_time() -> SystemTime {
92    TIMER.with(|t| t.with_mod(|inner| t.system_time(inner)))
93}
94
95#[derive(Debug)]
96pub struct TimerHandle(usize);
97
98impl TimerHandle {
99    /// Createt new timer and return handle
100    pub fn new(millis: u64) -> Self {
101        TIMER.with(|t| t.add_timer(millis))
102    }
103
104    /// Resets the `TimerHandle` instance to a new deadline.
105    pub fn reset(&self, millis: u64) {
106        TIMER.with(|t| t.update_timer(self.0, millis))
107    }
108
109    /// Resets the `TimerHandle` instance to elapsed state.
110    pub fn elapse(&self) {
111        TIMER.with(|t| t.remove_timer(self.0))
112    }
113
114    pub fn is_elapsed(&self) -> bool {
115        TIMER.with(|t| t.with_mod(|m| m.timers[self.0].bucket.is_none()))
116    }
117
118    pub fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<()> {
119        TIMER.with(|t| {
120            t.with_mod(|inner| {
121                let entry = &inner.timers[self.0];
122                if entry.bucket.is_none() {
123                    Poll::Ready(())
124                } else {
125                    entry.task.register(cx.waker());
126                    Poll::Pending
127                }
128            })
129        })
130    }
131}
132
133impl Drop for TimerHandle {
134    fn drop(&mut self) {
135        TIMER.with(|t| t.with_mod(|inner| inner.remove_timer_bucket(self.0, true)))
136    }
137}
138
// State bits of the thread-local timer. Bit `0b0000_0100` is not assigned.
bitflags::bitflags! {
    #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
    pub struct Flags: u8 {
        // Set by `TimerDriver::start`
        const DRIVER_STARTED = 0b0000_0001;
        // Set when an earlier expiry gets scheduled; cleared by `TimerDriver::poll`
        // after it re-arms its sleep
        const DRIVER_RECALC  = 0b0000_0010;
        // Set by `LowresTimerDriver::poll` when it re-arms its sleep; cleared when the sleep fires
        const LOWRES_TIMER   = 0b0000_1000;
        // Set by `LowresTimerDriver::start`
        const LOWRES_DRIVER  = 0b0001_0000;
        // Set by `Timer::add_timer` for non-zero timeouts; enables low-res time caching
        const RUNNING        = 0b0010_0000;
    }
}
149
// Each thread owns its own timer wheel.
thread_local! {
    static TIMER: Timer = Timer::new();
}
153
/// Thread-local timer front-end; the wrapped state is shared (via `Rc` clones)
/// with the spawned driver tasks.
struct Timer(Rc<TimerInner>);
155
/// Timer state shared between the thread-local `Timer` and the driver tasks.
struct TimerInner {
    /// Wheel clock in wheel units; set to `next_expiry` each time the driver's sleep fires.
    elapsed: Cell<u64>,
    /// Instant associated with `elapsed`; lazily initialised by `elapsed_time()`
    /// and reset to `None` when no bucket is pending.
    elapsed_time: Cell<Option<Instant>>,
    /// Expiry (in wheel units) of the earliest pending bucket; `u64::MAX` when there is none.
    next_expiry: Cell<u64>,
    /// Driver and low-res clock state bits, see `Flags`.
    flags: Cell<Flags>,
    /// Waker of the `TimerDriver` task.
    driver: LocalWaker,
    /// Cached low-resolution `Instant`; cleared by `LowresTimerDriver` when its sleep fires.
    lowres_time: Cell<Option<Instant>>,
    /// Cached low-resolution `SystemTime`; cleared together with `lowres_time`.
    lowres_stime: Cell<Option<SystemTime>>,
    /// Waker of the `LowresTimerDriver` task.
    lowres_driver: LocalWaker,
    /// Mutable wheel state; taken out of the cell for the duration of `with_mod`
    /// and put back afterwards.
    inner: Cell<Option<Box<TimerMod>>>,
}
167
/// Mutable part of the timer wheel, accessed through `with_mod`.
struct TimerMod {
    /// All timer entries; a `TimerHandle` stores the key of its entry.
    timers: Slab<TimerEntry>,
    /// Sleep future polled by `TimerDriver`.
    driver_sleep: Delay,
    /// `WHEEL_SIZE` buckets, `LVL_SIZE` consecutive buckets per level.
    buckets: Vec<Bucket>,
    /// Bit field tracking which bucket currently contain entries.
    /// Indexed by level; one bit per bucket of that level.
    /// NOTE(review): only the first `LVL_DEPTH` elements are ever indexed,
    /// although the array has `WHEEL_SIZE` elements.
    occupied: [u64; WHEEL_SIZE],
    /// Sleep future polled by `LowresTimerDriver`.
    lowres_driver_sleep: Delay,
}
176
177impl Timer {
178    fn new() -> Self {
179        Timer(Rc::new(TimerInner {
180            elapsed: Cell::new(0),
181            elapsed_time: Cell::new(None),
182            next_expiry: Cell::new(u64::MAX),
183            flags: Cell::new(Flags::empty()),
184            driver: LocalWaker::new(),
185            lowres_time: Cell::new(None),
186            lowres_stime: Cell::new(None),
187            lowres_driver: LocalWaker::new(),
188            inner: Cell::new(Some(Box::new(TimerMod {
189                buckets: Self::create_buckets(),
190                timers: Slab::default(),
191                driver_sleep: Delay::new(Duration::ZERO),
192                occupied: [0; WHEEL_SIZE],
193                lowres_driver_sleep: Delay::new(Duration::ZERO),
194            }))),
195        }))
196    }
197
198    fn with_mod<F, R>(&self, f: F) -> R
199    where
200        F: FnOnce(&mut TimerMod) -> R,
201    {
202        let mut m = self.0.inner.take().unwrap();
203        let result = f(&mut m);
204        self.0.inner.set(Some(m));
205        result
206    }
207
208    fn create_buckets() -> Vec<Bucket> {
209        let mut buckets = Vec::with_capacity(WHEEL_SIZE);
210        for idx in 0..WHEEL_SIZE {
211            let lvl = idx / (LVL_SIZE as usize);
212            let offs = idx % (LVL_SIZE as usize);
213            buckets.push(Bucket::new(lvl, offs))
214        }
215        buckets
216    }
217
218    fn now(&self, inner: &mut TimerMod) -> Instant {
219        if let Some(cur) = self.0.lowres_time.get() {
220            cur
221        } else {
222            let now = Instant::now();
223
224            let flags = self.0.flags.get();
225            if flags.contains(Flags::RUNNING) {
226                self.0.lowres_time.set(Some(now));
227
228                if flags.contains(Flags::LOWRES_DRIVER) {
229                    self.0.lowres_driver.wake();
230                } else {
231                    LowresTimerDriver::start(self.0.clone(), inner);
232                }
233            }
234            now
235        }
236    }
237
238    fn system_time(&self, inner: &mut TimerMod) -> SystemTime {
239        if let Some(cur) = self.0.lowres_stime.get() {
240            cur
241        } else {
242            let now = SystemTime::now();
243            let flags = self.0.flags.get();
244
245            if flags.contains(Flags::RUNNING) {
246                self.0.lowres_stime.set(Some(now));
247
248                if flags.contains(Flags::LOWRES_DRIVER) {
249                    self.0.lowres_driver.wake();
250                } else {
251                    LowresTimerDriver::start(self.0.clone(), inner);
252                }
253            }
254            now
255        }
256    }
257
258    /// Add the timer into the hash bucket
259    fn add_timer(&self, millis: u64) -> TimerHandle {
260        self.with_mod(|inner| {
261            if millis == 0 {
262                let entry = inner.timers.vacant_entry();
263                let no = entry.key();
264
265                entry.insert(TimerEntry {
266                    bucket_entry: 0,
267                    bucket: None,
268                    task: LocalWaker::new(),
269                });
270                return TimerHandle(no);
271            }
272
273            let mut flags = self.0.flags.get();
274            flags.insert(Flags::RUNNING);
275            self.0.flags.set(flags);
276
277            let now = self.now(inner);
278            let elapsed_time = self.0.elapsed_time();
279            let delta = if now >= elapsed_time {
280                to_units(as_millis(now - elapsed_time) + millis)
281            } else {
282                to_units(millis)
283            };
284
285            let (no, bucket_expiry) = {
286                // crate timer entry
287                let (idx, bucket_expiry) = self
288                    .0
289                    .calc_wheel_index(self.0.elapsed.get().wrapping_add(delta), delta);
290
291                let no = inner.add_entry(idx);
292                (no, bucket_expiry)
293            };
294
295            // Check whether new bucket expire earlier
296            if bucket_expiry < self.0.next_expiry.get() {
297                self.0.next_expiry.set(bucket_expiry);
298                if flags.contains(Flags::DRIVER_STARTED) {
299                    flags.insert(Flags::DRIVER_RECALC);
300                    self.0.flags.set(flags);
301                    self.0.driver.wake();
302                } else {
303                    TimerDriver::start(self.0.clone(), inner);
304                }
305            }
306
307            TimerHandle(no)
308        })
309    }
310
311    /// Remove timer and wake task
312    fn remove_timer(&self, hnd: usize) {
313        self.with_mod(|inner| {
314            inner.remove_timer_bucket(hnd, false);
315            inner.timers[hnd].complete();
316        })
317    }
318
319    /// Update existing timer
320    fn update_timer(&self, hnd: usize, millis: u64) {
321        self.with_mod(|inner| {
322            if millis == 0 {
323                inner.remove_timer_bucket(hnd, false);
324                inner.timers[hnd].bucket = None;
325                return;
326            }
327
328            let now = self.now(inner);
329            let elapsed_time = self.0.elapsed_time();
330            let delta = if now >= elapsed_time {
331                max(to_units(as_millis(now - elapsed_time) + millis), 1)
332            } else {
333                max(to_units(millis), 1)
334            };
335
336            let bucket_expiry = {
337                // calc bucket
338                let (idx, bucket_expiry) = self
339                    .0
340                    .calc_wheel_index(self.0.elapsed.get().wrapping_add(delta), delta);
341
342                inner.update_entry(hnd, idx);
343
344                bucket_expiry
345            };
346
347            // Check whether new bucket expire earlier
348            if bucket_expiry < self.0.next_expiry.get() {
349                self.0.next_expiry.set(bucket_expiry);
350                let mut flags = self.0.flags.get();
351                if flags.contains(Flags::DRIVER_STARTED) {
352                    flags.insert(Flags::DRIVER_RECALC);
353                    self.0.flags.set(flags);
354                    self.0.driver.wake();
355                } else {
356                    TimerDriver::start(self.0.clone(), inner);
357                }
358            }
359        })
360    }
361}
362
363impl TimerMod {
364    fn execute_expired_timers(&mut self, mut clk: u64) {
365        for lvl in 0..LVL_DEPTH {
366            let idx = (clk & LVL_MASK) + lvl * LVL_SIZE;
367            let b = &mut self.buckets[idx as usize];
368            if !b.entries.is_empty() {
369                self.occupied[b.lvl as usize] &= b.bit_n;
370                for no in b.entries.drain() {
371                    if let Some(timer) = self.timers.get_mut(no) {
372                        timer.complete();
373                    }
374                }
375            }
376
377            // Is it time to look at the next level?
378            if (clk & LVL_CLK_MASK) != 0 {
379                break;
380            }
381            // Shift clock for the next level granularity
382            clk >>= LVL_CLK_SHIFT;
383        }
384    }
385
386    fn remove_timer_bucket(&mut self, handle: usize, remove_handle: bool) {
387        let entry = &mut self.timers[handle];
388        if let Some(bucket) = entry.bucket {
389            let b = &mut self.buckets[bucket as usize];
390            b.entries.remove(entry.bucket_entry);
391            if b.entries.is_empty() {
392                self.occupied[b.lvl as usize] &= b.bit_n;
393            }
394        }
395
396        if remove_handle {
397            self.timers.remove(handle);
398        }
399    }
400
401    fn add_entry(&mut self, idx: usize) -> usize {
402        let entry = self.timers.vacant_entry();
403        let no = entry.key();
404        let bucket = &mut self.buckets[idx];
405        let bucket_entry = bucket.add_entry(no);
406
407        entry.insert(TimerEntry {
408            bucket_entry,
409            bucket: Some(idx as u16),
410            task: LocalWaker::new(),
411        });
412        self.occupied[bucket.lvl as usize] |= bucket.bit;
413
414        no
415    }
416
417    fn update_entry(&mut self, hnd: usize, idx: usize) {
418        let entry = &mut self.timers[hnd];
419
420        // cleanup active timer
421        if let Some(bucket) = entry.bucket {
422            // do not do anything if wheel bucket is the same
423            if idx == bucket as usize {
424                return;
425            }
426
427            // remove timer entry from current bucket
428            let b = &mut self.buckets[bucket as usize];
429            b.entries.remove(entry.bucket_entry);
430            if b.entries.is_empty() {
431                self.occupied[b.lvl as usize] &= b.bit_n;
432            }
433        }
434
435        // put timer to new bucket
436        let bucket = &mut self.buckets[idx];
437        entry.bucket = Some(idx as u16);
438        entry.bucket_entry = bucket.add_entry(hnd);
439
440        self.occupied[bucket.lvl as usize] |= bucket.bit;
441    }
442}
443
444impl TimerInner {
445    fn with_mod<F, R>(&self, f: F) -> R
446    where
447        F: FnOnce(&mut TimerMod) -> R,
448    {
449        let mut m = self.inner.take().unwrap();
450        let result = f(&mut m);
451        self.inner.set(Some(m));
452        result
453    }
454
455    fn calc_wheel_index(&self, expires: u64, delta: u64) -> (usize, u64) {
456        if delta < lvl_start(1) {
457            Self::calc_index(expires, 0)
458        } else if delta < lvl_start(2) {
459            Self::calc_index(expires, 1)
460        } else if delta < lvl_start(3) {
461            Self::calc_index(expires, 2)
462        } else if delta < lvl_start(4) {
463            Self::calc_index(expires, 3)
464        } else if delta < lvl_start(5) {
465            Self::calc_index(expires, 4)
466        } else if delta < lvl_start(6) {
467            Self::calc_index(expires, 5)
468        } else if delta < lvl_start(7) {
469            Self::calc_index(expires, 6)
470        } else if delta < lvl_start(8) {
471            Self::calc_index(expires, 7)
472        } else {
473            // Force expire obscene large timeouts to expire at the
474            // capacity limit of the wheel.
475            if delta >= WHEEL_TIMEOUT_CUTOFF {
476                Self::calc_index(
477                    self.elapsed.get().wrapping_add(WHEEL_TIMEOUT_MAX),
478                    LVL_DEPTH - 1,
479                )
480            } else {
481                Self::calc_index(expires, LVL_DEPTH - 1)
482            }
483        }
484    }
485
486    /// Helper function to calculate the bucket index and bucket expiration
487    fn calc_index(expires: u64, lvl: u64) -> (usize, u64) {
488        // The timer wheel has to guarantee that a timer does not fire
489        // early. Early expiry can happen due to:
490        // - Timer is armed at the edge of a tick
491        // - Truncation of the expiry time in the outer wheel levels
492        //
493        // Round up with level granularity to prevent this.
494
495        let expires = (expires + lvl_gran(lvl)) >> lvl_shift(lvl);
496        (
497            (lvl_offs(lvl) + (expires & LVL_MASK)) as usize,
498            expires << lvl_shift(lvl),
499        )
500    }
501
502    fn elapsed_time(&self) -> Instant {
503        if let Some(elapsed_time) = self.elapsed_time.get() {
504            elapsed_time
505        } else {
506            let elapsed_time = Instant::now();
507            self.elapsed_time.set(Some(elapsed_time));
508            elapsed_time
509        }
510    }
511
512    fn execute_expired_timers(&self, inner: &mut TimerMod) {
513        inner.execute_expired_timers(self.next_expiry.get());
514    }
515
516    /// Find next expiration bucket
517    fn next_pending_bucket(&self, inner: &mut TimerMod) -> Option<u64> {
518        let mut clk = self.elapsed.get();
519        let mut next = u64::MAX;
520
521        for lvl in 0..LVL_DEPTH {
522            let lvl_clk = clk & LVL_CLK_MASK;
523            let occupied = inner.occupied[lvl as usize];
524            let pos = if occupied == 0 {
525                -1
526            } else {
527                let zeros = occupied
528                    .rotate_right((clk & LVL_MASK) as u32)
529                    .trailing_zeros() as usize;
530                zeros as isize
531            };
532
533            if pos >= 0 {
534                let tmp = (clk + pos as u64) << lvl_shift(lvl);
535                if tmp < next {
536                    next = tmp
537                }
538
539                // If the next expiration happens before we reach
540                // the next level, no need to check further.
541                if (pos as u64) <= ((LVL_CLK_DIV - lvl_clk) & LVL_CLK_MASK) {
542                    break;
543                }
544            }
545
546            clk >>= LVL_CLK_SHIFT;
547            clk += u64::from(lvl_clk != 0);
548        }
549
550        if next < u64::MAX {
551            Some(next)
552        } else {
553            None
554        }
555    }
556
557    /// Get next expiry time in millis
558    fn next_expiry_ms(&self) -> u64 {
559        to_millis(self.next_expiry.get().saturating_sub(self.elapsed.get()))
560    }
561
562    fn stop_wheel(&self) {
563        // mark all timers as elapsed
564        if let Some(mut inner) = self.inner.take() {
565            let mut buckets = mem::take(&mut inner.buckets);
566            for b in &mut buckets {
567                for no in b.entries.drain() {
568                    inner.timers[no].bucket = None;
569                }
570            }
571
572            // cleanup info
573            self.flags.set(Flags::empty());
574            self.next_expiry.set(u64::MAX);
575            self.elapsed.set(0);
576            self.elapsed_time.set(None);
577            self.lowres_time.set(None);
578            self.lowres_stime.set(None);
579
580            inner.buckets = buckets;
581            inner.occupied = [0; WHEEL_SIZE];
582            self.inner.set(Some(inner));
583        }
584    }
585}
586
587#[derive(Debug)]
588struct Bucket {
589    lvl: u32,
590    bit: u64,
591    bit_n: u64,
592    entries: Slab<usize>,
593}
594
595impl Bucket {
596    fn add_entry(&mut self, no: usize) -> usize {
597        self.entries.insert(no)
598    }
599}
600
601impl Bucket {
602    fn new(lvl: usize, offs: usize) -> Self {
603        let bit = 1 << (offs as u64);
604        Bucket {
605            bit,
606            lvl: lvl as u32,
607            bit_n: !bit,
608            entries: Slab::default(),
609        }
610    }
611}
612
613#[derive(Debug)]
614struct TimerEntry {
615    bucket: Option<u16>,
616    bucket_entry: usize,
617    task: LocalWaker,
618}
619
620impl TimerEntry {
621    fn complete(&mut self) {
622        if self.bucket.is_some() {
623            self.bucket.take();
624            self.task.wake();
625        }
626    }
627}
628
629struct TimerDriver(Rc<TimerInner>);
630
631impl TimerDriver {
632    fn start(timer: Rc<TimerInner>, inner: &mut TimerMod) {
633        let mut flags = timer.flags.get();
634        flags.insert(Flags::DRIVER_STARTED);
635        timer.flags.set(flags);
636        inner.driver_sleep = Delay::new(Duration::from_millis(timer.next_expiry_ms()));
637
638        let _ = crate::spawn(TimerDriver(timer));
639    }
640}
641
impl Drop for TimerDriver {
    /// Resets the whole wheel when the driver task goes away.
    fn drop(&mut self) {
        self.0.stop_wheel();
    }
}
647
648impl Future for TimerDriver {
649    type Output = ();
650
651    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
652        self.0.driver.register(cx.waker());
653
654        self.0.with_mod(|inner| {
655            let mut flags = self.0.flags.get();
656            if flags.contains(Flags::DRIVER_RECALC) {
657                flags.remove(Flags::DRIVER_RECALC);
658                self.0.flags.set(flags);
659
660                let now = Instant::now();
661                let deadline =
662                    if let Some(diff) = now.checked_duration_since(self.0.elapsed_time()) {
663                        Duration::from_millis(self.0.next_expiry_ms()).saturating_sub(diff)
664                    } else {
665                        Duration::from_millis(self.0.next_expiry_ms())
666                    };
667                inner.driver_sleep.reset(deadline);
668            }
669
670            loop {
671                if Pin::new(&mut inner.driver_sleep).poll(cx).is_ready() {
672                    let now = Instant::now();
673                    self.0.elapsed.set(self.0.next_expiry.get());
674                    self.0.elapsed_time.set(Some(now));
675                    self.0.execute_expired_timers(inner);
676
677                    if let Some(next_expiry) = self.0.next_pending_bucket(inner) {
678                        self.0.next_expiry.set(next_expiry);
679                        let dur = Duration::from_millis(self.0.next_expiry_ms());
680                        inner.driver_sleep.reset(dur);
681                        continue;
682                    } else {
683                        self.0.next_expiry.set(u64::MAX);
684                        self.0.elapsed_time.set(None);
685                    }
686                }
687                return Poll::Pending;
688            }
689        })
690    }
691}
692
693struct LowresTimerDriver(Rc<TimerInner>);
694
695impl LowresTimerDriver {
696    fn start(timer: Rc<TimerInner>, inner: &mut TimerMod) {
697        let mut flags = timer.flags.get();
698        flags.insert(Flags::LOWRES_DRIVER);
699        timer.flags.set(flags);
700        inner.lowres_driver_sleep = Delay::new(LOWRES_RESOLUTION);
701
702        let _ = crate::spawn(LowresTimerDriver(timer));
703    }
704}
705
impl Drop for LowresTimerDriver {
    /// Resets the whole wheel when the low-res driver task goes away.
    fn drop(&mut self) {
        self.0.stop_wheel();
    }
}
711
712impl Future for LowresTimerDriver {
713    type Output = ();
714
715    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
716        self.0.lowres_driver.register(cx.waker());
717
718        self.0.with_mod(|inner| {
719            let mut flags = self.0.flags.get();
720            if !flags.contains(Flags::LOWRES_TIMER) {
721                flags.insert(Flags::LOWRES_TIMER);
722                self.0.flags.set(flags);
723                inner.lowres_driver_sleep.reset(LOWRES_RESOLUTION);
724            }
725
726            if Pin::new(&mut inner.lowres_driver_sleep).poll(cx).is_ready() {
727                self.0.lowres_time.set(None);
728                self.0.lowres_stime.set(None);
729                flags.remove(Flags::LOWRES_TIMER);
730                self.0.flags.set(flags);
731            }
732            Poll::Pending
733        })
734    }
735}
736
#[cfg(test)]
mod tests {
    use super::*;
    use crate::time::{interval, sleep, Millis};

    /// Checks that sleeps of 200ms, 1000ms and 25ms complete within the
    /// expected wall-clock windows while a 25ms interval keeps ticking in a
    /// separate task. The timing assertions are skipped on macOS.
    #[ntex_macros::rt_test2]
    async fn test_timer() {
        // background task that ticks every 25ms for the whole test
        crate::spawn(async {
            let s = interval(Millis(25));
            loop {
                s.tick().await;
            }
        });
        let time = Instant::now();
        // both sleeps are created up front; the shorter one is awaited first
        let fut1 = sleep(Millis(1000));
        let fut2 = sleep(Millis(200));

        fut2.await;
        let _elapsed = Instant::now() - time;
        #[cfg(not(target_os = "macos"))]
        assert!(
            _elapsed > Duration::from_millis(200) && _elapsed < Duration::from_millis(300),
            "elapsed: {_elapsed:?}"
        );

        // measured from the same start as the 200ms sleep
        fut1.await;
        let _elapsed = Instant::now() - time;
        #[cfg(not(target_os = "macos"))]
        assert!(
            _elapsed > Duration::from_millis(1000)
                && _elapsed < Duration::from_millis(1200), // osx
            "elapsed: {_elapsed:?}",
        );

        // a short sleep, measured from a fresh start
        let time = Instant::now();
        sleep(Millis(25)).await;
        let _elapsed = Instant::now() - time;
        #[cfg(not(target_os = "macos"))]
        assert!(
            _elapsed > Duration::from_millis(20) && _elapsed < Duration::from_millis(50),
            "elapsed: {_elapsed:?}",
        );
    }
}