1use std::marker::PhantomData;
9use std::mem;
10use std::time::{Duration, Instant};
11
12use nexus_slab::{Full, bounded, unbounded};
13
14use crate::entry::{EntryPtr, WheelEntry, entry_ref};
15use crate::handle::TimerHandle;
16use crate::level::Level;
17use crate::store::{BoundedStore, SlabStore, UnboundedStore};
18
19#[derive(Debug, Clone, Copy)]
47pub struct WheelBuilder {
48 tick_duration: Duration,
49 slots_per_level: usize,
50 clk_shift: u32,
51 num_levels: usize,
52}
53
54impl Default for WheelBuilder {
55 fn default() -> Self {
56 WheelBuilder {
57 tick_duration: Duration::from_millis(1),
58 slots_per_level: 64,
59 clk_shift: 3,
60 num_levels: 7,
61 }
62 }
63}
64
65impl WheelBuilder {
66 pub fn new() -> Self {
68 Self::default()
69 }
70
71 pub fn tick_duration(mut self, d: Duration) -> Self {
73 self.tick_duration = d;
74 self
75 }
76
77 pub fn slots_per_level(mut self, n: usize) -> Self {
79 self.slots_per_level = n;
80 self
81 }
82
83 pub fn clk_shift(mut self, s: u32) -> Self {
85 self.clk_shift = s;
86 self
87 }
88
89 pub fn num_levels(mut self, n: usize) -> Self {
91 self.num_levels = n;
92 self
93 }
94
95 pub fn unbounded(self, chunk_capacity: usize) -> UnboundedWheelBuilder {
100 UnboundedWheelBuilder {
101 config: self,
102 chunk_capacity,
103 }
104 }
105
106 pub fn bounded(self, capacity: usize) -> BoundedWheelBuilder {
110 BoundedWheelBuilder {
111 config: self,
112 capacity,
113 }
114 }
115
116 fn validate(&self) {
117 assert!(
118 self.slots_per_level.is_power_of_two(),
119 "slots_per_level must be a power of 2, got {}",
120 self.slots_per_level
121 );
122 assert!(
123 self.slots_per_level <= 64,
124 "slots_per_level must be <= 64 (u64 bitmask), got {}",
125 self.slots_per_level
126 );
127 assert!(self.num_levels > 0, "num_levels must be > 0");
128 assert!(
129 self.num_levels <= 8,
130 "num_levels must be <= 8 (u8 bitmask), got {}",
131 self.num_levels
132 );
133 assert!(self.clk_shift > 0, "clk_shift must be > 0");
134 assert!(
135 !self.tick_duration.is_zero(),
136 "tick_duration must be non-zero"
137 );
138 let max_shift = (self.num_levels - 1) as u64 * self.clk_shift as u64;
139 assert!(
140 max_shift < 64,
141 "(num_levels - 1) * clk_shift must be < 64, got {}",
142 max_shift
143 );
144 let slots_log2 = self.slots_per_level.trailing_zeros() as u64;
145 assert!(
146 slots_log2 + max_shift < 64,
147 "slots_per_level << max_shift would overflow u64"
148 );
149 }
150
151 fn tick_ns(&self) -> u64 {
152 self.tick_duration.as_nanos() as u64
153 }
154}
155
156#[derive(Debug)]
160pub struct UnboundedWheelBuilder {
161 config: WheelBuilder,
162 chunk_capacity: usize,
163}
164
165impl UnboundedWheelBuilder {
166 pub fn build<T: 'static>(self, now: Instant) -> Wheel<T> {
173 self.config.validate();
174 let slab = unbounded::Slab::with_chunk_capacity(self.chunk_capacity);
175 let levels = build_levels::<T>(&self.config);
176 TimerWheel {
177 slab,
178 num_levels: self.config.num_levels,
179 levels,
180 current_ticks: 0,
181 tick_ns: self.config.tick_ns(),
182 epoch: now,
183 active_levels: 0,
184 len: 0,
185 _marker: PhantomData,
186 }
187 }
188}
189
190#[derive(Debug)]
194pub struct BoundedWheelBuilder {
195 config: WheelBuilder,
196 capacity: usize,
197}
198
199impl BoundedWheelBuilder {
200 pub fn build<T: 'static>(self, now: Instant) -> BoundedWheel<T> {
207 self.config.validate();
208 let slab = bounded::Slab::with_capacity(self.capacity);
209 let levels = build_levels::<T>(&self.config);
210 TimerWheel {
211 slab,
212 num_levels: self.config.num_levels,
213 levels,
214 current_ticks: 0,
215 tick_ns: self.config.tick_ns(),
216 epoch: now,
217 active_levels: 0,
218 len: 0,
219 _marker: PhantomData,
220 }
221 }
222}
223
224pub struct TimerWheel<
241 T: 'static,
242 S: SlabStore<Item = WheelEntry<T>> = unbounded::Slab<WheelEntry<T>>,
243> {
244 slab: S,
245 levels: Vec<Level<T>>,
246 num_levels: usize,
247 active_levels: u8,
248 current_ticks: u64,
249 tick_ns: u64,
250 epoch: Instant,
251 len: usize,
252 _marker: PhantomData<*const ()>, }
254
255#[allow(clippy::non_send_fields_in_send_ty)]
278unsafe impl<T: Send + 'static, S: SlabStore<Item = WheelEntry<T>>> Send for TimerWheel<T, S> {}
279
280pub type BoundedWheel<T> = TimerWheel<T, bounded::Slab<WheelEntry<T>>>;
282
283pub type Wheel<T> = TimerWheel<T, unbounded::Slab<WheelEntry<T>>>;
285
286impl<T: 'static> Wheel<T> {
291 pub fn unbounded(chunk_capacity: usize, now: Instant) -> Self {
295 WheelBuilder::default().unbounded(chunk_capacity).build(now)
296 }
297}
298
299impl<T: 'static> BoundedWheel<T> {
300 pub fn bounded(capacity: usize, now: Instant) -> Self {
304 WheelBuilder::default().bounded(capacity).build(now)
305 }
306}
307
308fn build_levels<T: 'static>(config: &WheelBuilder) -> Vec<Level<T>> {
309 (0..config.num_levels)
310 .map(|i| Level::new(config.slots_per_level, i, config.clk_shift))
311 .collect()
312}
313
314impl<T: 'static, S: UnboundedStore<Item = WheelEntry<T>> + SlabStore<Item = WheelEntry<T>>>
319 TimerWheel<T, S>
320{
321 pub fn schedule(&mut self, deadline: Instant, value: T) -> TimerHandle<T> {
326 let deadline_ticks = self.instant_to_ticks(deadline);
327 let entry = WheelEntry::new(deadline_ticks, value, 2);
328 let slot = self.slab.alloc(entry);
329 let ptr = slot.as_ptr();
330 self.insert_entry(ptr, deadline_ticks);
331 self.len += 1;
332 TimerHandle::new(ptr)
333 }
334
335 pub fn schedule_forget(&mut self, deadline: Instant, value: T) {
340 let deadline_ticks = self.instant_to_ticks(deadline);
341 let entry = WheelEntry::new(deadline_ticks, value, 1);
342 let slot = self.slab.alloc(entry);
343 let ptr = slot.as_ptr();
344 self.insert_entry(ptr, deadline_ticks);
345 self.len += 1;
346 }
347}
348
349impl<T: 'static, S: BoundedStore<Item = WheelEntry<T>> + SlabStore<Item = WheelEntry<T>>>
354 TimerWheel<T, S>
355{
356 pub fn try_schedule(&mut self, deadline: Instant, value: T) -> Result<TimerHandle<T>, Full<T>> {
360 let deadline_ticks = self.instant_to_ticks(deadline);
361 let entry = WheelEntry::new(deadline_ticks, value, 2);
362 match self.slab.try_alloc(entry) {
363 Ok(slot) => {
364 let ptr = slot.as_ptr();
365 self.insert_entry(ptr, deadline_ticks);
366 self.len += 1;
367 Ok(TimerHandle::new(ptr))
368 }
369 Err(full) => {
370 let wheel_entry = full.into_inner();
373 let value = unsafe { wheel_entry.take_value() }
374 .expect("entry was just constructed with Some(value)");
375 Err(Full(value))
376 }
377 }
378 }
379
380 pub fn try_schedule_forget(&mut self, deadline: Instant, value: T) -> Result<(), Full<T>> {
384 let deadline_ticks = self.instant_to_ticks(deadline);
385 let entry = WheelEntry::new(deadline_ticks, value, 1);
386 match self.slab.try_alloc(entry) {
387 Ok(slot) => {
388 let ptr = slot.as_ptr();
389 self.insert_entry(ptr, deadline_ticks);
390 self.len += 1;
391 Ok(())
392 }
393 Err(full) => {
394 let wheel_entry = full.into_inner();
395 let value = unsafe { wheel_entry.take_value() }
396 .expect("entry was just constructed with Some(value)");
397 Err(Full(value))
398 }
399 }
400 }
401}
402
403impl<T: 'static, S: SlabStore<Item = WheelEntry<T>>> TimerWheel<T, S> {
408 pub fn cancel(&mut self, handle: TimerHandle<T>) -> Option<T> {
417 let ptr = handle.ptr;
418 mem::forget(handle);
420
421 let entry = unsafe { entry_ref(ptr) };
423 let refs = entry.refs();
424
425 if refs == 2 {
426 let value = unsafe { entry.take_value() };
428 self.remove_entry(ptr);
429 self.len -= 1;
430 unsafe { self.slab.free_ptr(ptr) };
432 value
433 } else {
434 debug_assert_eq!(refs, 1, "unexpected refcount {refs} in cancel");
437 unsafe { self.slab.free_ptr(ptr) };
439 None
440 }
441 }
442
443 pub fn free(&mut self, handle: TimerHandle<T>) {
451 let ptr = handle.ptr;
452 mem::forget(handle);
453
454 let entry = unsafe { entry_ref(ptr) };
456 let new_refs = entry.dec_refs();
457
458 if new_refs == 0 {
459 unsafe { self.slab.free_ptr(ptr) };
462 }
463 }
465
466 pub fn reschedule(&mut self, handle: TimerHandle<T>, new_deadline: Instant) -> TimerHandle<T> {
478 let ptr = handle.ptr;
479 mem::forget(handle);
480
481 let entry = unsafe { entry_ref(ptr) };
483 assert_eq!(entry.refs(), 2, "cannot reschedule a fired timer");
484
485 self.remove_entry(ptr);
487
488 let new_ticks = self.instant_to_ticks(new_deadline);
490 entry.set_deadline_ticks(new_ticks);
491 self.insert_entry(ptr, new_ticks);
492
493 TimerHandle::new(ptr)
494 }
495
496 pub fn poll(&mut self, now: Instant, buf: &mut Vec<T>) -> usize {
500 self.poll_with_limit(now, usize::MAX, buf)
501 }
502
503 pub fn poll_with_limit(&mut self, now: Instant, limit: usize, buf: &mut Vec<T>) -> usize {
510 let now_ticks = self.instant_to_ticks(now);
511 self.current_ticks = now_ticks;
512
513 let mut fired = 0;
514 let mut mask = self.active_levels;
515
516 while mask != 0 && fired < limit {
517 let lvl_idx = mask.trailing_zeros() as usize;
518 mask &= mask - 1; fired += self.poll_level(lvl_idx, now_ticks, limit - fired, buf);
520 }
521 fired
522 }
523
524 pub fn next_deadline(&self) -> Option<Instant> {
529 let mut min_ticks: Option<u64> = None;
530
531 let mut lvl_mask = self.active_levels;
532 while lvl_mask != 0 {
533 let lvl_idx = lvl_mask.trailing_zeros() as usize;
534 lvl_mask &= lvl_mask - 1;
535
536 let level = &self.levels[lvl_idx];
537 let mut slot_mask = level.active_slots();
538 while slot_mask != 0 {
539 let slot_idx = slot_mask.trailing_zeros() as usize;
540 slot_mask &= slot_mask - 1;
541
542 let slot = level.slot(slot_idx);
543 let mut entry_ptr = slot.entry_head();
544
545 while !entry_ptr.is_null() {
546 let entry = unsafe { entry_ref(entry_ptr) };
548 let dt = entry.deadline_ticks();
549 min_ticks = Some(min_ticks.map_or(dt, |current| current.min(dt)));
550 entry_ptr = entry.next();
551 }
552 }
553 }
554
555 min_ticks.map(|t| self.ticks_to_instant(t))
556 }
557
558 #[inline]
560 pub fn len(&self) -> usize {
561 self.len
562 }
563
564 #[inline]
566 pub fn is_empty(&self) -> bool {
567 self.len == 0
568 }
569
570 #[inline]
575 fn instant_to_ticks(&self, instant: Instant) -> u64 {
576 let dur = instant.saturating_duration_since(self.epoch);
578 dur.as_nanos() as u64 / self.tick_ns
579 }
580
581 #[inline]
582 fn ticks_to_instant(&self, ticks: u64) -> Instant {
583 self.epoch + Duration::from_nanos(ticks * self.tick_ns)
584 }
585
586 #[inline]
596 fn select_level(&self, deadline_ticks: u64) -> usize {
597 let delta = deadline_ticks.saturating_sub(self.current_ticks);
598
599 for (i, level) in self.levels.iter().enumerate() {
600 if delta < level.range() {
601 return i;
602 }
603 }
604
605 self.num_levels - 1
607 }
608
609 #[inline]
618 #[allow(clippy::needless_pass_by_ref_mut)]
619 fn insert_entry(&mut self, entry_ptr: EntryPtr<T>, deadline_ticks: u64) {
620 let lvl_idx = self.select_level(deadline_ticks);
621 let slot_idx = self.levels[lvl_idx].slot_index(deadline_ticks);
622
623 let entry = unsafe { entry_ref(entry_ptr) };
626 entry.set_location(lvl_idx as u8, slot_idx as u16);
627
628 unsafe { self.levels[lvl_idx].slot(slot_idx).push_entry(entry_ptr) };
630
631 self.levels[lvl_idx].activate_slot(slot_idx);
633 self.active_levels |= 1 << lvl_idx;
634 }
635
636 #[inline]
642 #[allow(clippy::needless_pass_by_ref_mut)]
643 fn remove_entry(&mut self, entry_ptr: EntryPtr<T>) {
644 let entry = unsafe { entry_ref(entry_ptr) };
646
647 let lvl_idx = entry.level() as usize;
648 let slot_idx = entry.slot_idx() as usize;
649
650 unsafe { self.levels[lvl_idx].slot(slot_idx).remove_entry(entry_ptr) };
652
653 if self.levels[lvl_idx].slot(slot_idx).is_empty() {
654 self.levels[lvl_idx].deactivate_slot(slot_idx);
655 if !self.levels[lvl_idx].is_active() {
656 self.active_levels &= !(1 << lvl_idx);
657 }
658 }
659 }
660
661 #[inline]
669 fn fire_entry(&mut self, entry_ptr: EntryPtr<T>) -> Option<T> {
670 let entry = unsafe { entry_ref(entry_ptr) };
672
673 let value = unsafe { entry.take_value() };
676
677 let new_refs = entry.dec_refs();
678 if new_refs == 0 {
679 unsafe { self.slab.free_ptr(entry_ptr) };
682 }
683 self.len -= 1;
687 value
688 }
689
690 fn poll_level(
697 &mut self,
698 lvl_idx: usize,
699 now_ticks: u64,
700 limit: usize,
701 buf: &mut Vec<T>,
702 ) -> usize {
703 let mut fired = 0;
704 let mut mask = self.levels[lvl_idx].active_slots();
705
706 while mask != 0 && fired < limit {
707 let slot_idx = mask.trailing_zeros() as usize;
708 mask &= mask - 1;
709
710 let slot_ptr = self.levels[lvl_idx].slot(slot_idx) as *const crate::level::WheelSlot<T>;
711 let slot = unsafe { &*slot_ptr };
715 let mut entry_ptr = slot.entry_head();
716
717 while !entry_ptr.is_null() && fired < limit {
718 let entry = unsafe { entry_ref(entry_ptr) };
720 let next_entry = entry.next();
721
722 if entry.deadline_ticks() <= now_ticks {
723 unsafe { slot.remove_entry(entry_ptr) };
724
725 if let Some(value) = self.fire_entry(entry_ptr) {
726 buf.push(value);
727 }
728 fired += 1;
729 }
730
731 entry_ptr = next_entry;
732 }
733
734 if slot.is_empty() {
735 self.levels[lvl_idx].deactivate_slot(slot_idx);
736 }
737 }
738
739 if !self.levels[lvl_idx].is_active() {
741 self.active_levels &= !(1 << lvl_idx);
742 }
743
744 fired
745 }
746}
747
748impl<T: 'static, S: SlabStore<Item = WheelEntry<T>>> Drop for TimerWheel<T, S> {
753 fn drop(&mut self) {
754 let mut lvl_mask = self.active_levels;
756 while lvl_mask != 0 {
757 let lvl_idx = lvl_mask.trailing_zeros() as usize;
758 lvl_mask &= lvl_mask - 1;
759
760 let level = &self.levels[lvl_idx];
761 let mut slot_mask = level.active_slots();
762 while slot_mask != 0 {
763 let slot_idx = slot_mask.trailing_zeros() as usize;
764 slot_mask &= slot_mask - 1;
765
766 let slot = level.slot(slot_idx);
767 let mut entry_ptr = slot.entry_head();
768 while !entry_ptr.is_null() {
769 let entry = unsafe { entry_ref(entry_ptr) };
771 let next_entry = entry.next();
772
773 unsafe { self.slab.free(nexus_slab::Slot::from_ptr(entry_ptr)) };
775
776 entry_ptr = next_entry;
777 }
778 }
779 }
780 }
781}
782
783#[cfg(test)]
788mod tests {
789 use super::*;
790 use std::time::{Duration, Instant};
791
792 fn ms(millis: u64) -> Duration {
793 Duration::from_millis(millis)
794 }
795
796 fn _assert_send<T: Send>() {}
801
802 #[test]
803 fn wheel_is_send() {
804 _assert_send::<Wheel<u64>>();
805 _assert_send::<BoundedWheel<u64>>();
806 }
807
808 #[test]
813 fn default_config() {
814 let now = Instant::now();
815 let wheel: Wheel<u64> = Wheel::unbounded(1024, now);
816 assert!(wheel.is_empty());
817 assert_eq!(wheel.len(), 0);
818 }
819
820 #[test]
821 fn bounded_construction() {
822 let now = Instant::now();
823 let wheel: BoundedWheel<u64> = BoundedWheel::bounded(128, now);
824 assert!(wheel.is_empty());
825 }
826
827 #[test]
828 #[should_panic(expected = "slots_per_level must be a power of 2")]
829 fn invalid_config_non_power_of_two() {
830 let now = Instant::now();
831 WheelBuilder::default()
832 .slots_per_level(65)
833 .unbounded(1024)
834 .build::<u64>(now);
835 }
836
837 #[test]
842 fn schedule_and_cancel() {
843 let now = Instant::now();
844 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
845
846 let h = wheel.schedule(now + ms(50), 42);
847 assert_eq!(wheel.len(), 1);
848
849 let val = wheel.cancel(h);
850 assert_eq!(val, Some(42));
851 assert_eq!(wheel.len(), 0);
852 }
853
854 #[test]
855 fn schedule_forget_fires() {
856 let now = Instant::now();
857 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
858
859 wheel.schedule_forget(now + ms(10), 99);
860 assert_eq!(wheel.len(), 1);
861
862 let mut buf = Vec::new();
863 let fired = wheel.poll(now + ms(20), &mut buf);
864 assert_eq!(fired, 1);
865 assert_eq!(buf, vec![99]);
866 assert_eq!(wheel.len(), 0);
867 }
868
869 #[test]
870 fn cancel_after_fire_returns_none() {
871 let now = Instant::now();
872 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
873
874 let h = wheel.schedule(now + ms(10), 42);
875
876 let mut buf = Vec::new();
877 wheel.poll(now + ms(20), &mut buf);
878 assert_eq!(buf, vec![42]);
879
880 let val = wheel.cancel(h);
882 assert_eq!(val, None);
883 }
884
885 #[test]
886 fn free_active_timer_becomes_fire_and_forget() {
887 let now = Instant::now();
888 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
889
890 let h = wheel.schedule(now + ms(10), 42);
891 wheel.free(h); assert_eq!(wheel.len(), 1);
893
894 let mut buf = Vec::new();
895 wheel.poll(now + ms(20), &mut buf);
896 assert_eq!(buf, vec![42]);
897 assert_eq!(wheel.len(), 0);
898 }
899
900 #[test]
901 fn free_zombie_handle() {
902 let now = Instant::now();
903 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
904
905 let h = wheel.schedule(now + ms(10), 42);
906
907 let mut buf = Vec::new();
908 wheel.poll(now + ms(20), &mut buf);
909
910 wheel.free(h);
912 }
913
914 #[test]
919 fn bounded_full() {
920 let now = Instant::now();
921 let mut wheel: BoundedWheel<u64> = BoundedWheel::bounded(2, now);
922
923 let h1 = wheel.try_schedule(now + ms(10), 1).unwrap();
924 let h2 = wheel.try_schedule(now + ms(20), 2).unwrap();
925
926 let err = wheel.try_schedule(now + ms(30), 3);
927 assert!(err.is_err());
928 let recovered = err.unwrap_err().into_inner();
929 assert_eq!(recovered, 3);
930
931 wheel.cancel(h1);
933 let h3 = wheel.try_schedule(now + ms(30), 3).unwrap();
934
935 wheel.free(h2);
937 wheel.free(h3);
938 }
939
940 #[test]
941 fn bounded_schedule_forget_full() {
942 let now = Instant::now();
943 let mut wheel: BoundedWheel<u64> = BoundedWheel::bounded(1, now);
944
945 wheel.try_schedule_forget(now + ms(10), 1).unwrap();
946 let err = wheel.try_schedule_forget(now + ms(20), 2);
947 assert!(err.is_err());
948 }
949
950 #[test]
955 fn poll_respects_deadline() {
956 let now = Instant::now();
957 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
958
959 wheel.schedule_forget(now + ms(10), 1);
960 wheel.schedule_forget(now + ms(50), 2);
961 wheel.schedule_forget(now + ms(100), 3);
962
963 let mut buf = Vec::new();
964
965 let fired = wheel.poll(now + ms(20), &mut buf);
967 assert_eq!(fired, 1);
968 assert_eq!(buf, vec![1]);
969 assert_eq!(wheel.len(), 2);
970
971 buf.clear();
973 let fired = wheel.poll(now + ms(60), &mut buf);
974 assert_eq!(fired, 1);
975 assert_eq!(buf, vec![2]);
976
977 buf.clear();
979 let fired = wheel.poll(now + ms(200), &mut buf);
980 assert_eq!(fired, 1);
981 assert_eq!(buf, vec![3]);
982
983 assert!(wheel.is_empty());
984 }
985
986 #[test]
987 fn poll_with_limit() {
988 let now = Instant::now();
989 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
990
991 for i in 0..10 {
992 wheel.schedule_forget(now + ms(1), i);
993 }
994
995 let mut buf = Vec::new();
996
997 let fired = wheel.poll_with_limit(now + ms(5), 3, &mut buf);
999 assert_eq!(fired, 3);
1000 assert_eq!(wheel.len(), 7);
1001
1002 let fired = wheel.poll_with_limit(now + ms(5), 3, &mut buf);
1003 assert_eq!(fired, 3);
1004 assert_eq!(wheel.len(), 4);
1005
1006 let fired = wheel.poll(now + ms(5), &mut buf);
1008 assert_eq!(fired, 4);
1009 assert!(wheel.is_empty());
1010 assert_eq!(buf.len(), 10);
1011 }
1012
1013 #[test]
1018 fn timers_across_levels() {
1019 let now = Instant::now();
1020 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1021
1022 wheel.schedule_forget(now + ms(5), 0);
1024 wheel.schedule_forget(now + ms(200), 1);
1026 wheel.schedule_forget(now + ms(1000), 2);
1028
1029 let mut buf = Vec::new();
1030
1031 wheel.poll(now + ms(10), &mut buf);
1032 assert_eq!(buf, vec![0]);
1033
1034 buf.clear();
1035 wheel.poll(now + ms(250), &mut buf);
1036 assert_eq!(buf, vec![1]);
1037
1038 buf.clear();
1039 wheel.poll(now + ms(1500), &mut buf);
1040 assert_eq!(buf, vec![2]);
1041
1042 assert!(wheel.is_empty());
1043 }
1044
1045 #[test]
1050 fn next_deadline_empty() {
1051 let now = Instant::now();
1052 let wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1053 assert!(wheel.next_deadline().is_none());
1054 }
1055
1056 #[test]
1057 fn next_deadline_returns_earliest() {
1058 let now = Instant::now();
1059 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1060
1061 wheel.schedule_forget(now + ms(100), 1);
1062 wheel.schedule_forget(now + ms(50), 2);
1063 wheel.schedule_forget(now + ms(200), 3);
1064
1065 let next = wheel.next_deadline().unwrap();
1066 let delta = next.duration_since(now);
1068 assert!(delta >= ms(49) && delta <= ms(51));
1069 }
1070
1071 #[test]
1076 fn deadline_in_the_past_fires_immediately() {
1077 let now = Instant::now();
1078 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1079
1080 wheel.schedule_forget(now, 42);
1082
1083 let mut buf = Vec::new();
1084 let fired = wheel.poll(now + ms(1), &mut buf);
1085 assert_eq!(fired, 1);
1086 assert_eq!(buf, vec![42]);
1087 }
1088
1089 #[test]
1094 fn deadline_beyond_max_range_clamped() {
1095 let now = Instant::now();
1096 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1097
1098 let h = wheel.schedule(now + Duration::from_secs(100_000), 99);
1100 assert_eq!(wheel.len(), 1);
1101
1102 let mut buf = Vec::new();
1104 wheel.poll(now + Duration::from_secs(100_001), &mut buf);
1105 assert_eq!(buf, vec![99]);
1106
1107 let val = wheel.cancel(h);
1117 assert_eq!(val, None);
1118 }
1119
1120 #[test]
1125 fn drop_cleans_up_active_entries() {
1126 let now = Instant::now();
1127 let mut wheel: Wheel<String> = Wheel::unbounded(1024, now);
1128
1129 for i in 0..100 {
1130 wheel.schedule_forget(now + ms(i * 10), format!("timer-{i}"));
1131 }
1132
1133 assert_eq!(wheel.len(), 100);
1134 drop(wheel);
1136 }
1137
1138 #[test]
1139 fn drop_with_outstanding_handles() {
1140 let now = Instant::now();
1141 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1142
1143 let h1 = wheel.schedule(now + ms(10), 1);
1145 let h2 = wheel.schedule(now + ms(20), 2);
1146
1147 wheel.free(h1);
1149 wheel.free(h2);
1150
1151 drop(wheel);
1153 }
1154
1155 #[test]
1160 fn level_selection_boundaries() {
1161 let now = Instant::now();
1162 let wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1163
1164 assert_eq!(wheel.select_level(0), 0);
1166 assert_eq!(wheel.select_level(63), 0);
1167
1168 assert_eq!(wheel.select_level(64), 1);
1170 assert_eq!(wheel.select_level(511), 1);
1171
1172 assert_eq!(wheel.select_level(512), 2);
1174 }
1175
1176 #[test]
1181 fn cancel_after_time_advance() {
1182 let now = Instant::now();
1187 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1188
1189 let h = wheel.schedule(now + ms(500), 42);
1190 assert_eq!(wheel.len(), 1);
1191
1192 let mut buf = Vec::new();
1194 let fired = wheel.poll(now + ms(400), &mut buf);
1195 assert_eq!(fired, 0);
1196 assert!(buf.is_empty());
1197
1198 let val = wheel.cancel(h);
1200 assert_eq!(val, Some(42));
1201 assert_eq!(wheel.len(), 0);
1202 }
1203
1204 #[test]
1209 fn multiple_entries_same_slot() {
1210 let now = Instant::now();
1211 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1212
1213 let mut handles = Vec::new();
1215 for i in 0..5 {
1216 handles.push(wheel.schedule(now + ms(10), i));
1217 }
1218 assert_eq!(wheel.len(), 5);
1219
1220 let v2 = wheel.cancel(handles.remove(2));
1222 assert_eq!(v2, Some(2));
1223 let v0 = wheel.cancel(handles.remove(0));
1224 assert_eq!(v0, Some(0));
1225 assert_eq!(wheel.len(), 3);
1226
1227 let mut buf = Vec::new();
1229 let fired = wheel.poll(now + ms(20), &mut buf);
1230 assert_eq!(fired, 3);
1231
1232 for h in handles {
1234 let val = wheel.cancel(h);
1235 assert_eq!(val, None); }
1237 }
1238
1239 #[test]
1244 fn entry_at_level_boundary() {
1245 let now = Instant::now();
1248 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1249
1250 let h = wheel.schedule(now + ms(64), 99);
1251 assert_eq!(wheel.len(), 1);
1252
1253 let mut buf = Vec::new();
1255 let fired = wheel.poll(now + ms(63), &mut buf);
1256 assert_eq!(fired, 0);
1257
1258 let fired = wheel.poll(now + ms(65), &mut buf);
1260 assert_eq!(fired, 1);
1261 assert_eq!(buf, vec![99]);
1262
1263 wheel.cancel(h);
1265 }
1266
1267 #[test]
1272 fn poll_with_limit_mixed_expiry() {
1273 let now = Instant::now();
1274 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1275
1276 wheel.schedule_forget(now + ms(5), 1);
1278 wheel.schedule_forget(now + ms(5), 2);
1279 wheel.schedule_forget(now + ms(5), 3);
1280 wheel.schedule_forget(now + ms(500), 4); wheel.schedule_forget(now + ms(500), 5); assert_eq!(wheel.len(), 5);
1283
1284 let mut buf = Vec::new();
1285
1286 let fired = wheel.poll_with_limit(now + ms(10), 2, &mut buf);
1288 assert_eq!(fired, 2);
1289 assert_eq!(wheel.len(), 3);
1290
1291 let fired = wheel.poll_with_limit(now + ms(10), 5, &mut buf);
1293 assert_eq!(fired, 1);
1294 assert_eq!(wheel.len(), 2);
1295
1296 assert_eq!(buf.len(), 3);
1298 }
1299
1300 #[test]
1305 fn reuse_after_full_drain() {
1306 let now = Instant::now();
1307 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1308
1309 for i in 0..10 {
1311 wheel.schedule_forget(now + ms(1), i);
1312 }
1313 let mut buf = Vec::new();
1314 wheel.poll(now + ms(5), &mut buf);
1315 assert_eq!(buf.len(), 10);
1316 assert!(wheel.is_empty());
1317
1318 buf.clear();
1320 for i in 10..20 {
1321 wheel.schedule_forget(now + ms(100), i);
1322 }
1323 assert_eq!(wheel.len(), 10);
1324
1325 wheel.poll(now + ms(200), &mut buf);
1326 assert_eq!(buf.len(), 10);
1327 assert!(wheel.is_empty());
1328 }
1329
1330 #[test]
1335 fn all_levels_active() {
1336 let now = Instant::now();
1337 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1338
1339 let distances = [10, 100, 1000, 5000, 40_000, 300_000, 3_000_000];
1342 let mut handles: Vec<TimerHandle<u64>> = Vec::new();
1343 for (i, &d) in distances.iter().enumerate() {
1344 handles.push(wheel.schedule(now + ms(d), i as u64));
1345 }
1346 assert_eq!(wheel.len(), 7);
1347
1348 let order = [4, 1, 6, 0, 3, 5, 2];
1350 let mut opt_handles: Vec<Option<TimerHandle<u64>>> =
1353 handles.into_iter().map(Some).collect();
1354
1355 for &idx in &order {
1356 let h = opt_handles[idx].take().unwrap();
1357 let val = wheel.cancel(h);
1358 assert_eq!(val, Some(idx as u64));
1359 }
1360 assert!(wheel.is_empty());
1361 }
1362
1363 #[test]
1368 fn poll_values_match() {
1369 let now = Instant::now();
1370 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1371
1372 let expected: Vec<u64> = (100..110).collect();
1373 for &v in &expected {
1374 wheel.schedule_forget(now + ms(5), v);
1375 }
1376
1377 let mut buf = Vec::new();
1378 wheel.poll(now + ms(10), &mut buf);
1379
1380 buf.sort();
1381 assert_eq!(buf, expected);
1382 }
1383
1384 #[test]
1389 fn reschedule_moves_deadline() {
1390 let now = Instant::now();
1391 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1392
1393 let h = wheel.schedule(now + ms(100), 42);
1394 assert_eq!(wheel.len(), 1);
1395
1396 let h = wheel.reschedule(h, now + ms(50));
1398 assert_eq!(wheel.len(), 1);
1399
1400 let mut buf = Vec::new();
1402 let fired = wheel.poll(now + ms(40), &mut buf);
1403 assert_eq!(fired, 0);
1404
1405 let fired = wheel.poll(now + ms(55), &mut buf);
1407 assert_eq!(fired, 1);
1408 assert_eq!(buf, vec![42]);
1409
1410 wheel.cancel(h);
1412 }
1413
1414 #[test]
1415 fn reschedule_to_later() {
1416 let now = Instant::now();
1417 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1418
1419 let h = wheel.schedule(now + ms(50), 7);
1420
1421 let h = wheel.reschedule(h, now + ms(200));
1423
1424 let mut buf = Vec::new();
1426 let fired = wheel.poll(now + ms(60), &mut buf);
1427 assert_eq!(fired, 0);
1428
1429 let fired = wheel.poll(now + ms(210), &mut buf);
1431 assert_eq!(fired, 1);
1432 assert_eq!(buf, vec![7]);
1433
1434 wheel.cancel(h);
1435 }
1436
1437 #[test]
1438 #[should_panic(expected = "cannot reschedule a fired timer")]
1439 fn reschedule_panics_on_zombie() {
1440 let now = Instant::now();
1441 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1442
1443 let h = wheel.schedule(now + ms(10), 42);
1444
1445 let mut buf = Vec::new();
1446 wheel.poll(now + ms(20), &mut buf);
1447
1448 let _h = wheel.reschedule(h, now + ms(100));
1450 }
1451}
1452
1453#[cfg(test)]
1454mod proptests {
1455 use super::*;
1456 use proptest::prelude::*;
1457 use std::collections::HashSet;
1458 use std::mem;
1459 use std::time::{Duration, Instant};
1460
1461 #[derive(Debug, Clone)]
1463 enum Op {
1464 Schedule { deadline_ms: u64 },
1466 Cancel { idx: usize },
1468 }
1469
1470 fn op_strategy() -> impl Strategy<Value = Op> {
1471 prop_oneof![
1472 (1u64..10_000).prop_map(|deadline_ms| Op::Schedule { deadline_ms }),
1474 any::<usize>().prop_map(|idx| Op::Cancel { idx }),
1476 ]
1477 }
1478
1479 proptest! {
1480 #![proptest_config(ProptestConfig::with_cases(500))]
1481
1482 #[test]
1489 fn fuzz_schedule_cancel_interleaving(ops in proptest::collection::vec(op_strategy(), 1..200)) {
1490 let now = Instant::now();
1491 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1492
1493 let mut handles: Vec<TimerHandle<u64>> = Vec::new();
1494 let mut active_values: HashSet<u64> = HashSet::new();
1495 let mut next_id: u64 = 0;
1496
1497 for op in &ops {
1498 match op {
1499 Op::Schedule { deadline_ms } => {
1500 let id = next_id;
1501 next_id += 1;
1502 let h = wheel.schedule(now + Duration::from_millis(*deadline_ms), id);
1503 handles.push(h);
1504 active_values.insert(id);
1505 }
1506 Op::Cancel { idx } => {
1507 if !handles.is_empty() {
1508 let i = idx % handles.len();
1509 let h = handles.swap_remove(i);
1510 let val = wheel.cancel(h);
1511 let v = val.unwrap();
1513 assert!(active_values.remove(&v));
1514 }
1515 }
1516 }
1517 prop_assert_eq!(wheel.len(), active_values.len());
1519 }
1520
1521 let mut buf = Vec::new();
1523 wheel.poll(now + Duration::from_secs(100_000), &mut buf);
1525
1526 for h in handles {
1528 mem::forget(h);
1529 }
1530
1531 let fired_set: HashSet<u64> = buf.into_iter().collect();
1532 prop_assert_eq!(fired_set, active_values);
1533 prop_assert!(wheel.is_empty());
1534 }
1535
1536 #[test]
1542 fn fuzz_poll_timing(
1543 deadlines in proptest::collection::vec(1u64..5000, 1..100),
1544 poll_times in proptest::collection::vec(1u64..10_000, 1..20),
1545 ) {
1546 let now = Instant::now();
1547 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1548
1549 for (i, &d) in deadlines.iter().enumerate() {
1551 wheel.schedule_forget(now + Duration::from_millis(d), i as u64);
1552 }
1553
1554 let mut sorted_times: Vec<u64> = poll_times;
1556 sorted_times.sort();
1557 sorted_times.dedup();
1558
1559 let mut all_fired: Vec<u64> = Vec::new();
1560
1561 for &t in &sorted_times {
1562 let mut buf = Vec::new();
1563 wheel.poll(now + Duration::from_millis(t), &mut buf);
1564
1565 for &id in &buf {
1567 let deadline_ms = deadlines[id as usize];
1568 prop_assert!(deadline_ms <= t,
1569 "Timer {} with deadline {}ms fired at {}ms", id, deadline_ms, t);
1570 }
1571
1572 all_fired.extend(buf);
1573 }
1574
1575 let mut final_buf = Vec::new();
1577 wheel.poll(now + Duration::from_secs(100_000), &mut final_buf);
1578 all_fired.extend(final_buf);
1579
1580 all_fired.sort();
1582 let expected: Vec<u64> = (0..deadlines.len() as u64).collect();
1583 prop_assert_eq!(all_fired, expected, "Not all timers fired exactly once");
1584 prop_assert!(wheel.is_empty());
1585 }
1586 }
1587}