1#![allow(arithmetic_overflow)]
5use std::num::NonZeroUsize;
6use std::time::{Duration, Instant, SystemTime};
7use std::{cell::Cell, cmp::max, future::Future, mem, pin::Pin, rc::Rc, task, task::Poll};
8
9use futures_timer::Delay;
10use slab::Slab;
11
12use crate::task::LocalWaker;
13
// Number of bits the clock is shifted by when stepping from one wheel level
// to the next (see `lvl_shift`).
const LVL_CLK_SHIFT: u64 = 3;
// Clock divisor between two adjacent levels: `1 << LVL_CLK_SHIFT`.
const LVL_CLK_DIV: u64 = 1 << LVL_CLK_SHIFT;
// Mask selecting the low `LVL_CLK_SHIFT` bits of a clock value.
const LVL_CLK_MASK: u64 = LVL_CLK_DIV - 1;
18
19const fn lvl_shift(n: u64) -> u64 {
20 n * LVL_CLK_SHIFT
21}
22
23const fn lvl_gran(n: u64) -> u64 {
24 1 << lvl_shift(n)
25}
26
// Resolution shift used by `to_units` / `to_millis`: one wheel unit covers
// `1 << UNITS` milliseconds.
const UNITS: u64 = 4;
31const fn to_units(n: u64) -> u64 {
34 n >> UNITS
35}
36
37const fn to_millis(n: u64) -> u64 {
38 n << UNITS
39}
40
41const fn lvl_start(lvl: u64) -> u64 {
43 (LVL_SIZE - 1) << ((lvl - 1) * LVL_CLK_SHIFT)
44}
45
// Number of bits used to address a bucket inside one level.
const LVL_BITS: u64 = 6;
// Number of buckets per level: `1 << LVL_BITS`.
const LVL_SIZE: u64 = 1 << LVL_BITS;
// Mask selecting the bucket offset inside a level.
const LVL_MASK: u64 = LVL_SIZE - 1;

// Number of levels in the wheel.
const LVL_DEPTH: u64 = 8;
53
54const fn lvl_offs(n: u64) -> u64 {
55 n * LVL_SIZE
56}
57
// Deltas at or above this value do not fit any level; `calc_wheel_index`
// clamps them to `WHEEL_TIMEOUT_MAX`.
const WHEEL_TIMEOUT_CUTOFF: u64 = lvl_start(LVL_DEPTH);
// Expiry offset (relative to the current wheel clock) used for clamped timers.
const WHEEL_TIMEOUT_MAX: u64 = WHEEL_TIMEOUT_CUTOFF - (lvl_gran(LVL_DEPTH - 1));
// Total number of buckets across all levels.
const WHEEL_SIZE: usize = (LVL_SIZE as usize) * (LVL_DEPTH as usize);

