1#![allow(arithmetic_overflow, clippy::let_underscore_future)]
5use std::time::{Duration, Instant, SystemTime};
6use std::{cell::Cell, cmp::max, future::Future, mem, pin::Pin, rc::Rc, task, task::Poll};
7
8use futures_timer::Delay;
9use slab::Slab;
10
11use crate::task::LocalWaker;
12
// Clock shift between adjacent wheel levels: every level is
// `LVL_CLK_DIV` (= 2^LVL_CLK_SHIFT) times coarser than the one below it.
const LVL_CLK_SHIFT: u64 = 3;
const LVL_CLK_DIV: u64 = 1 << LVL_CLK_SHIFT;
const LVL_CLK_MASK: u64 = LVL_CLK_DIV - 1;

// Number of buckets per level (2^LVL_BITS) and the mask selecting a bucket.
const LVL_BITS: u64 = 6;
const LVL_SIZE: u64 = 1 << LVL_BITS;
const LVL_MASK: u64 = LVL_SIZE - 1;

// Number of wheel levels.
const LVL_DEPTH: u64 = 8;

// Wheel resolution: one wheel unit is 2^UNITS milliseconds (16 ms).
const UNITS: u64 = 4;

/// Bit shift that converts wheel units into the granularity of level `lvl`.
const fn lvl_shift(lvl: u64) -> u64 {
    LVL_CLK_SHIFT * lvl
}

/// Granularity of level `lvl`, expressed in wheel units.
const fn lvl_gran(lvl: u64) -> u64 {
    1 << (LVL_CLK_SHIFT * lvl)
}

/// Converts milliseconds into wheel units (rounding down).
const fn to_units(millis: u64) -> u64 {
    millis >> UNITS
}

/// Converts wheel units back into milliseconds.
const fn to_millis(units: u64) -> u64 {
    units << UNITS
}

/// First delta (in wheel units) that no longer fits into level `lvl - 1`.
///
/// `lvl` must be at least 1.
const fn lvl_start(lvl: u64) -> u64 {
    LVL_MASK << lvl_shift(lvl - 1)
}

/// Index of the first bucket of level `lvl` inside the flat bucket array.
const fn lvl_offs(lvl: u64) -> u64 {
    LVL_SIZE * lvl
}

// Deltas at or above the cutoff do not fit into the wheel; they are queued
// `WHEEL_TIMEOUT_MAX` units ahead instead (see `calc_wheel_index`).
const WHEEL_TIMEOUT_CUTOFF: u64 = lvl_start(LVL_DEPTH);
const WHEEL_TIMEOUT_MAX: u64 = WHEEL_TIMEOUT_CUTOFF - lvl_gran(LVL_DEPTH - 1);
// Total number of buckets over all levels.
const WHEEL_SIZE: usize = (LVL_SIZE * LVL_DEPTH) as usize;

