1#![allow(arithmetic_overflow, clippy::let_underscore_future)]
5use std::time::{Duration, Instant, SystemTime};
6use std::{cell::Cell, cmp::max, future::Future, mem, pin::Pin, rc::Rc, task, task::Poll};
7
8use futures_timer::Delay;
9use slab::Slab;
10
11use crate::task::LocalWaker;
12
// Number of bits the wheel clock is shifted right when stepping from one level to the next.
const LVL_CLK_SHIFT: u64 = 3;
// Clock divisor between two adjacent levels (`2^LVL_CLK_SHIFT`).
const LVL_CLK_DIV: u64 = 1 << LVL_CLK_SHIFT;
// Mask selecting the clock bits that are dropped by one level step.
const LVL_CLK_MASK: u64 = LVL_CLK_DIV - 1;
17
18const fn lvl_shift(n: u64) -> u64 {
19 n * LVL_CLK_SHIFT
20}
21
22const fn lvl_gran(n: u64) -> u64 {
23 1 << lvl_shift(n)
24}
25
// Tick resolution exponent: `to_units` / `to_millis` convert between milliseconds
// and wheel ticks by shifting by this many bits (one tick = `2^UNITS` ms).
const UNITS: u64 = 4;
30const fn to_units(n: u64) -> u64 {
33 n >> UNITS
34}
35
36const fn to_millis(n: u64) -> u64 {
37 n << UNITS
38}
39
40const fn lvl_start(lvl: u64) -> u64 {
42 (LVL_SIZE - 1) << ((lvl - 1) * LVL_CLK_SHIFT)
43}
44
// Number of bits used to address a bucket inside a single level.
const LVL_BITS: u64 = 6;
// Number of buckets per level.
const LVL_SIZE: u64 = 1 << LVL_BITS;
// Mask extracting the in-level bucket index from a clock value.
const LVL_MASK: u64 = LVL_SIZE - 1;

// Number of levels in the wheel.
const LVL_DEPTH: u64 = 8;
52
53const fn lvl_offs(n: u64) -> u64 {
54 n * LVL_SIZE
55}
56
// Deltas (in ticks) at or above this value do not fit into any level of the wheel.
const WHEEL_TIMEOUT_CUTOFF: u64 = lvl_start(LVL_DEPTH);
// Delta substituted by `calc_wheel_index` for timeouts at or beyond the cutoff.
const WHEEL_TIMEOUT_MAX: u64 = WHEEL_TIMEOUT_CUTOFF - (lvl_gran(LVL_DEPTH - 1));
// Total number of buckets across all levels.
const WHEEL_SIZE: usize = (LVL_SIZE as usize) * (LVL_DEPTH as usize);