// Sleep interval of the low-resolution driver; once it elapses the cached
// `Instant` / `SystemTime` values are cleared.
const LOWRES_RESOLUTION: Duration = Duration::from_millis(5);
65
/// Converts `dur` to whole milliseconds, discarding any sub-millisecond part.
///
/// The result saturates at `u64::MAX` when the duration is too long to be
/// expressed in milliseconds, instead of overflowing.
const fn as_millis(dur: Duration) -> u64 {
    dur.as_secs()
        .saturating_mul(1_000)
        .saturating_add(dur.subsec_millis() as u64)
}
69
70#[inline]
74pub fn now() -> Instant {
75 TIMER.with(|t| t.with_mod(|inner| t.now(inner)))
76}
77
78#[inline]
82pub fn system_time() -> SystemTime {
83 TIMER.with(|t| t.with_mod(|inner| t.system_time(inner)))
84}
85
86#[inline]
91pub fn query_system_time() -> SystemTime {
92 TIMER.with(|t| t.with_mod(|inner| t.system_time(inner)))
93}
94
/// Handle to a timer registered with the thread-local timer wheel.
///
/// Wraps the (non-zero) slab key of the timer entry. Dropping the handle
/// removes the entry from the wheel.
#[derive(Debug)]
pub struct TimerHandle(NonZeroUsize);
97
98impl TimerHandle {
99 pub fn new(millis: u64) -> Self {
101 TIMER.with(|t| t.add_timer(millis))
102 }
103
104 pub fn reset(&self, millis: u64) {
106 TIMER.with(|t| t.update_timer(self.0.get(), millis));
107 }
108
109 pub fn elapse(&self) {
111 TIMER.with(|t| t.remove_timer(self.0.get()));
112 }
113
114 pub fn is_elapsed(&self) -> bool {
115 TIMER.with(|t| t.with_mod(|m| m.timers[self.0.get()].bucket.is_none()))
116 }
117
118 pub fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<()> {
119 TIMER.with(|t| {
120 t.with_mod(|inner| {
121 let entry = &inner.timers[self.0.get()];
122 if entry.bucket.is_none() {
123 Poll::Ready(())
124 } else {
125 entry.task.register(cx.waker());
126 Poll::Pending
127 }
128 })
129 })
130 }
131}
132
133impl Drop for TimerHandle {
134 fn drop(&mut self) {
135 TIMER.with(|t| t.with_mod(|inner| inner.remove_timer_bucket(self.0.get(), true)));
136 }
137}
138
bitflags::bitflags! {
    // State bits of the per-thread timer, kept in `TimerInner::flags`.
    #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
    pub struct Flags: u8 {
        // Set by `TimerDriver::start` when the wheel driver task is spawned.
        const DRIVER_STARTED = 0b0000_0001;
        // Set when an earlier expiry was scheduled; the wheel driver then
        // recomputes its sleep deadline on the next poll.
        const DRIVER_RECALC = 0b0000_0010;
        // Set while the low-resolution driver has its sleep armed.
        const LOWRES_TIMER = 0b0000_1000;
        // Set by `LowresTimerDriver::start` when that task is spawned.
        const LOWRES_DRIVER = 0b0001_0000;
        // Set once a timer with a non-zero timeout has been added; only then
        // are "now" values cached.
        const RUNNING = 0b0010_0000;
    }
}
149
thread_local! {
    // One timer wheel per thread, created on first access.
    static TIMER: Timer = Timer::new();
}
153
/// Thread-local timer; shares its state with the spawned driver tasks
/// through the `Rc`.
struct Timer(Rc<TimerInner>);
155
/// State shared between `Timer` and the driver tasks.
struct TimerInner {
    // Wheel clock (in wheel units) at the last processed expiry.
    elapsed: Cell<u64>,
    // Instant that corresponds to `elapsed`; filled lazily by `elapsed_time()`.
    elapsed_time: Cell<Option<Instant>>,
    // Expiry (in wheel units) of the earliest pending bucket, `u64::MAX` if none.
    next_expiry: Cell<u64>,
    // Current state bits, see `Flags`.
    flags: Cell<Flags>,
    // Waker of the `TimerDriver` task.
    driver: LocalWaker,
    // Cached low-resolution `Instant`, cleared by `LowresTimerDriver`.
    lowres_time: Cell<Option<Instant>>,
    // Cached low-resolution `SystemTime`, cleared by `LowresTimerDriver`.
    lowres_stime: Cell<Option<SystemTime>>,
    // Waker of the `LowresTimerDriver` task.
    lowres_driver: LocalWaker,
    // Mutable wheel state; taken out of the cell while `with_mod` runs.
    inner: Cell<Option<Box<TimerMod>>>,
}
167
/// Mutable part of the timer wheel.
struct TimerMod {
    // All timer entries; the slab key is the value stored in `TimerHandle`.
    timers: Slab<TimerEntry>,
    // Sleep future polled by `TimerDriver`.
    driver_sleep: Delay,
    // `WHEEL_SIZE` buckets, `LVL_SIZE` consecutive buckets per level.
    buckets: Vec<Bucket>,
    // Bitmap of non-empty buckets, indexed by level (`Bucket::lvl`).
    occupied: [u64; WHEEL_SIZE],
    // Sleep future polled by `LowresTimerDriver`.
    lowres_driver_sleep: Delay,
}
176
177impl Timer {
178 fn new() -> Self {
179 let mut timers = Slab::default();
180 timers.insert(TimerEntry {
182 bucket: None,
183 bucket_entry: 0,
184 task: LocalWaker::new(),
185 });
186
187 Timer(Rc::new(TimerInner {
188 elapsed: Cell::new(0),
189 elapsed_time: Cell::new(None),
190 next_expiry: Cell::new(u64::MAX),
191 flags: Cell::new(Flags::empty()),
192 driver: LocalWaker::new(),
193 lowres_time: Cell::new(None),
194 lowres_stime: Cell::new(None),
195 lowres_driver: LocalWaker::new(),
196 inner: Cell::new(Some(Box::new(TimerMod {
197 timers,
198 buckets: Self::create_buckets(),
199 driver_sleep: Delay::new(Duration::ZERO),
200 occupied: [0; WHEEL_SIZE],
201 lowres_driver_sleep: Delay::new(Duration::ZERO),
202 }))),
203 }))
204 }
205
206 fn with_mod<F, R>(&self, f: F) -> R
207 where
208 F: FnOnce(&mut TimerMod) -> R,
209 {
210 let mut m = self.0.inner.take().unwrap();
211 let result = f(&mut m);
212 self.0.inner.set(Some(m));
213 result
214 }
215
216 fn create_buckets() -> Vec<Bucket> {
217 let mut buckets = Vec::with_capacity(WHEEL_SIZE);
218 for idx in 0..WHEEL_SIZE {
219 let lvl = idx / (LVL_SIZE as usize);
220 let offs = idx % (LVL_SIZE as usize);
221 buckets.push(Bucket::new(lvl, offs));
222 }
223 buckets
224 }
225
226 fn now(&self, inner: &mut TimerMod) -> Instant {
227 if let Some(cur) = self.0.lowres_time.get() {
228 cur
229 } else {
230 let now = Instant::now();
231
232 let flags = self.0.flags.get();
233 if flags.contains(Flags::RUNNING) {
234 self.0.lowres_time.set(Some(now));
235
236 if flags.contains(Flags::LOWRES_DRIVER) {
237 self.0.lowres_driver.wake();
238 } else {
239 LowresTimerDriver::start(self.0.clone(), inner);
240 }
241 }
242 now
243 }
244 }
245
246 fn system_time(&self, inner: &mut TimerMod) -> SystemTime {
247 if let Some(cur) = self.0.lowres_stime.get() {
248 cur
249 } else {
250 let now = SystemTime::now();
251 let flags = self.0.flags.get();
252
253 if flags.contains(Flags::RUNNING) {
254 self.0.lowres_stime.set(Some(now));
255
256 if flags.contains(Flags::LOWRES_DRIVER) {
257 self.0.lowres_driver.wake();
258 } else {
259 LowresTimerDriver::start(self.0.clone(), inner);
260 }
261 }
262 now
263 }
264 }
265
266 fn add_timer(&self, millis: u64) -> TimerHandle {
268 self.with_mod(|inner| {
269 if millis == 0 {
270 let entry = inner.timers.vacant_entry();
271 let no = entry.key();
272
273 entry.insert(TimerEntry {
274 bucket_entry: 0,
275 bucket: None,
276 task: LocalWaker::new(),
277 });
278 return TimerHandle(unsafe { NonZeroUsize::new_unchecked(no) });
280 }
281
282 let mut flags = self.0.flags.get();
283 flags.insert(Flags::RUNNING);
284 self.0.flags.set(flags);
285
286 let now = self.now(inner);
287 let elapsed_time = self.0.elapsed_time();
288 let delta = if now >= elapsed_time {
289 to_units(as_millis(now - elapsed_time) + millis)
290 } else {
291 to_units(millis)
292 };
293
294 let (no, bucket_expiry) = {
295 let (idx, bucket_expiry) = self
297 .0
298 .calc_wheel_index(self.0.elapsed.get().wrapping_add(delta), delta);
299
300 let no = inner.add_entry(idx);
301 (no, bucket_expiry)
302 };
303
304 if bucket_expiry < self.0.next_expiry.get() {
306 self.0.next_expiry.set(bucket_expiry);
307 if flags.contains(Flags::DRIVER_STARTED) {
308 flags.insert(Flags::DRIVER_RECALC);
309 self.0.flags.set(flags);
310 self.0.driver.wake();
311 } else {
312 TimerDriver::start(self.0.clone(), inner);
313 }
314 }
315
316 TimerHandle(unsafe { NonZeroUsize::new_unchecked(no) })
318 })
319 }
320
321 fn remove_timer(&self, hnd: usize) {
323 self.with_mod(|inner| {
324 inner.remove_timer_bucket(hnd, false);
325 inner.timers[hnd].complete();
326 });
327 }
328
329 fn update_timer(&self, hnd: usize, millis: u64) {
331 self.with_mod(|inner| {
332 if millis == 0 {
333 inner.remove_timer_bucket(hnd, false);
334 inner.timers[hnd].bucket = None;
335 return;
336 }
337
338 let now = self.now(inner);
339 let elapsed_time = self.0.elapsed_time();
340 let delta = if now >= elapsed_time {
341 max(to_units(as_millis(now - elapsed_time) + millis), 1)
342 } else {
343 max(to_units(millis), 1)
344 };
345
346 let bucket_expiry = {
347 let (idx, bucket_expiry) = self
349 .0
350 .calc_wheel_index(self.0.elapsed.get().wrapping_add(delta), delta);
351
352 inner.update_entry(hnd, idx);
353
354 bucket_expiry
355 };
356
357 if bucket_expiry < self.0.next_expiry.get() {
359 self.0.next_expiry.set(bucket_expiry);
360 let mut flags = self.0.flags.get();
361 if flags.contains(Flags::DRIVER_STARTED) {
362 flags.insert(Flags::DRIVER_RECALC);
363 self.0.flags.set(flags);
364 self.0.driver.wake();
365 } else {
366 TimerDriver::start(self.0.clone(), inner);
367 }
368 }
369 });
370 }
371}
372
373impl TimerMod {
374 fn execute_expired_timers(&mut self, mut clk: u64) {
375 for lvl in 0..LVL_DEPTH {
376 let idx = (clk & LVL_MASK) + lvl * LVL_SIZE;
377 let b = &mut self.buckets[idx as usize];
378 if !b.entries.is_empty() {
379 self.occupied[b.lvl as usize] &= b.bit_n;
380 for no in b.entries.drain() {
381 if let Some(timer) = self.timers.get_mut(no) {
382 timer.complete();
383 }
384 }
385 }
386
387 if (clk & LVL_CLK_MASK) != 0 {
389 break;
390 }
391 clk >>= LVL_CLK_SHIFT;
393 }
394 }
395
396 fn remove_timer_bucket(&mut self, handle: usize, remove_handle: bool) {
397 let entry = &mut self.timers[handle];
398 if let Some(bucket) = entry.bucket {
399 let b = &mut self.buckets[bucket as usize];
400 b.entries.remove(entry.bucket_entry);
401 if b.entries.is_empty() {
402 self.occupied[b.lvl as usize] &= b.bit_n;
403 }
404 }
405
406 if remove_handle {
407 self.timers.remove(handle);
408 }
409 }
410
411 fn add_entry(&mut self, idx: usize) -> usize {
412 let entry = self.timers.vacant_entry();
413 let no = entry.key();
414 let bucket = &mut self.buckets[idx];
415 let bucket_entry = bucket.add_entry(no);
416
417 entry.insert(TimerEntry {
418 bucket_entry,
419 bucket: Some(idx as u16),
420 task: LocalWaker::new(),
421 });
422 self.occupied[bucket.lvl as usize] |= bucket.bit;
423
424 no
425 }
426
427 fn update_entry(&mut self, hnd: usize, idx: usize) {
428 let entry = &mut self.timers[hnd];
429
430 if let Some(bucket) = entry.bucket {
432 if idx == bucket as usize {
434 return;
435 }
436
437 let b = &mut self.buckets[bucket as usize];
439 b.entries.remove(entry.bucket_entry);
440 if b.entries.is_empty() {
441 self.occupied[b.lvl as usize] &= b.bit_n;
442 }
443 }
444
445 let bucket = &mut self.buckets[idx];
447 entry.bucket = Some(idx as u16);
448 entry.bucket_entry = bucket.add_entry(hnd);
449
450 self.occupied[bucket.lvl as usize] |= bucket.bit;
451 }
452}
453
454impl TimerInner {
455 fn with_mod<F, R>(&self, f: F) -> R
456 where
457 F: FnOnce(&mut TimerMod) -> R,
458 {
459 let mut m = self.inner.take().unwrap();
460 let result = f(&mut m);
461 self.inner.set(Some(m));
462 result
463 }
464
465 fn calc_wheel_index(&self, expires: u64, delta: u64) -> (usize, u64) {
466 if delta < lvl_start(1) {
467 Self::calc_index(expires, 0)
468 } else if delta < lvl_start(2) {
469 Self::calc_index(expires, 1)
470 } else if delta < lvl_start(3) {
471 Self::calc_index(expires, 2)
472 } else if delta < lvl_start(4) {
473 Self::calc_index(expires, 3)
474 } else if delta < lvl_start(5) {
475 Self::calc_index(expires, 4)
476 } else if delta < lvl_start(6) {
477 Self::calc_index(expires, 5)
478 } else if delta < lvl_start(7) {
479 Self::calc_index(expires, 6)
480 } else if delta < lvl_start(8) {
481 Self::calc_index(expires, 7)
482 } else {
483 if delta >= WHEEL_TIMEOUT_CUTOFF {
486 Self::calc_index(
487 self.elapsed.get().wrapping_add(WHEEL_TIMEOUT_MAX),
488 LVL_DEPTH - 1,
489 )
490 } else {
491 Self::calc_index(expires, LVL_DEPTH - 1)
492 }
493 }
494 }
495
496 fn calc_index(expires: u64, lvl: u64) -> (usize, u64) {
498 let expires = (expires + lvl_gran(lvl)) >> lvl_shift(lvl);
506 (
507 (lvl_offs(lvl) + (expires & LVL_MASK)) as usize,
508 expires << lvl_shift(lvl),
509 )
510 }
511
512 fn elapsed_time(&self) -> Instant {
513 if let Some(elapsed_time) = self.elapsed_time.get() {
514 elapsed_time
515 } else {
516 let elapsed_time = Instant::now();
517 self.elapsed_time.set(Some(elapsed_time));
518 elapsed_time
519 }
520 }
521
522 fn execute_expired_timers(&self, inner: &mut TimerMod) {
523 inner.execute_expired_timers(self.next_expiry.get());
524 }
525
526 fn next_pending_bucket(&self, inner: &mut TimerMod) -> Option<u64> {
528 let mut clk = self.elapsed.get();
529 let mut next = u64::MAX;
530
531 for lvl in 0..LVL_DEPTH {
532 let lvl_clk = clk & LVL_CLK_MASK;
533 let occupied = inner.occupied[lvl as usize];
534 let pos = if occupied == 0 {
535 -1
536 } else {
537 let zeros = occupied
538 .rotate_right((clk & LVL_MASK) as u32)
539 .trailing_zeros() as usize;
540 zeros as isize
541 };
542
543 if pos >= 0 {
544 let tmp = (clk + pos as u64) << lvl_shift(lvl);
545 if tmp < next {
546 next = tmp;
547 }
548
549 if (pos as u64) <= ((LVL_CLK_DIV - lvl_clk) & LVL_CLK_MASK) {
552 break;
553 }
554 }
555
556 clk >>= LVL_CLK_SHIFT;
557 clk += u64::from(lvl_clk != 0);
558 }
559
560 if next < u64::MAX { Some(next) } else { None }
561 }
562
563 fn next_expiry_ms(&self) -> u64 {
565 to_millis(self.next_expiry.get().saturating_sub(self.elapsed.get()))
566 }
567
568 fn stop_wheel(&self) {
569 if let Some(mut inner) = self.inner.take() {
571 let mut buckets = mem::take(&mut inner.buckets);
572 for b in &mut buckets {
573 for no in b.entries.drain() {
574 inner.timers[no].bucket = None;
575 }
576 }
577
578 self.flags.set(Flags::empty());
580 self.next_expiry.set(u64::MAX);
581 self.elapsed.set(0);
582 self.elapsed_time.set(None);
583 self.lowres_time.set(None);
584 self.lowres_stime.set(None);
585
586 inner.buckets = buckets;
587 inner.occupied = [0; WHEEL_SIZE];
588 self.inner.set(Some(inner));
589 }
590 }
591}
592
/// One slot of the timer wheel.
#[derive(Debug)]
struct Bucket {
    // Level this bucket belongs to.
    lvl: u32,
    // Bit of this bucket inside its level's `occupied` bitmap.
    bit: u64,
    // Complement of `bit`, used to clear it from the bitmap.
    bit_n: u64,
    // Slab keys (into `TimerMod::timers`) of the timers queued here.
    entries: Slab<usize>,
}
600
601impl Bucket {
602 fn add_entry(&mut self, no: usize) -> usize {
603 self.entries.insert(no)
604 }
605}
606
607impl Bucket {
608 fn new(lvl: usize, offs: usize) -> Self {
609 let bit = 1 << (offs as u64);
610 Bucket {
611 bit,
612 lvl: lvl as u32,
613 bit_n: !bit,
614 entries: Slab::default(),
615 }
616 }
617}
618
/// Bookkeeping for a single timer.
#[derive(Debug)]
struct TimerEntry {
    // Index of the bucket the timer is queued in; `None` once elapsed.
    bucket: Option<u16>,
    // Key of this timer inside the bucket's `entries` slab.
    bucket_entry: usize,
    // Waker of the task polling this timer.
    task: LocalWaker,
}
625
626impl TimerEntry {
627 fn complete(&mut self) {
628 if self.bucket.is_some() {
629 self.bucket.take();
630 self.task.wake();
631 }
632 }
633}
634
/// Background task that sleeps until the next expiry and fires due timers.
struct TimerDriver(Rc<TimerInner>);
636
637impl TimerDriver {
638 fn start(timer: Rc<TimerInner>, inner: &mut TimerMod) {
639 let mut flags = timer.flags.get();
640 flags.insert(Flags::DRIVER_STARTED);
641 timer.flags.set(flags);
642 inner.driver_sleep = Delay::new(Duration::from_millis(timer.next_expiry_ms()));
643
644 crate::spawn(TimerDriver(timer));
645 }
646}
647
648impl Drop for TimerDriver {
649 fn drop(&mut self) {
650 self.0.stop_wheel();
651 }
652}
653
654impl Future for TimerDriver {
655 type Output = ();
656
657 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
658 self.0.driver.register(cx.waker());
659
660 self.0.with_mod(|inner| {
661 let mut flags = self.0.flags.get();
662 if flags.contains(Flags::DRIVER_RECALC) {
663 flags.remove(Flags::DRIVER_RECALC);
664 self.0.flags.set(flags);
665
666 let now = Instant::now();
667 let deadline =
668 if let Some(diff) = now.checked_duration_since(self.0.elapsed_time()) {
669 Duration::from_millis(self.0.next_expiry_ms()).saturating_sub(diff)
670 } else {
671 Duration::from_millis(self.0.next_expiry_ms())
672 };
673 inner.driver_sleep.reset(deadline);
674 }
675
676 loop {
677 if Pin::new(&mut inner.driver_sleep).poll(cx).is_ready() {
678 let now = Instant::now();
679 self.0.elapsed.set(self.0.next_expiry.get());
680 self.0.elapsed_time.set(Some(now));
681 self.0.execute_expired_timers(inner);
682
683 if let Some(next_expiry) = self.0.next_pending_bucket(inner) {
684 self.0.next_expiry.set(next_expiry);
685 let dur = Duration::from_millis(self.0.next_expiry_ms());
686 inner.driver_sleep.reset(dur);
687 continue;
688 }
689 self.0.next_expiry.set(u64::MAX);
690 self.0.elapsed_time.set(None);
691 }
692 return Poll::Pending;
693 }
694 })
695 }
696}
697
/// Background task that clears the cached low-resolution time values after
/// `LOWRES_RESOLUTION` has passed.
struct LowresTimerDriver(Rc<TimerInner>);
699
700impl LowresTimerDriver {
701 fn start(timer: Rc<TimerInner>, inner: &mut TimerMod) {
702 let mut flags = timer.flags.get();
703 flags.insert(Flags::LOWRES_DRIVER);
704 timer.flags.set(flags);
705 inner.lowres_driver_sleep = Delay::new(LOWRES_RESOLUTION);
706
707 crate::spawn(LowresTimerDriver(timer));
708 }
709}
710
711impl Drop for LowresTimerDriver {
712 fn drop(&mut self) {
713 self.0.stop_wheel();
714 }
715}
716
717impl Future for LowresTimerDriver {
718 type Output = ();
719
720 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
721 self.0.lowres_driver.register(cx.waker());
722
723 self.0.with_mod(|inner| {
724 let mut flags = self.0.flags.get();
725 if !flags.contains(Flags::LOWRES_TIMER) {
726 flags.insert(Flags::LOWRES_TIMER);
727 self.0.flags.set(flags);
728 inner.lowres_driver_sleep.reset(LOWRES_RESOLUTION);
729 }
730
731 if Pin::new(&mut inner.lowres_driver_sleep).poll(cx).is_ready() {
732 self.0.lowres_time.set(None);
733 self.0.lowres_stime.set(None);
734 flags.remove(Flags::LOWRES_TIMER);
735 self.0.flags.set(flags);
736 }
737 Poll::Pending
738 })
739 }
740}
741
#[cfg(test)]
mod tests {
    use super::*;
    use crate::time::{Millis, interval, sleep};