// Sleep interval of the low-resolution clock driver.
const LOWRES_RESOLUTION: Duration = Duration::from_millis(5);
64
/// Converts a duration into whole milliseconds.
///
/// Sub-millisecond precision is truncated. Durations whose millisecond count
/// does not fit into `u64` saturate at `u64::MAX` instead of overflowing.
const fn as_millis(dur: Duration) -> u64 {
    dur.as_secs()
        .saturating_mul(1_000)
        .saturating_add(dur.subsec_millis() as u64)
}
68
69#[inline]
73pub fn now() -> Instant {
74 TIMER.with(|t| t.with_mod(|inner| t.now(inner)))
75}
76
77#[inline]
81pub fn system_time() -> SystemTime {
82 TIMER.with(|t| t.with_mod(|inner| t.system_time(inner)))
83}
84
85#[inline]
90pub fn query_system_time() -> SystemTime {
91 TIMER.with(|t| t.with_mod(|inner| t.system_time(inner)))
92}
93
94#[derive(Debug)]
95pub struct TimerHandle(usize);
96
97impl TimerHandle {
98 pub fn new(millis: u64) -> Self {
100 TIMER.with(|t| t.add_timer(millis))
101 }
102
103 pub fn reset(&self, millis: u64) {
105 TIMER.with(|t| t.update_timer(self.0, millis))
106 }
107
108 pub fn elapse(&self) {
110 TIMER.with(|t| t.remove_timer(self.0))
111 }
112
113 pub fn is_elapsed(&self) -> bool {
114 TIMER.with(|t| t.with_mod(|m| m.timers[self.0].bucket.is_none()))
115 }
116
117 pub fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<()> {
118 TIMER.with(|t| {
119 t.with_mod(|inner| {
120 let entry = &inner.timers[self.0];
121 if entry.bucket.is_none() {
122 Poll::Ready(())
123 } else {
124 entry.task.register(cx.waker());
125 Poll::Pending
126 }
127 })
128 })
129 }
130}
131
132impl Drop for TimerHandle {
133 fn drop(&mut self) {
134 TIMER.with(|t| t.with_mod(|inner| inner.remove_timer_bucket(self.0, true)))
135 }
136}
137
bitflags::bitflags! {
    // State bits shared by the timer wheel and its two driver tasks.
    #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
    pub struct Flags: u8 {
        // Set by `TimerDriver::start` when the wheel driver task is spawned.
        const DRIVER_STARTED = 0b0000_0001;
        // Set when a timer with an earlier expiry was queued; the wheel driver
        // clears it and re-arms its sleep on the next poll.
        const DRIVER_RECALC = 0b0000_0010;
        // Set while the low-resolution driver has its sleep armed; cleared
        // when that sleep completes.
        const LOWRES_TIMER = 0b0000_1000;
        // Set by `LowresTimerDriver::start` when the low-resolution driver
        // task is spawned.
        const LOWRES_DRIVER = 0b0001_0000;
        // Set once a timer with a non-zero timeout has been added; enables
        // caching of the low-resolution clock values.
        const RUNNING = 0b0010_0000;
    }
}
148
thread_local! {
    // Per-thread timer wheel used by every public function in this module.
    static TIMER: Timer = Timer::new();
}
152
// Shared handle to the timer state; the driver tasks hold clones of the
// inner `Rc`.
struct Timer(Rc<TimerInner>);
154
// Bookkeeping of the timer wheel that is accessed through `Cell`s.
struct TimerInner {
    // Current wheel clock in wheel units; set to `next_expiry` each time the
    // wheel driver's sleep fires.
    elapsed: Cell<u64>,
    // Instant that corresponds to `elapsed`; initialised lazily by
    // `elapsed_time()` and cleared when no bucket is pending.
    elapsed_time: Cell<Option<Instant>>,
    // Expiry (in wheel units) of the nearest pending bucket, `u64::MAX` if none.
    next_expiry: Cell<u64>,
    // Driver/wheel state bits, see `Flags`.
    flags: Cell<Flags>,
    // Waker of the `TimerDriver` task.
    driver: LocalWaker,
    // Cached values handed out by `now()` / `system_time()`; cleared by the
    // low-resolution driver.
    lowres_time: Cell<Option<Instant>>,
    lowres_stime: Cell<Option<SystemTime>>,
    // Waker of the `LowresTimerDriver` task.
    lowres_driver: LocalWaker,
    // Mutable wheel state; taken out of the cell for the duration of
    // `with_mod` and put back afterwards.
    inner: Cell<Option<Box<TimerMod>>>,
}
166
// Mutable part of the timer wheel: registered timers, buckets and the sleep
// futures of both drivers.
struct TimerMod {
    // All registered timers; the slab key is the value stored in `TimerHandle`.
    timers: Slab<TimerEntry>,
    // Sleep future polled by `TimerDriver`.
    driver_sleep: Delay,
    // `WHEEL_SIZE` buckets, `LVL_SIZE` consecutive buckets per level.
    buckets: Vec<Bucket>,
    // Occupancy bitmask per level (one bit per bucket of that level). The
    // array is indexed by level, so only the first `LVL_DEPTH` elements are
    // ever accessed.
    occupied: [u64; WHEEL_SIZE],
    // Sleep future polled by `LowresTimerDriver`.
    lowres_driver_sleep: Delay,
}
175
176impl Timer {
177 fn new() -> Self {
178 Timer(Rc::new(TimerInner {
179 elapsed: Cell::new(0),
180 elapsed_time: Cell::new(None),
181 next_expiry: Cell::new(u64::MAX),
182 flags: Cell::new(Flags::empty()),
183 driver: LocalWaker::new(),
184 lowres_time: Cell::new(None),
185 lowres_stime: Cell::new(None),
186 lowres_driver: LocalWaker::new(),
187 inner: Cell::new(Some(Box::new(TimerMod {
188 buckets: Self::create_buckets(),
189 timers: Slab::default(),
190 driver_sleep: Delay::new(Duration::ZERO),
191 occupied: [0; WHEEL_SIZE],
192 lowres_driver_sleep: Delay::new(Duration::ZERO),
193 }))),
194 }))
195 }
196
197 fn with_mod<F, R>(&self, f: F) -> R
198 where
199 F: FnOnce(&mut TimerMod) -> R,
200 {
201 let mut m = self.0.inner.take().unwrap();
202 let result = f(&mut m);
203 self.0.inner.set(Some(m));
204 result
205 }
206
207 fn create_buckets() -> Vec<Bucket> {
208 let mut buckets = Vec::with_capacity(WHEEL_SIZE);
209 for idx in 0..WHEEL_SIZE {
210 let lvl = idx / (LVL_SIZE as usize);
211 let offs = idx % (LVL_SIZE as usize);
212 buckets.push(Bucket::new(lvl, offs))
213 }
214 buckets
215 }
216
217 fn now(&self, inner: &mut TimerMod) -> Instant {
218 if let Some(cur) = self.0.lowres_time.get() {
219 cur
220 } else {
221 let now = Instant::now();
222
223 let flags = self.0.flags.get();
224 if flags.contains(Flags::RUNNING) {
225 self.0.lowres_time.set(Some(now));
226
227 if flags.contains(Flags::LOWRES_DRIVER) {
228 self.0.lowres_driver.wake();
229 } else {
230 LowresTimerDriver::start(self.0.clone(), inner);
231 }
232 }
233 now
234 }
235 }
236
237 fn system_time(&self, inner: &mut TimerMod) -> SystemTime {
238 if let Some(cur) = self.0.lowres_stime.get() {
239 cur
240 } else {
241 let now = SystemTime::now();
242 let flags = self.0.flags.get();
243
244 if flags.contains(Flags::RUNNING) {
245 self.0.lowres_stime.set(Some(now));
246
247 if flags.contains(Flags::LOWRES_DRIVER) {
248 self.0.lowres_driver.wake();
249 } else {
250 LowresTimerDriver::start(self.0.clone(), inner);
251 }
252 }
253 now
254 }
255 }
256
257 fn add_timer(&self, millis: u64) -> TimerHandle {
259 self.with_mod(|inner| {
260 if millis == 0 {
261 let entry = inner.timers.vacant_entry();
262 let no = entry.key();
263
264 entry.insert(TimerEntry {
265 bucket_entry: 0,
266 bucket: None,
267 task: LocalWaker::new(),
268 });
269 return TimerHandle(no);
270 }
271
272 let mut flags = self.0.flags.get();
273 flags.insert(Flags::RUNNING);
274 self.0.flags.set(flags);
275
276 let now = self.now(inner);
277 let elapsed_time = self.0.elapsed_time();
278 let delta = if now >= elapsed_time {
279 to_units(as_millis(now - elapsed_time) + millis)
280 } else {
281 to_units(millis)
282 };
283
284 let (no, bucket_expiry) = {
285 let (idx, bucket_expiry) = self
287 .0
288 .calc_wheel_index(self.0.elapsed.get().wrapping_add(delta), delta);
289
290 let no = inner.add_entry(idx);
291 (no, bucket_expiry)
292 };
293
294 if bucket_expiry < self.0.next_expiry.get() {
296 self.0.next_expiry.set(bucket_expiry);
297 if flags.contains(Flags::DRIVER_STARTED) {
298 flags.insert(Flags::DRIVER_RECALC);
299 self.0.flags.set(flags);
300 self.0.driver.wake();
301 } else {
302 TimerDriver::start(self.0.clone(), inner);
303 }
304 }
305
306 TimerHandle(no)
307 })
308 }
309
310 fn remove_timer(&self, hnd: usize) {
312 self.with_mod(|inner| {
313 inner.remove_timer_bucket(hnd, false);
314 inner.timers[hnd].complete();
315 })
316 }
317
318 fn update_timer(&self, hnd: usize, millis: u64) {
320 self.with_mod(|inner| {
321 if millis == 0 {
322 inner.remove_timer_bucket(hnd, false);
323 inner.timers[hnd].bucket = None;
324 return;
325 }
326
327 let now = self.now(inner);
328 let elapsed_time = self.0.elapsed_time();
329 let delta = if now >= elapsed_time {
330 max(to_units(as_millis(now - elapsed_time) + millis), 1)
331 } else {
332 max(to_units(millis), 1)
333 };
334
335 let bucket_expiry = {
336 let (idx, bucket_expiry) = self
338 .0
339 .calc_wheel_index(self.0.elapsed.get().wrapping_add(delta), delta);
340
341 inner.update_entry(hnd, idx);
342
343 bucket_expiry
344 };
345
346 if bucket_expiry < self.0.next_expiry.get() {
348 self.0.next_expiry.set(bucket_expiry);
349 let mut flags = self.0.flags.get();
350 if flags.contains(Flags::DRIVER_STARTED) {
351 flags.insert(Flags::DRIVER_RECALC);
352 self.0.flags.set(flags);
353 self.0.driver.wake();
354 } else {
355 TimerDriver::start(self.0.clone(), inner);
356 }
357 }
358 })
359 }
360}
361
362impl TimerMod {
363 fn execute_expired_timers(&mut self, mut clk: u64) {
364 for lvl in 0..LVL_DEPTH {
365 let idx = (clk & LVL_MASK) + lvl * LVL_SIZE;
366 let b = &mut self.buckets[idx as usize];
367 if !b.entries.is_empty() {
368 self.occupied[b.lvl as usize] &= b.bit_n;
369 for no in b.entries.drain() {
370 if let Some(timer) = self.timers.get_mut(no) {
371 timer.complete();
372 }
373 }
374 }
375
376 if (clk & LVL_CLK_MASK) != 0 {
378 break;
379 }
380 clk >>= LVL_CLK_SHIFT;
382 }
383 }
384
385 fn remove_timer_bucket(&mut self, handle: usize, remove_handle: bool) {
386 let entry = &mut self.timers[handle];
387 if let Some(bucket) = entry.bucket {
388 let b = &mut self.buckets[bucket as usize];
389 b.entries.remove(entry.bucket_entry);
390 if b.entries.is_empty() {
391 self.occupied[b.lvl as usize] &= b.bit_n;
392 }
393 }
394
395 if remove_handle {
396 self.timers.remove(handle);
397 }
398 }
399
400 fn add_entry(&mut self, idx: usize) -> usize {
401 let entry = self.timers.vacant_entry();
402 let no = entry.key();
403 let bucket = &mut self.buckets[idx];
404 let bucket_entry = bucket.add_entry(no);
405
406 entry.insert(TimerEntry {
407 bucket_entry,
408 bucket: Some(idx as u16),
409 task: LocalWaker::new(),
410 });
411 self.occupied[bucket.lvl as usize] |= bucket.bit;
412
413 no
414 }
415
416 fn update_entry(&mut self, hnd: usize, idx: usize) {
417 let entry = &mut self.timers[hnd];
418
419 if let Some(bucket) = entry.bucket {
421 if idx == bucket as usize {
423 return;
424 }
425
426 let b = &mut self.buckets[bucket as usize];
428 b.entries.remove(entry.bucket_entry);
429 if b.entries.is_empty() {
430 self.occupied[b.lvl as usize] &= b.bit_n;
431 }
432 }
433
434 let bucket = &mut self.buckets[idx];
436 entry.bucket = Some(idx as u16);
437 entry.bucket_entry = bucket.add_entry(hnd);
438
439 self.occupied[bucket.lvl as usize] |= bucket.bit;
440 }
441}
442
443impl TimerInner {
444 fn with_mod<F, R>(&self, f: F) -> R
445 where
446 F: FnOnce(&mut TimerMod) -> R,
447 {
448 let mut m = self.inner.take().unwrap();
449 let result = f(&mut m);
450 self.inner.set(Some(m));
451 result
452 }
453
454 fn calc_wheel_index(&self, expires: u64, delta: u64) -> (usize, u64) {
455 if delta < lvl_start(1) {
456 Self::calc_index(expires, 0)
457 } else if delta < lvl_start(2) {
458 Self::calc_index(expires, 1)
459 } else if delta < lvl_start(3) {
460 Self::calc_index(expires, 2)
461 } else if delta < lvl_start(4) {
462 Self::calc_index(expires, 3)
463 } else if delta < lvl_start(5) {
464 Self::calc_index(expires, 4)
465 } else if delta < lvl_start(6) {
466 Self::calc_index(expires, 5)
467 } else if delta < lvl_start(7) {
468 Self::calc_index(expires, 6)
469 } else if delta < lvl_start(8) {
470 Self::calc_index(expires, 7)
471 } else {
472 if delta >= WHEEL_TIMEOUT_CUTOFF {
475 Self::calc_index(
476 self.elapsed.get().wrapping_add(WHEEL_TIMEOUT_MAX),
477 LVL_DEPTH - 1,
478 )
479 } else {
480 Self::calc_index(expires, LVL_DEPTH - 1)
481 }
482 }
483 }
484
485 fn calc_index(expires: u64, lvl: u64) -> (usize, u64) {
487 let expires = (expires + lvl_gran(lvl)) >> lvl_shift(lvl);
495 (
496 (lvl_offs(lvl) + (expires & LVL_MASK)) as usize,
497 expires << lvl_shift(lvl),
498 )
499 }
500
501 fn elapsed_time(&self) -> Instant {
502 if let Some(elapsed_time) = self.elapsed_time.get() {
503 elapsed_time
504 } else {
505 let elapsed_time = Instant::now();
506 self.elapsed_time.set(Some(elapsed_time));
507 elapsed_time
508 }
509 }
510
511 fn execute_expired_timers(&self, inner: &mut TimerMod) {
512 inner.execute_expired_timers(self.next_expiry.get());
513 }
514
515 fn next_pending_bucket(&self, inner: &mut TimerMod) -> Option<u64> {
517 let mut clk = self.elapsed.get();
518 let mut next = u64::MAX;
519
520 for lvl in 0..LVL_DEPTH {
521 let lvl_clk = clk & LVL_CLK_MASK;
522 let occupied = inner.occupied[lvl as usize];
523 let pos = if occupied == 0 {
524 -1
525 } else {
526 let zeros = occupied
527 .rotate_right((clk & LVL_MASK) as u32)
528 .trailing_zeros() as usize;
529 zeros as isize
530 };
531
532 if pos >= 0 {
533 let tmp = (clk + pos as u64) << lvl_shift(lvl);
534 if tmp < next {
535 next = tmp
536 }
537
538 if (pos as u64) <= ((LVL_CLK_DIV - lvl_clk) & LVL_CLK_MASK) {
541 break;
542 }
543 }
544
545 clk >>= LVL_CLK_SHIFT;
546 clk += u64::from(lvl_clk != 0);
547 }
548
549 if next < u64::MAX { Some(next) } else { None }
550 }
551
552 fn next_expiry_ms(&self) -> u64 {
554 to_millis(self.next_expiry.get().saturating_sub(self.elapsed.get()))
555 }
556
557 fn stop_wheel(&self) {
558 if let Some(mut inner) = self.inner.take() {
560 let mut buckets = mem::take(&mut inner.buckets);
561 for b in &mut buckets {
562 for no in b.entries.drain() {
563 inner.timers[no].bucket = None;
564 }
565 }
566
567 self.flags.set(Flags::empty());
569 self.next_expiry.set(u64::MAX);
570 self.elapsed.set(0);
571 self.elapsed_time.set(None);
572 self.lowres_time.set(None);
573 self.lowres_stime.set(None);
574
575 inner.buckets = buckets;
576 inner.occupied = [0; WHEEL_SIZE];
577 self.inner.set(Some(inner));
578 }
579 }
580}
581
582#[derive(Debug)]
583struct Bucket {
584 lvl: u32,
585 bit: u64,
586 bit_n: u64,
587 entries: Slab<usize>,
588}
589
590impl Bucket {
591 fn add_entry(&mut self, no: usize) -> usize {
592 self.entries.insert(no)
593 }
594}
595
596impl Bucket {
597 fn new(lvl: usize, offs: usize) -> Self {
598 let bit = 1 << (offs as u64);
599 Bucket {
600 bit,
601 lvl: lvl as u32,
602 bit_n: !bit,
603 entries: Slab::default(),
604 }
605 }
606}
607
608#[derive(Debug)]
609struct TimerEntry {
610 bucket: Option<u16>,
611 bucket_entry: usize,
612 task: LocalWaker,
613}
614
615impl TimerEntry {
616 fn complete(&mut self) {
617 if self.bucket.is_some() {
618 self.bucket.take();
619 self.task.wake();
620 }
621 }
622}
623
624struct TimerDriver(Rc<TimerInner>);
625
626impl TimerDriver {
627 fn start(timer: Rc<TimerInner>, inner: &mut TimerMod) {
628 let mut flags = timer.flags.get();
629 flags.insert(Flags::DRIVER_STARTED);
630 timer.flags.set(flags);
631 inner.driver_sleep = Delay::new(Duration::from_millis(timer.next_expiry_ms()));
632
633 let _ = crate::spawn(TimerDriver(timer));
634 }
635}
636
637impl Drop for TimerDriver {
638 fn drop(&mut self) {
639 self.0.stop_wheel();
640 }
641}
642
643impl Future for TimerDriver {
644 type Output = ();
645
646 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
647 self.0.driver.register(cx.waker());
648
649 self.0.with_mod(|inner| {
650 let mut flags = self.0.flags.get();
651 if flags.contains(Flags::DRIVER_RECALC) {
652 flags.remove(Flags::DRIVER_RECALC);
653 self.0.flags.set(flags);
654
655 let now = Instant::now();
656 let deadline =
657 if let Some(diff) = now.checked_duration_since(self.0.elapsed_time()) {
658 Duration::from_millis(self.0.next_expiry_ms()).saturating_sub(diff)
659 } else {
660 Duration::from_millis(self.0.next_expiry_ms())
661 };
662 inner.driver_sleep.reset(deadline);
663 }
664
665 loop {
666 if Pin::new(&mut inner.driver_sleep).poll(cx).is_ready() {
667 let now = Instant::now();
668 self.0.elapsed.set(self.0.next_expiry.get());
669 self.0.elapsed_time.set(Some(now));
670 self.0.execute_expired_timers(inner);
671
672 if let Some(next_expiry) = self.0.next_pending_bucket(inner) {
673 self.0.next_expiry.set(next_expiry);
674 let dur = Duration::from_millis(self.0.next_expiry_ms());
675 inner.driver_sleep.reset(dur);
676 continue;
677 } else {
678 self.0.next_expiry.set(u64::MAX);
679 self.0.elapsed_time.set(None);
680 }
681 }
682 return Poll::Pending;
683 }
684 })
685 }
686}
687
688struct LowresTimerDriver(Rc<TimerInner>);
689
690impl LowresTimerDriver {
691 fn start(timer: Rc<TimerInner>, inner: &mut TimerMod) {
692 let mut flags = timer.flags.get();
693 flags.insert(Flags::LOWRES_DRIVER);
694 timer.flags.set(flags);
695 inner.lowres_driver_sleep = Delay::new(LOWRES_RESOLUTION);
696
697 let _ = crate::spawn(LowresTimerDriver(timer));
698 }
699}
700
701impl Drop for LowresTimerDriver {
702 fn drop(&mut self) {
703 self.0.stop_wheel();
704 }
705}
706
707impl Future for LowresTimerDriver {
708 type Output = ();
709
710 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
711 self.0.lowres_driver.register(cx.waker());
712
713 self.0.with_mod(|inner| {
714 let mut flags = self.0.flags.get();
715 if !flags.contains(Flags::LOWRES_TIMER) {
716 flags.insert(Flags::LOWRES_TIMER);
717 self.0.flags.set(flags);
718 inner.lowres_driver_sleep.reset(LOWRES_RESOLUTION);
719 }
720
721 if Pin::new(&mut inner.lowres_driver_sleep).poll(cx).is_ready() {
722 self.0.lowres_time.set(None);
723 self.0.lowres_stime.set(None);
724 flags.remove(Flags::LOWRES_TIMER);
725 self.0.flags.set(flags);
726 }
727 Poll::Pending
728 })
729 }
730}
731
#[cfg(test)]
mod tests {
    use super::*;
    use crate::time::{Millis, interval, sleep};