// How long a cached low-resolution timestamp is kept before the low-res driver clears it.
const LOWRES_RESOLUTION: Duration = Duration::from_millis(5);
64
/// Returns the whole number of milliseconds in `dur` as a `u64`.
///
/// Sub-millisecond precision is truncated. Durations too large to be expressed
/// in a `u64` saturate at `u64::MAX` instead of overflowing.
const fn as_millis(dur: Duration) -> u64 {
    dur.as_secs()
        .saturating_mul(1_000)
        .saturating_add(dur.subsec_millis() as u64)
}
68
69#[inline]
73pub fn now() -> Instant {
74 TIMER.with(|t| t.with_mod(|inner| t.now(inner)))
75}
76
77#[inline]
81pub fn system_time() -> SystemTime {
82 TIMER.with(|t| t.with_mod(|inner| t.system_time(inner)))
83}
84
85#[inline]
90#[doc(hidden)]
91pub fn query_system_time() -> SystemTime {
92 TIMER.with(|t| t.with_mod(|inner| t.system_time(inner)))
93}
94
/// Handle to a timer registered with the thread-local timer wheel.
///
/// The wrapped value is the timer's key in the wheel's timer slab.
#[derive(Debug)]
pub struct TimerHandle(usize);
97
98impl TimerHandle {
99 pub fn new(millis: u64) -> Self {
101 TIMER.with(|t| t.add_timer(millis))
102 }
103
104 pub fn reset(&self, millis: u64) {
106 TIMER.with(|t| t.update_timer(self.0, millis))
107 }
108
109 pub fn elapse(&self) {
111 TIMER.with(|t| t.remove_timer(self.0))
112 }
113
114 pub fn is_elapsed(&self) -> bool {
115 TIMER.with(|t| t.with_mod(|m| m.timers[self.0].bucket.is_none()))
116 }
117
118 pub fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<()> {
119 TIMER.with(|t| {
120 t.with_mod(|inner| {
121 let entry = &inner.timers[self.0];
122 if entry.bucket.is_none() {
123 Poll::Ready(())
124 } else {
125 entry.task.register(cx.waker());
126 Poll::Pending
127 }
128 })
129 })
130 }
131}
132
133impl Drop for TimerHandle {
134 fn drop(&mut self) {
135 TIMER.with(|t| t.with_mod(|inner| inner.remove_timer_bucket(self.0, true)))
136 }
137}
138
bitflags::bitflags! {
    // State bits of the thread-local timer, stored in `TimerInner::flags`.
    #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
    pub struct Flags: u8 {
        // Set by `TimerDriver::start` when the wheel driver task is spawned.
        const DRIVER_STARTED = 0b0000_0001;
        // Set when an earlier expiry was scheduled; the wheel driver clears it
        // and recomputes its sleep deadline on the next poll.
        const DRIVER_RECALC = 0b0000_0010;
        // Set by the low-res driver while its sleep is armed; cleared when it fires.
        const LOWRES_TIMER = 0b0000_1000;
        // Set by `LowresTimerDriver::start` when the low-res driver task is spawned.
        const LOWRES_DRIVER = 0b0001_0000;
        // Set once a non-zero timer has been added; enables caching of
        // low-resolution timestamps in `Timer::now` / `Timer::system_time`.
        const RUNNING = 0b0010_0000;
    }
}
149
thread_local! {
    // One timer wheel per thread; all public functions in this file go through it.
    static TIMER: Timer = Timer::new();
}
153
/// Shared handle to the timer state; the driver tasks hold clones of the inner `Rc`.
struct Timer(Rc<TimerInner>);
155
/// Bookkeeping shared between the timer API and the driver tasks.
struct TimerInner {
    /// Wheel clock in ticks; set to `next_expiry` each time the wheel driver fires.
    elapsed: Cell<u64>,
    /// Instant associated with `elapsed`; lazily initialised by `elapsed_time()`.
    elapsed_time: Cell<Option<Instant>>,
    /// Tick of the earliest pending bucket, or `u64::MAX` when nothing is scheduled.
    next_expiry: Cell<u64>,
    /// Current state bits, see `Flags`.
    flags: Cell<Flags>,
    /// Waker of the wheel driver task.
    driver: LocalWaker,
    /// Cached low-resolution `Instant`, cleared by the low-res driver.
    lowres_time: Cell<Option<Instant>>,
    /// Cached low-resolution `SystemTime`, cleared by the low-res driver.
    lowres_stime: Cell<Option<SystemTime>>,
    /// Waker of the low-res driver task.
    lowres_driver: LocalWaker,
    /// Mutable wheel state; temporarily taken out of the cell by `with_mod`.
    inner: Cell<Option<Box<TimerMod>>>,
}
167
/// Mutable part of the wheel: timer entries, buckets and the drivers' sleep futures.
struct TimerMod {
    /// All timers, keyed by the value stored in `TimerHandle`.
    timers: Slab<TimerEntry>,
    /// Delay the wheel driver waits on until the next expiry.
    driver_sleep: Delay,
    /// Flat bucket array, `LVL_SIZE` buckets per level (see `create_buckets`).
    buckets: Vec<Bucket>,
    /// Per-level bit masks of non-empty buckets; indexed by a bucket's `lvl`.
    occupied: [u64; WHEEL_SIZE],
    /// Delay the low-res driver waits on before clearing cached timestamps.
    lowres_driver_sleep: Delay,
}
176
177impl Timer {
178 fn new() -> Self {
179 Timer(Rc::new(TimerInner {
180 elapsed: Cell::new(0),
181 elapsed_time: Cell::new(None),
182 next_expiry: Cell::new(u64::MAX),
183 flags: Cell::new(Flags::empty()),
184 driver: LocalWaker::new(),
185 lowres_time: Cell::new(None),
186 lowres_stime: Cell::new(None),
187 lowres_driver: LocalWaker::new(),
188 inner: Cell::new(Some(Box::new(TimerMod {
189 buckets: Self::create_buckets(),
190 timers: Slab::default(),
191 driver_sleep: Delay::new(Duration::ZERO),
192 occupied: [0; WHEEL_SIZE],
193 lowres_driver_sleep: Delay::new(Duration::ZERO),
194 }))),
195 }))
196 }
197
198 fn with_mod<F, R>(&self, f: F) -> R
199 where
200 F: FnOnce(&mut TimerMod) -> R,
201 {
202 let mut m = self.0.inner.take().unwrap();
203 let result = f(&mut m);
204 self.0.inner.set(Some(m));
205 result
206 }
207
208 fn create_buckets() -> Vec<Bucket> {
209 let mut buckets = Vec::with_capacity(WHEEL_SIZE);
210 for idx in 0..WHEEL_SIZE {
211 let lvl = idx / (LVL_SIZE as usize);
212 let offs = idx % (LVL_SIZE as usize);
213 buckets.push(Bucket::new(lvl, offs))
214 }
215 buckets
216 }
217
218 fn now(&self, inner: &mut TimerMod) -> Instant {
219 if let Some(cur) = self.0.lowres_time.get() {
220 cur
221 } else {
222 let now = Instant::now();
223
224 let flags = self.0.flags.get();
225 if flags.contains(Flags::RUNNING) {
226 self.0.lowres_time.set(Some(now));
227
228 if flags.contains(Flags::LOWRES_DRIVER) {
229 self.0.lowres_driver.wake();
230 } else {
231 LowresTimerDriver::start(self.0.clone(), inner);
232 }
233 }
234 now
235 }
236 }
237
238 fn system_time(&self, inner: &mut TimerMod) -> SystemTime {
239 if let Some(cur) = self.0.lowres_stime.get() {
240 cur
241 } else {
242 let now = SystemTime::now();
243 let flags = self.0.flags.get();
244
245 if flags.contains(Flags::RUNNING) {
246 self.0.lowres_stime.set(Some(now));
247
248 if flags.contains(Flags::LOWRES_DRIVER) {
249 self.0.lowres_driver.wake();
250 } else {
251 LowresTimerDriver::start(self.0.clone(), inner);
252 }
253 }
254 now
255 }
256 }
257
258 fn add_timer(&self, millis: u64) -> TimerHandle {
260 self.with_mod(|inner| {
261 if millis == 0 {
262 let entry = inner.timers.vacant_entry();
263 let no = entry.key();
264
265 entry.insert(TimerEntry {
266 bucket_entry: 0,
267 bucket: None,
268 task: LocalWaker::new(),
269 });
270 return TimerHandle(no);
271 }
272
273 let mut flags = self.0.flags.get();
274 flags.insert(Flags::RUNNING);
275 self.0.flags.set(flags);
276
277 let now = self.now(inner);
278 let elapsed_time = self.0.elapsed_time();
279 let delta = if now >= elapsed_time {
280 to_units(as_millis(now - elapsed_time) + millis)
281 } else {
282 to_units(millis)
283 };
284
285 let (no, bucket_expiry) = {
286 let (idx, bucket_expiry) = self
288 .0
289 .calc_wheel_index(self.0.elapsed.get().wrapping_add(delta), delta);
290
291 let no = inner.add_entry(idx);
292 (no, bucket_expiry)
293 };
294
295 if bucket_expiry < self.0.next_expiry.get() {
297 self.0.next_expiry.set(bucket_expiry);
298 if flags.contains(Flags::DRIVER_STARTED) {
299 flags.insert(Flags::DRIVER_RECALC);
300 self.0.flags.set(flags);
301 self.0.driver.wake();
302 } else {
303 TimerDriver::start(self.0.clone(), inner);
304 }
305 }
306
307 TimerHandle(no)
308 })
309 }
310
311 fn remove_timer(&self, hnd: usize) {
313 self.with_mod(|inner| {
314 inner.remove_timer_bucket(hnd, false);
315 inner.timers[hnd].complete();
316 })
317 }
318
319 fn update_timer(&self, hnd: usize, millis: u64) {
321 self.with_mod(|inner| {
322 if millis == 0 {
323 inner.remove_timer_bucket(hnd, false);
324 inner.timers[hnd].bucket = None;
325 return;
326 }
327
328 let now = self.now(inner);
329 let elapsed_time = self.0.elapsed_time();
330 let delta = if now >= elapsed_time {
331 max(to_units(as_millis(now - elapsed_time) + millis), 1)
332 } else {
333 max(to_units(millis), 1)
334 };
335
336 let bucket_expiry = {
337 let (idx, bucket_expiry) = self
339 .0
340 .calc_wheel_index(self.0.elapsed.get().wrapping_add(delta), delta);
341
342 inner.update_entry(hnd, idx);
343
344 bucket_expiry
345 };
346
347 if bucket_expiry < self.0.next_expiry.get() {
349 self.0.next_expiry.set(bucket_expiry);
350 let mut flags = self.0.flags.get();
351 if flags.contains(Flags::DRIVER_STARTED) {
352 flags.insert(Flags::DRIVER_RECALC);
353 self.0.flags.set(flags);
354 self.0.driver.wake();
355 } else {
356 TimerDriver::start(self.0.clone(), inner);
357 }
358 }
359 })
360 }
361}
362
363impl TimerMod {
364 fn execute_expired_timers(&mut self, mut clk: u64) {
365 for lvl in 0..LVL_DEPTH {
366 let idx = (clk & LVL_MASK) + lvl * LVL_SIZE;
367 let b = &mut self.buckets[idx as usize];
368 if !b.entries.is_empty() {
369 self.occupied[b.lvl as usize] &= b.bit_n;
370 for no in b.entries.drain() {
371 if let Some(timer) = self.timers.get_mut(no) {
372 timer.complete();
373 }
374 }
375 }
376
377 if (clk & LVL_CLK_MASK) != 0 {
379 break;
380 }
381 clk >>= LVL_CLK_SHIFT;
383 }
384 }
385
386 fn remove_timer_bucket(&mut self, handle: usize, remove_handle: bool) {
387 let entry = &mut self.timers[handle];
388 if let Some(bucket) = entry.bucket {
389 let b = &mut self.buckets[bucket as usize];
390 b.entries.remove(entry.bucket_entry);
391 if b.entries.is_empty() {
392 self.occupied[b.lvl as usize] &= b.bit_n;
393 }
394 }
395
396 if remove_handle {
397 self.timers.remove(handle);
398 }
399 }
400
401 fn add_entry(&mut self, idx: usize) -> usize {
402 let entry = self.timers.vacant_entry();
403 let no = entry.key();
404 let bucket = &mut self.buckets[idx];
405 let bucket_entry = bucket.add_entry(no);
406
407 entry.insert(TimerEntry {
408 bucket_entry,
409 bucket: Some(idx as u16),
410 task: LocalWaker::new(),
411 });
412 self.occupied[bucket.lvl as usize] |= bucket.bit;
413
414 no
415 }
416
417 fn update_entry(&mut self, hnd: usize, idx: usize) {
418 let entry = &mut self.timers[hnd];
419
420 if let Some(bucket) = entry.bucket {
422 if idx == bucket as usize {
424 return;
425 }
426
427 let b = &mut self.buckets[bucket as usize];
429 b.entries.remove(entry.bucket_entry);
430 if b.entries.is_empty() {
431 self.occupied[b.lvl as usize] &= b.bit_n;
432 }
433 }
434
435 let bucket = &mut self.buckets[idx];
437 entry.bucket = Some(idx as u16);
438 entry.bucket_entry = bucket.add_entry(hnd);
439
440 self.occupied[bucket.lvl as usize] |= bucket.bit;
441 }
442}
443
444impl TimerInner {
445 fn with_mod<F, R>(&self, f: F) -> R
446 where
447 F: FnOnce(&mut TimerMod) -> R,
448 {
449 let mut m = self.inner.take().unwrap();
450 let result = f(&mut m);
451 self.inner.set(Some(m));
452 result
453 }
454
455 fn calc_wheel_index(&self, expires: u64, delta: u64) -> (usize, u64) {
456 if delta < lvl_start(1) {
457 Self::calc_index(expires, 0)
458 } else if delta < lvl_start(2) {
459 Self::calc_index(expires, 1)
460 } else if delta < lvl_start(3) {
461 Self::calc_index(expires, 2)
462 } else if delta < lvl_start(4) {
463 Self::calc_index(expires, 3)
464 } else if delta < lvl_start(5) {
465 Self::calc_index(expires, 4)
466 } else if delta < lvl_start(6) {
467 Self::calc_index(expires, 5)
468 } else if delta < lvl_start(7) {
469 Self::calc_index(expires, 6)
470 } else if delta < lvl_start(8) {
471 Self::calc_index(expires, 7)
472 } else {
473 if delta >= WHEEL_TIMEOUT_CUTOFF {
476 Self::calc_index(
477 self.elapsed.get().wrapping_add(WHEEL_TIMEOUT_MAX),
478 LVL_DEPTH - 1,
479 )
480 } else {
481 Self::calc_index(expires, LVL_DEPTH - 1)
482 }
483 }
484 }
485
486 fn calc_index(expires: u64, lvl: u64) -> (usize, u64) {
488 let expires = (expires + lvl_gran(lvl)) >> lvl_shift(lvl);
496 (
497 (lvl_offs(lvl) + (expires & LVL_MASK)) as usize,
498 expires << lvl_shift(lvl),
499 )
500 }
501
502 fn elapsed_time(&self) -> Instant {
503 if let Some(elapsed_time) = self.elapsed_time.get() {
504 elapsed_time
505 } else {
506 let elapsed_time = Instant::now();
507 self.elapsed_time.set(Some(elapsed_time));
508 elapsed_time
509 }
510 }
511
512 fn execute_expired_timers(&self, inner: &mut TimerMod) {
513 inner.execute_expired_timers(self.next_expiry.get());
514 }
515
516 fn next_pending_bucket(&self, inner: &mut TimerMod) -> Option<u64> {
518 let mut clk = self.elapsed.get();
519 let mut next = u64::MAX;
520
521 for lvl in 0..LVL_DEPTH {
522 let lvl_clk = clk & LVL_CLK_MASK;
523 let occupied = inner.occupied[lvl as usize];
524 let pos = if occupied == 0 {
525 -1
526 } else {
527 let zeros = occupied
528 .rotate_right((clk & LVL_MASK) as u32)
529 .trailing_zeros() as usize;
530 zeros as isize
531 };
532
533 if pos >= 0 {
534 let tmp = (clk + pos as u64) << lvl_shift(lvl);
535 if tmp < next {
536 next = tmp
537 }
538
539 if (pos as u64) <= ((LVL_CLK_DIV - lvl_clk) & LVL_CLK_MASK) {
542 break;
543 }
544 }
545
546 clk >>= LVL_CLK_SHIFT;
547 clk += u64::from(lvl_clk != 0);
548 }
549
550 if next < u64::MAX {
551 Some(next)
552 } else {
553 None
554 }
555 }
556
557 fn next_expiry_ms(&self) -> u64 {
559 to_millis(self.next_expiry.get().saturating_sub(self.elapsed.get()))
560 }
561
562 fn stop_wheel(&self) {
563 if let Some(mut inner) = self.inner.take() {
565 let mut buckets = mem::take(&mut inner.buckets);
566 for b in &mut buckets {
567 for no in b.entries.drain() {
568 inner.timers[no].bucket = None;
569 }
570 }
571
572 self.flags.set(Flags::empty());
574 self.next_expiry.set(u64::MAX);
575 self.elapsed.set(0);
576 self.elapsed_time.set(None);
577 self.lowres_time.set(None);
578 self.lowres_stime.set(None);
579
580 inner.buckets = buckets;
581 inner.occupied = [0; WHEEL_SIZE];
582 self.inner.set(Some(inner));
583 }
584 }
585}
586
/// One slot of the timer wheel.
#[derive(Debug)]
struct Bucket {
    /// Level this bucket belongs to.
    lvl: u32,
    /// Bit representing this bucket in its level's `occupied` mask.
    bit: u64,
    /// Complement of `bit`, used to clear the occupancy bit.
    bit_n: u64,
    /// Keys of the timers queued in this bucket.
    entries: Slab<usize>,
}
594
595impl Bucket {
596 fn add_entry(&mut self, no: usize) -> usize {
597 self.entries.insert(no)
598 }
599}
600
601impl Bucket {
602 fn new(lvl: usize, offs: usize) -> Self {
603 let bit = 1 << (offs as u64);
604 Bucket {
605 bit,
606 lvl: lvl as u32,
607 bit_n: !bit,
608 entries: Slab::default(),
609 }
610 }
611}
612
/// State of a single timer.
#[derive(Debug)]
struct TimerEntry {
    /// Index of the wheel bucket the timer is queued in; `None` once elapsed.
    bucket: Option<u16>,
    /// Slot of this timer inside the bucket's `entries` slab.
    bucket_entry: usize,
    /// Waker of the task waiting for the timer.
    task: LocalWaker,
}
619
620impl TimerEntry {
621 fn complete(&mut self) {
622 if self.bucket.is_some() {
623 self.bucket.take();
624 self.task.wake();
625 }
626 }
627}
628
/// Background task that sleeps until the next bucket expiry and fires its timers.
struct TimerDriver(Rc<TimerInner>);
630
631impl TimerDriver {
632 fn start(timer: Rc<TimerInner>, inner: &mut TimerMod) {
633 let mut flags = timer.flags.get();
634 flags.insert(Flags::DRIVER_STARTED);
635 timer.flags.set(flags);
636 inner.driver_sleep = Delay::new(Duration::from_millis(timer.next_expiry_ms()));
637
638 let _ = crate::spawn(TimerDriver(timer));
639 }
640}
641
642impl Drop for TimerDriver {
643 fn drop(&mut self) {
644 self.0.stop_wheel();
645 }
646}
647
648impl Future for TimerDriver {
649 type Output = ();
650
651 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
652 self.0.driver.register(cx.waker());
653
654 self.0.with_mod(|inner| {
655 let mut flags = self.0.flags.get();
656 if flags.contains(Flags::DRIVER_RECALC) {
657 flags.remove(Flags::DRIVER_RECALC);
658 self.0.flags.set(flags);
659
660 let now = Instant::now();
661 let deadline =
662 if let Some(diff) = now.checked_duration_since(self.0.elapsed_time()) {
663 Duration::from_millis(self.0.next_expiry_ms()).saturating_sub(diff)
664 } else {
665 Duration::from_millis(self.0.next_expiry_ms())
666 };
667 inner.driver_sleep.reset(deadline);
668 }
669
670 loop {
671 if Pin::new(&mut inner.driver_sleep).poll(cx).is_ready() {
672 let now = Instant::now();
673 self.0.elapsed.set(self.0.next_expiry.get());
674 self.0.elapsed_time.set(Some(now));
675 self.0.execute_expired_timers(inner);
676
677 if let Some(next_expiry) = self.0.next_pending_bucket(inner) {
678 self.0.next_expiry.set(next_expiry);
679 let dur = Duration::from_millis(self.0.next_expiry_ms());
680 inner.driver_sleep.reset(dur);
681 continue;
682 } else {
683 self.0.next_expiry.set(u64::MAX);
684 self.0.elapsed_time.set(None);
685 }
686 }
687 return Poll::Pending;
688 }
689 })
690 }
691}
692
/// Background task that clears the cached low-resolution timestamps after
/// `LOWRES_RESOLUTION` has passed.
struct LowresTimerDriver(Rc<TimerInner>);
694
695impl LowresTimerDriver {
696 fn start(timer: Rc<TimerInner>, inner: &mut TimerMod) {
697 let mut flags = timer.flags.get();
698 flags.insert(Flags::LOWRES_DRIVER);
699 timer.flags.set(flags);
700 inner.lowres_driver_sleep = Delay::new(LOWRES_RESOLUTION);
701
702 let _ = crate::spawn(LowresTimerDriver(timer));
703 }
704}
705
706impl Drop for LowresTimerDriver {
707 fn drop(&mut self) {
708 self.0.stop_wheel();
709 }
710}
711
712impl Future for LowresTimerDriver {
713 type Output = ();
714
715 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
716 self.0.lowres_driver.register(cx.waker());
717
718 self.0.with_mod(|inner| {
719 let mut flags = self.0.flags.get();
720 if !flags.contains(Flags::LOWRES_TIMER) {
721 flags.insert(Flags::LOWRES_TIMER);
722 self.0.flags.set(flags);
723 inner.lowres_driver_sleep.reset(LOWRES_RESOLUTION);
724 }
725
726 if Pin::new(&mut inner.lowres_driver_sleep).poll(cx).is_ready() {
727 self.0.lowres_time.set(None);
728 self.0.lowres_stime.set(None);
729 flags.remove(Flags::LOWRES_TIMER);
730 self.0.flags.set(flags);
731 }
732 Poll::Pending
733 })
734 }
735}
736
#[cfg(test)]
mod tests {
    use super::*;
    use crate::time::{interval, sleep, Millis};