    /// Asserts that `_elapsed` lies strictly between `lower` and `upper`.
    #[cfg(not(target_os = "macos"))]
    #[allow(clippy::used_underscore_binding)]
    fn assert_elapsed_between(_elapsed: Duration, lower: Duration, upper: Duration) {
        assert!(
            _elapsed > lower && _elapsed < upper,
            "elapsed: {_elapsed:?}"
        );
    }

    /// Checks that sleeps of 200 ms, 1 s and 25 ms complete within their
    /// expected windows while a 25 ms interval keeps ticking in the background.
    #[ntex::test]
    #[allow(unused_variables, clippy::used_underscore_binding)]
    async fn test_timer() {
        // Background task ticking every 25 ms for the lifetime of the test.
        crate::spawn(async {
            let ticker = interval(Millis(25));
            loop {
                ticker.tick().await;
            }
        });

        let started = Instant::now();
        let long_sleep = sleep(Millis(1000));
        let short_sleep = sleep(Millis(200));

        short_sleep.await;
        #[cfg(not(target_os = "macos"))]
        {
            assert_elapsed_between(
                started.elapsed(),
                Duration::from_millis(200),
                Duration::from_millis(300),
            );
        }

        long_sleep.await;
        #[cfg(not(target_os = "macos"))]
        {
            assert_elapsed_between(
                started.elapsed(),
                Duration::from_secs(1),
                Duration::from_millis(1200),
            );
        }

        let started = Instant::now();
        sleep(Millis(25)).await;
        #[cfg(not(target_os = "macos"))]
        {
            assert_elapsed_between(
                started.elapsed(),
                Duration::from_millis(20),
                Duration::from_millis(50),
            );
        }
    }
}