    // Checks that sleeps of 200 ms, 1000 ms and 25 ms complete within the
    // asserted windows while a 25 ms interval keeps ticking in the background.
    // The timing assertions are skipped on macOS.
    #[ntex_macros::rt_test2]
    async fn test_timer() {
        // background task that ticks for the duration of the test
        crate::spawn(async {
            let s = interval(Millis(25));
            loop {
                s.tick().await;
            }
        });
        let time = Instant::now();
        let fut1 = sleep(Millis(1000));
        let fut2 = sleep(Millis(200));

        // the shorter sleep is awaited first although it was created second
        fut2.await;
        let _elapsed = Instant::now() - time;
        #[cfg(not(target_os = "macos"))]
        assert!(
            _elapsed > Duration::from_millis(200) && _elapsed < Duration::from_millis(300),
            "elapsed: {_elapsed:?}"
        );

        // the longer sleep is measured from the same start instant
        fut1.await;
        let _elapsed = Instant::now() - time;
        #[cfg(not(target_os = "macos"))]
        assert!(
            _elapsed > Duration::from_millis(1000)
                && _elapsed < Duration::from_millis(1200),
            "elapsed: {_elapsed:?}",
        );

        // a short sleep created after the earlier sleeps have completed
        let time = Instant::now();
        sleep(Millis(25)).await;
        let _elapsed = Instant::now() - time;
        #[cfg(not(target_os = "macos"))]
        assert!(
            _elapsed > Duration::from_millis(20) && _elapsed < Duration::from_millis(50),
            "elapsed: {_elapsed:?}",
        );
    }
}