    /// Checks that sleeps of different lengths fire within tolerance while an
    /// unrelated interval keeps the wheel busy. Timing assertions are skipped
    /// on macOS.
    #[ntex_macros::rt_test2]
    async fn test_timer() {
        // Background load: a 25ms interval ticking forever.
        crate::spawn(async {
            let ticker = interval(Millis(25));
            loop {
                ticker.tick().await;
            }
        });

        let started = Instant::now();
        let long_sleep = sleep(Millis(1000));
        let short_sleep = sleep(Millis(200));

        short_sleep.await;
        let _elapsed = started.elapsed();
        #[cfg(not(target_os = "macos"))]
        assert!(
            _elapsed > Duration::from_millis(200) && _elapsed < Duration::from_millis(300),
            "elapsed: {_elapsed:?}"
        );

        long_sleep.await;
        let _elapsed = started.elapsed();
        #[cfg(not(target_os = "macos"))]
        assert!(
            _elapsed > Duration::from_millis(1000) && _elapsed < Duration::from_millis(1200),
            "elapsed: {_elapsed:?}",
        );

        // A short sleep created after the wheel clock has advanced.
        let started = Instant::now();
        sleep(Millis(25)).await;
        let _elapsed = started.elapsed();
        #[cfg(not(target_os = "macos"))]
        assert!(
            _elapsed > Duration::from_millis(20) && _elapsed < Duration::from_millis(50),
            "elapsed: {_elapsed:?}",
        );
    }
}