1use std::marker::PhantomData;
9use std::mem;
10use std::time::{Duration, Instant};
11
12use nexus_slab::{Full, Slot, bounded, unbounded};
13
14use crate::entry::{EntryPtr, WheelEntry, entry_ref};
15use crate::handle::TimerHandle;
16use crate::level::Level;
17use crate::store::{BoundedStore, SlabStore};
18
/// Configuration for a timer wheel.
///
/// Collects the tick length and the level geometry. Pick a storage strategy
/// with [`WheelBuilder::bounded`] or [`WheelBuilder::unbounded`], then call
/// `build` on the returned builder.
#[derive(Debug, Clone, Copy)]
pub struct WheelBuilder {
    /// Length of a single tick; validated to be non-zero at build time.
    tick_duration: Duration,
    /// Slots in each level; validated to be a power of two no greater than 64.
    slots_per_level: usize,
    /// Shift handed to each level on construction; validated to be non-zero.
    clk_shift: u32,
    /// Number of levels; validated to lie in `1..=8`.
    num_levels: usize,
}
53
54impl Default for WheelBuilder {
55 fn default() -> Self {
56 WheelBuilder {
57 tick_duration: Duration::from_millis(1),
58 slots_per_level: 64,
59 clk_shift: 3,
60 num_levels: 7,
61 }
62 }
63}
64
65impl WheelBuilder {
66 pub fn new() -> Self {
68 Self::default()
69 }
70
71 pub fn tick_duration(mut self, d: Duration) -> Self {
73 self.tick_duration = d;
74 self
75 }
76
77 pub fn slots_per_level(mut self, n: usize) -> Self {
79 self.slots_per_level = n;
80 self
81 }
82
83 pub fn clk_shift(mut self, s: u32) -> Self {
85 self.clk_shift = s;
86 self
87 }
88
89 pub fn num_levels(mut self, n: usize) -> Self {
91 self.num_levels = n;
92 self
93 }
94
95 pub fn unbounded(self, chunk_capacity: usize) -> UnboundedWheelBuilder {
100 UnboundedWheelBuilder {
101 config: self,
102 chunk_capacity,
103 }
104 }
105
106 pub fn bounded(self, capacity: usize) -> BoundedWheelBuilder {
110 BoundedWheelBuilder {
111 config: self,
112 capacity,
113 }
114 }
115
116 fn validate(&self) {
117 assert!(
118 self.slots_per_level.is_power_of_two(),
119 "slots_per_level must be a power of 2, got {}",
120 self.slots_per_level
121 );
122 assert!(
123 self.slots_per_level <= 64,
124 "slots_per_level must be <= 64 (u64 bitmask), got {}",
125 self.slots_per_level
126 );
127 assert!(self.num_levels > 0, "num_levels must be > 0");
128 assert!(
129 self.num_levels <= 8,
130 "num_levels must be <= 8 (u8 bitmask), got {}",
131 self.num_levels
132 );
133 assert!(self.clk_shift > 0, "clk_shift must be > 0");
134 assert!(
135 !self.tick_duration.is_zero(),
136 "tick_duration must be non-zero"
137 );
138 let max_shift = (self.num_levels - 1) as u64 * self.clk_shift as u64;
139 assert!(
140 max_shift < 64,
141 "(num_levels - 1) * clk_shift must be < 64, got {}",
142 max_shift
143 );
144 let slots_log2 = self.slots_per_level.trailing_zeros() as u64;
145 assert!(
146 slots_log2 + max_shift < 64,
147 "slots_per_level << max_shift would overflow u64"
148 );
149 }
150
151 fn tick_ns(&self) -> u64 {
152 self.tick_duration.as_nanos() as u64
153 }
154}
155
156#[derive(Debug)]
160pub struct UnboundedWheelBuilder {
161 config: WheelBuilder,
162 chunk_capacity: usize,
163}
164
165impl UnboundedWheelBuilder {
166 pub fn build<T: 'static>(self, now: Instant) -> Wheel<T> {
173 self.config.validate();
174 let slab = unsafe { unbounded::Slab::with_chunk_capacity(self.chunk_capacity) };
178 let levels = build_levels::<T>(&self.config);
179 TimerWheel {
180 slab,
181 num_levels: self.config.num_levels,
182 levels,
183 current_ticks: 0,
184 tick_ns: self.config.tick_ns(),
185 epoch: now,
186 active_levels: 0,
187 len: 0,
188 _marker: PhantomData,
189 }
190 }
191}
192
193#[derive(Debug)]
197pub struct BoundedWheelBuilder {
198 config: WheelBuilder,
199 capacity: usize,
200}
201
202impl BoundedWheelBuilder {
203 pub fn build<T: 'static>(self, now: Instant) -> BoundedWheel<T> {
210 self.config.validate();
211 let slab = unsafe { bounded::Slab::with_capacity(self.capacity) };
215 let levels = build_levels::<T>(&self.config);
216 TimerWheel {
217 slab,
218 num_levels: self.config.num_levels,
219 levels,
220 current_ticks: 0,
221 tick_ns: self.config.tick_ns(),
222 epoch: now,
223 active_levels: 0,
224 len: 0,
225 _marker: PhantomData,
226 }
227 }
228}
229
230pub struct TimerWheel<
247 T: 'static,
248 S: SlabStore<Item = WheelEntry<T>> = unbounded::Slab<WheelEntry<T>>,
249> {
250 slab: S,
251 levels: Vec<Level<T>>,
252 num_levels: usize,
253 active_levels: u8,
254 current_ticks: u64,
255 tick_ns: u64,
256 epoch: Instant,
257 len: usize,
258 _marker: PhantomData<*const ()>, }
260
261#[allow(clippy::non_send_fields_in_send_ty)]
289unsafe impl<T: Send + 'static, S: SlabStore<Item = WheelEntry<T>>> Send for TimerWheel<T, S> {}
290
291pub type BoundedWheel<T> = TimerWheel<T, bounded::Slab<WheelEntry<T>>>;
293
294pub type Wheel<T> = TimerWheel<T, unbounded::Slab<WheelEntry<T>>>;
296
297impl<T: 'static> Wheel<T> {
302 pub fn unbounded(chunk_capacity: usize, now: Instant) -> Self {
306 WheelBuilder::default().unbounded(chunk_capacity).build(now)
307 }
308}
309
310impl<T: 'static> BoundedWheel<T> {
311 pub fn bounded(capacity: usize, now: Instant) -> Self {
315 WheelBuilder::default().bounded(capacity).build(now)
316 }
317}
318
319fn build_levels<T: 'static>(config: &WheelBuilder) -> Vec<Level<T>> {
320 (0..config.num_levels)
321 .map(|i| Level::new(config.slots_per_level, i, config.clk_shift))
322 .collect()
323}
324
325impl<T: 'static, S: SlabStore<Item = WheelEntry<T>>> TimerWheel<T, S> {
330 pub fn schedule(&mut self, deadline: Instant, value: T) -> TimerHandle<T> {
340 let deadline_ticks = self.instant_to_ticks(deadline);
341 let entry = WheelEntry::new(deadline_ticks, value, 2);
342 let slot = self.slab.alloc(entry);
343 let ptr = slot.into_raw();
344 self.insert_entry(ptr, deadline_ticks);
345 self.len += 1;
346 TimerHandle::new(ptr)
347 }
348
349 pub fn schedule_forget(&mut self, deadline: Instant, value: T) {
359 let deadline_ticks = self.instant_to_ticks(deadline);
360 let entry = WheelEntry::new(deadline_ticks, value, 1);
361 let slot = self.slab.alloc(entry);
362 let ptr = slot.into_raw();
363 self.insert_entry(ptr, deadline_ticks);
364 self.len += 1;
365 }
366}
367
368impl<T: 'static, S: BoundedStore<Item = WheelEntry<T>>> TimerWheel<T, S> {
373 pub fn try_schedule(&mut self, deadline: Instant, value: T) -> Result<TimerHandle<T>, Full<T>> {
379 let deadline_ticks = self.instant_to_ticks(deadline);
380 let entry = WheelEntry::new(deadline_ticks, value, 2);
381 match self.slab.try_alloc(entry) {
382 Ok(slot) => {
383 let ptr = slot.into_raw();
384 self.insert_entry(ptr, deadline_ticks);
385 self.len += 1;
386 Ok(TimerHandle::new(ptr))
387 }
388 Err(full) => {
389 let wheel_entry = full.into_inner();
392 let value = unsafe { wheel_entry.take_value() }
393 .expect("entry was just constructed with Some(value)");
394 Err(Full(value))
395 }
396 }
397 }
398
399 pub fn try_schedule_forget(&mut self, deadline: Instant, value: T) -> Result<(), Full<T>> {
405 let deadline_ticks = self.instant_to_ticks(deadline);
406 let entry = WheelEntry::new(deadline_ticks, value, 1);
407 match self.slab.try_alloc(entry) {
408 Ok(slot) => {
409 let ptr = slot.into_raw();
410 self.insert_entry(ptr, deadline_ticks);
411 self.len += 1;
412 Ok(())
413 }
414 Err(full) => {
415 let wheel_entry = full.into_inner();
416 let value = unsafe { wheel_entry.take_value() }
417 .expect("entry was just constructed with Some(value)");
418 Err(Full(value))
419 }
420 }
421 }
422}
423
424impl<T: 'static, S: SlabStore<Item = WheelEntry<T>>> TimerWheel<T, S> {
429 pub fn cancel(&mut self, handle: TimerHandle<T>) -> Option<T> {
438 let ptr = handle.ptr;
439 mem::forget(handle);
441
442 let entry = unsafe { entry_ref(ptr) };
444 let refs = entry.refs();
445
446 if refs == 2 {
447 let value = unsafe { entry.take_value() };
449 self.remove_entry(ptr);
450 self.len -= 1;
451 self.slab.free(unsafe { Slot::from_raw(ptr) });
453 value
454 } else {
455 debug_assert_eq!(refs, 1, "unexpected refcount {refs} in cancel");
458 self.slab.free(unsafe { Slot::from_raw(ptr) });
460 None
461 }
462 }
463
464 pub fn free(&mut self, handle: TimerHandle<T>) {
472 let ptr = handle.ptr;
473 mem::forget(handle);
474
475 let entry = unsafe { entry_ref(ptr) };
477 let new_refs = entry.dec_refs();
478
479 if new_refs == 0 {
480 self.slab.free(unsafe { Slot::from_raw(ptr) });
483 }
484 }
486
487 pub fn reschedule(&mut self, handle: TimerHandle<T>, new_deadline: Instant) -> TimerHandle<T> {
499 let ptr = handle.ptr;
500 mem::forget(handle);
501
502 let entry = unsafe { entry_ref(ptr) };
504 assert_eq!(entry.refs(), 2, "cannot reschedule a fired timer");
505
506 self.remove_entry(ptr);
508
509 let new_ticks = self.instant_to_ticks(new_deadline);
511 entry.set_deadline_ticks(new_ticks);
512 self.insert_entry(ptr, new_ticks);
513
514 TimerHandle::new(ptr)
515 }
516
517 pub fn poll(&mut self, now: Instant, buf: &mut Vec<T>) -> usize {
521 self.poll_with_limit(now, usize::MAX, buf)
522 }
523
524 pub fn poll_with_limit(&mut self, now: Instant, limit: usize, buf: &mut Vec<T>) -> usize {
531 let now_ticks = self.instant_to_ticks(now);
532 self.current_ticks = now_ticks;
533
534 let mut fired = 0;
535 let mut mask = self.active_levels;
536
537 while mask != 0 && fired < limit {
538 let lvl_idx = mask.trailing_zeros() as usize;
539 mask &= mask - 1; fired += self.poll_level(lvl_idx, now_ticks, limit - fired, buf);
541 }
542 fired
543 }
544
545 pub fn next_deadline(&self) -> Option<Instant> {
550 let mut min_ticks: Option<u64> = None;
551
552 let mut lvl_mask = self.active_levels;
553 while lvl_mask != 0 {
554 let lvl_idx = lvl_mask.trailing_zeros() as usize;
555 lvl_mask &= lvl_mask - 1;
556
557 let level = &self.levels[lvl_idx];
558 let mut slot_mask = level.active_slots();
559 while slot_mask != 0 {
560 let slot_idx = slot_mask.trailing_zeros() as usize;
561 slot_mask &= slot_mask - 1;
562
563 let slot = level.slot(slot_idx);
564 let mut entry_ptr = slot.entry_head();
565
566 while !entry_ptr.is_null() {
567 let entry = unsafe { entry_ref(entry_ptr) };
569 let dt = entry.deadline_ticks();
570 min_ticks = Some(min_ticks.map_or(dt, |current| current.min(dt)));
571 entry_ptr = entry.next();
572 }
573 }
574 }
575
576 min_ticks.map(|t| self.ticks_to_instant(t))
577 }
578
579 #[inline]
581 pub fn len(&self) -> usize {
582 self.len
583 }
584
585 #[inline]
587 pub fn is_empty(&self) -> bool {
588 self.len == 0
589 }
590
591 #[inline]
596 fn instant_to_ticks(&self, instant: Instant) -> u64 {
597 let dur = instant.saturating_duration_since(self.epoch);
599 dur.as_nanos() as u64 / self.tick_ns
600 }
601
602 #[inline]
603 fn ticks_to_instant(&self, ticks: u64) -> Instant {
604 self.epoch + Duration::from_nanos(ticks * self.tick_ns)
605 }
606
607 #[inline]
617 fn select_level(&self, deadline_ticks: u64) -> usize {
618 let delta = deadline_ticks.saturating_sub(self.current_ticks);
619
620 for (i, level) in self.levels.iter().enumerate() {
621 if delta < level.range() {
622 return i;
623 }
624 }
625
626 self.num_levels - 1
628 }
629
630 #[inline]
639 #[allow(clippy::needless_pass_by_ref_mut)]
640 fn insert_entry(&mut self, entry_ptr: EntryPtr<T>, deadline_ticks: u64) {
641 let lvl_idx = self.select_level(deadline_ticks);
642 let slot_idx = self.levels[lvl_idx].slot_index(deadline_ticks);
643
644 let entry = unsafe { entry_ref(entry_ptr) };
647 entry.set_location(lvl_idx as u8, slot_idx as u16);
648
649 unsafe { self.levels[lvl_idx].slot(slot_idx).push_entry(entry_ptr) };
651
652 self.levels[lvl_idx].activate_slot(slot_idx);
654 self.active_levels |= 1 << lvl_idx;
655 }
656
657 #[inline]
663 #[allow(clippy::needless_pass_by_ref_mut)]
664 fn remove_entry(&mut self, entry_ptr: EntryPtr<T>) {
665 let entry = unsafe { entry_ref(entry_ptr) };
667
668 let lvl_idx = entry.level() as usize;
669 let slot_idx = entry.slot_idx() as usize;
670
671 unsafe { self.levels[lvl_idx].slot(slot_idx).remove_entry(entry_ptr) };
673
674 if self.levels[lvl_idx].slot(slot_idx).is_empty() {
675 self.levels[lvl_idx].deactivate_slot(slot_idx);
676 if !self.levels[lvl_idx].is_active() {
677 self.active_levels &= !(1 << lvl_idx);
678 }
679 }
680 }
681
682 #[inline]
690 fn fire_entry(&mut self, entry_ptr: EntryPtr<T>) -> Option<T> {
691 let entry = unsafe { entry_ref(entry_ptr) };
693
694 let value = unsafe { entry.take_value() };
697
698 let new_refs = entry.dec_refs();
699 if new_refs == 0 {
700 self.slab.free(unsafe { Slot::from_raw(entry_ptr) });
703 }
704 self.len -= 1;
708 value
709 }
710
711 fn poll_level(
718 &mut self,
719 lvl_idx: usize,
720 now_ticks: u64,
721 limit: usize,
722 buf: &mut Vec<T>,
723 ) -> usize {
724 let mut fired = 0;
725 let mut mask = self.levels[lvl_idx].active_slots();
726
727 while mask != 0 && fired < limit {
728 let slot_idx = mask.trailing_zeros() as usize;
729 mask &= mask - 1;
730
731 let slot_ptr = self.levels[lvl_idx].slot(slot_idx) as *const crate::level::WheelSlot<T>;
732 let slot = unsafe { &*slot_ptr };
736 let mut entry_ptr = slot.entry_head();
737
738 while !entry_ptr.is_null() && fired < limit {
739 let entry = unsafe { entry_ref(entry_ptr) };
741 let next_entry = entry.next();
742
743 if entry.deadline_ticks() <= now_ticks {
744 unsafe { slot.remove_entry(entry_ptr) };
745
746 if let Some(value) = self.fire_entry(entry_ptr) {
747 buf.push(value);
748 }
749 fired += 1;
750 }
751
752 entry_ptr = next_entry;
753 }
754
755 if slot.is_empty() {
756 self.levels[lvl_idx].deactivate_slot(slot_idx);
757 }
758 }
759
760 if !self.levels[lvl_idx].is_active() {
762 self.active_levels &= !(1 << lvl_idx);
763 }
764
765 fired
766 }
767}
768
769impl<T: 'static, S: SlabStore<Item = WheelEntry<T>>> Drop for TimerWheel<T, S> {
774 fn drop(&mut self) {
775 let mut lvl_mask = self.active_levels;
777 while lvl_mask != 0 {
778 let lvl_idx = lvl_mask.trailing_zeros() as usize;
779 lvl_mask &= lvl_mask - 1;
780
781 let level = &self.levels[lvl_idx];
782 let mut slot_mask = level.active_slots();
783 while slot_mask != 0 {
784 let slot_idx = slot_mask.trailing_zeros() as usize;
785 slot_mask &= slot_mask - 1;
786
787 let slot = level.slot(slot_idx);
788 let mut entry_ptr = slot.entry_head();
789 while !entry_ptr.is_null() {
790 let entry = unsafe { entry_ref(entry_ptr) };
792 let next_entry = entry.next();
793
794 self.slab.free(unsafe { Slot::from_raw(entry_ptr) });
796
797 entry_ptr = next_entry;
798 }
799 }
800 }
801 }
802}
803
804#[cfg(test)]
809mod tests {
810 use super::*;
811 use std::time::{Duration, Instant};
812
813 fn ms(millis: u64) -> Duration {
814 Duration::from_millis(millis)
815 }
816
817 fn assert_send<T: Send>() {}
822
823 #[test]
824 fn wheel_is_send() {
825 assert_send::<Wheel<u64>>();
826 assert_send::<BoundedWheel<u64>>();
827 }
828
829 #[test]
834 fn default_config() {
835 let now = Instant::now();
836 let wheel: Wheel<u64> = Wheel::unbounded(1024, now);
837 assert!(wheel.is_empty());
838 assert_eq!(wheel.len(), 0);
839 }
840
841 #[test]
842 fn bounded_construction() {
843 let now = Instant::now();
844 let wheel: BoundedWheel<u64> = BoundedWheel::bounded(128, now);
845 assert!(wheel.is_empty());
846 }
847
848 #[test]
849 #[should_panic(expected = "slots_per_level must be a power of 2")]
850 fn invalid_config_non_power_of_two() {
851 let now = Instant::now();
852 WheelBuilder::default()
853 .slots_per_level(65)
854 .unbounded(1024)
855 .build::<u64>(now);
856 }
857
858 #[test]
863 fn schedule_and_cancel() {
864 let now = Instant::now();
865 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
866
867 let h = wheel.schedule(now + ms(50), 42);
868 assert_eq!(wheel.len(), 1);
869
870 let val = wheel.cancel(h);
871 assert_eq!(val, Some(42));
872 assert_eq!(wheel.len(), 0);
873 }
874
875 #[test]
876 fn schedule_forget_fires() {
877 let now = Instant::now();
878 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
879
880 wheel.schedule_forget(now + ms(10), 99);
881 assert_eq!(wheel.len(), 1);
882
883 let mut buf = Vec::new();
884 let fired = wheel.poll(now + ms(20), &mut buf);
885 assert_eq!(fired, 1);
886 assert_eq!(buf, vec![99]);
887 assert_eq!(wheel.len(), 0);
888 }
889
890 #[test]
891 fn cancel_after_fire_returns_none() {
892 let now = Instant::now();
893 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
894
895 let h = wheel.schedule(now + ms(10), 42);
896
897 let mut buf = Vec::new();
898 wheel.poll(now + ms(20), &mut buf);
899 assert_eq!(buf, vec![42]);
900
901 let val = wheel.cancel(h);
903 assert_eq!(val, None);
904 }
905
906 #[test]
907 fn free_active_timer_becomes_fire_and_forget() {
908 let now = Instant::now();
909 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
910
911 let h = wheel.schedule(now + ms(10), 42);
912 wheel.free(h); assert_eq!(wheel.len(), 1);
914
915 let mut buf = Vec::new();
916 wheel.poll(now + ms(20), &mut buf);
917 assert_eq!(buf, vec![42]);
918 assert_eq!(wheel.len(), 0);
919 }
920
921 #[test]
922 fn free_zombie_handle() {
923 let now = Instant::now();
924 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
925
926 let h = wheel.schedule(now + ms(10), 42);
927
928 let mut buf = Vec::new();
929 wheel.poll(now + ms(20), &mut buf);
930
931 wheel.free(h);
933 }
934
935 #[test]
940 fn bounded_full() {
941 let now = Instant::now();
942 let mut wheel: BoundedWheel<u64> = BoundedWheel::bounded(2, now);
943
944 let h1 = wheel.try_schedule(now + ms(10), 1).unwrap();
945 let h2 = wheel.try_schedule(now + ms(20), 2).unwrap();
946
947 let err = wheel.try_schedule(now + ms(30), 3);
948 assert!(err.is_err());
949 let recovered = err.unwrap_err().into_inner();
950 assert_eq!(recovered, 3);
951
952 wheel.cancel(h1);
954 let h3 = wheel.try_schedule(now + ms(30), 3).unwrap();
955
956 wheel.free(h2);
958 wheel.free(h3);
959 }
960
961 #[test]
962 fn bounded_schedule_forget_full() {
963 let now = Instant::now();
964 let mut wheel: BoundedWheel<u64> = BoundedWheel::bounded(1, now);
965
966 wheel.try_schedule_forget(now + ms(10), 1).unwrap();
967 let err = wheel.try_schedule_forget(now + ms(20), 2);
968 assert!(err.is_err());
969 }
970
971 #[test]
976 fn poll_respects_deadline() {
977 let now = Instant::now();
978 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
979
980 wheel.schedule_forget(now + ms(10), 1);
981 wheel.schedule_forget(now + ms(50), 2);
982 wheel.schedule_forget(now + ms(100), 3);
983
984 let mut buf = Vec::new();
985
986 let fired = wheel.poll(now + ms(20), &mut buf);
988 assert_eq!(fired, 1);
989 assert_eq!(buf, vec![1]);
990 assert_eq!(wheel.len(), 2);
991
992 buf.clear();
994 let fired = wheel.poll(now + ms(60), &mut buf);
995 assert_eq!(fired, 1);
996 assert_eq!(buf, vec![2]);
997
998 buf.clear();
1000 let fired = wheel.poll(now + ms(200), &mut buf);
1001 assert_eq!(fired, 1);
1002 assert_eq!(buf, vec![3]);
1003
1004 assert!(wheel.is_empty());
1005 }
1006
1007 #[test]
1008 fn poll_with_limit() {
1009 let now = Instant::now();
1010 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1011
1012 for i in 0..10 {
1013 wheel.schedule_forget(now + ms(1), i);
1014 }
1015
1016 let mut buf = Vec::new();
1017
1018 let fired = wheel.poll_with_limit(now + ms(5), 3, &mut buf);
1020 assert_eq!(fired, 3);
1021 assert_eq!(wheel.len(), 7);
1022
1023 let fired = wheel.poll_with_limit(now + ms(5), 3, &mut buf);
1024 assert_eq!(fired, 3);
1025 assert_eq!(wheel.len(), 4);
1026
1027 let fired = wheel.poll(now + ms(5), &mut buf);
1029 assert_eq!(fired, 4);
1030 assert!(wheel.is_empty());
1031 assert_eq!(buf.len(), 10);
1032 }
1033
1034 #[test]
1039 fn timers_across_levels() {
1040 let now = Instant::now();
1041 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1042
1043 wheel.schedule_forget(now + ms(5), 0);
1045 wheel.schedule_forget(now + ms(200), 1);
1047 wheel.schedule_forget(now + ms(1000), 2);
1049
1050 let mut buf = Vec::new();
1051
1052 wheel.poll(now + ms(10), &mut buf);
1053 assert_eq!(buf, vec![0]);
1054
1055 buf.clear();
1056 wheel.poll(now + ms(250), &mut buf);
1057 assert_eq!(buf, vec![1]);
1058
1059 buf.clear();
1060 wheel.poll(now + ms(1500), &mut buf);
1061 assert_eq!(buf, vec![2]);
1062
1063 assert!(wheel.is_empty());
1064 }
1065
1066 #[test]
1071 fn next_deadline_empty() {
1072 let now = Instant::now();
1073 let wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1074 assert!(wheel.next_deadline().is_none());
1075 }
1076
1077 #[test]
1078 fn next_deadline_returns_earliest() {
1079 let now = Instant::now();
1080 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1081
1082 wheel.schedule_forget(now + ms(100), 1);
1083 wheel.schedule_forget(now + ms(50), 2);
1084 wheel.schedule_forget(now + ms(200), 3);
1085
1086 let next = wheel.next_deadline().unwrap();
1087 let delta = next.duration_since(now);
1089 assert!(delta >= ms(49) && delta <= ms(51));
1090 }
1091
1092 #[test]
1097 fn deadline_in_the_past_fires_immediately() {
1098 let now = Instant::now();
1099 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1100
1101 wheel.schedule_forget(now, 42);
1103
1104 let mut buf = Vec::new();
1105 let fired = wheel.poll(now + ms(1), &mut buf);
1106 assert_eq!(fired, 1);
1107 assert_eq!(buf, vec![42]);
1108 }
1109
1110 #[test]
1115 fn deadline_beyond_max_range_clamped() {
1116 let now = Instant::now();
1117 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1118
1119 let h = wheel.schedule(now + Duration::from_secs(100_000), 99);
1121 assert_eq!(wheel.len(), 1);
1122
1123 let mut buf = Vec::new();
1125 wheel.poll(now + Duration::from_secs(100_001), &mut buf);
1126 assert_eq!(buf, vec![99]);
1127
1128 let val = wheel.cancel(h);
1138 assert_eq!(val, None);
1139 }
1140
1141 #[test]
1146 fn drop_cleans_up_active_entries() {
1147 let now = Instant::now();
1148 let mut wheel: Wheel<String> = Wheel::unbounded(1024, now);
1149
1150 for i in 0..100 {
1151 wheel.schedule_forget(now + ms(i * 10), format!("timer-{i}"));
1152 }
1153
1154 assert_eq!(wheel.len(), 100);
1155 drop(wheel);
1157 }
1158
1159 #[test]
1160 fn drop_with_outstanding_handles() {
1161 let now = Instant::now();
1162 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1163
1164 let h1 = wheel.schedule(now + ms(10), 1);
1166 let h2 = wheel.schedule(now + ms(20), 2);
1167
1168 wheel.free(h1);
1170 wheel.free(h2);
1171
1172 drop(wheel);
1174 }
1175
1176 #[test]
1181 fn level_selection_boundaries() {
1182 let now = Instant::now();
1183 let wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1184
1185 assert_eq!(wheel.select_level(0), 0);
1187 assert_eq!(wheel.select_level(63), 0);
1188
1189 assert_eq!(wheel.select_level(64), 1);
1191 assert_eq!(wheel.select_level(511), 1);
1192
1193 assert_eq!(wheel.select_level(512), 2);
1195 }
1196
1197 #[test]
1202 fn cancel_after_time_advance() {
1203 let now = Instant::now();
1208 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1209
1210 let h = wheel.schedule(now + ms(500), 42);
1211 assert_eq!(wheel.len(), 1);
1212
1213 let mut buf = Vec::new();
1215 let fired = wheel.poll(now + ms(400), &mut buf);
1216 assert_eq!(fired, 0);
1217 assert!(buf.is_empty());
1218
1219 let val = wheel.cancel(h);
1221 assert_eq!(val, Some(42));
1222 assert_eq!(wheel.len(), 0);
1223 }
1224
1225 #[test]
1230 fn multiple_entries_same_slot() {
1231 let now = Instant::now();
1232 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1233
1234 let mut handles = Vec::new();
1236 for i in 0..5 {
1237 handles.push(wheel.schedule(now + ms(10), i));
1238 }
1239 assert_eq!(wheel.len(), 5);
1240
1241 let v2 = wheel.cancel(handles.remove(2));
1243 assert_eq!(v2, Some(2));
1244 let v0 = wheel.cancel(handles.remove(0));
1245 assert_eq!(v0, Some(0));
1246 assert_eq!(wheel.len(), 3);
1247
1248 let mut buf = Vec::new();
1250 let fired = wheel.poll(now + ms(20), &mut buf);
1251 assert_eq!(fired, 3);
1252
1253 for h in handles {
1255 let val = wheel.cancel(h);
1256 assert_eq!(val, None); }
1258 }
1259
1260 #[test]
1265 fn entry_at_level_boundary() {
1266 let now = Instant::now();
1269 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1270
1271 let h = wheel.schedule(now + ms(64), 99);
1272 assert_eq!(wheel.len(), 1);
1273
1274 let mut buf = Vec::new();
1276 let fired = wheel.poll(now + ms(63), &mut buf);
1277 assert_eq!(fired, 0);
1278
1279 let fired = wheel.poll(now + ms(65), &mut buf);
1281 assert_eq!(fired, 1);
1282 assert_eq!(buf, vec![99]);
1283
1284 wheel.cancel(h);
1286 }
1287
1288 #[test]
1293 fn poll_with_limit_mixed_expiry() {
1294 let now = Instant::now();
1295 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1296
1297 wheel.schedule_forget(now + ms(5), 1);
1299 wheel.schedule_forget(now + ms(5), 2);
1300 wheel.schedule_forget(now + ms(5), 3);
1301 wheel.schedule_forget(now + ms(500), 4); wheel.schedule_forget(now + ms(500), 5); assert_eq!(wheel.len(), 5);
1304
1305 let mut buf = Vec::new();
1306
1307 let fired = wheel.poll_with_limit(now + ms(10), 2, &mut buf);
1309 assert_eq!(fired, 2);
1310 assert_eq!(wheel.len(), 3);
1311
1312 let fired = wheel.poll_with_limit(now + ms(10), 5, &mut buf);
1314 assert_eq!(fired, 1);
1315 assert_eq!(wheel.len(), 2);
1316
1317 assert_eq!(buf.len(), 3);
1319 }
1320
1321 #[test]
1326 fn reuse_after_full_drain() {
1327 let now = Instant::now();
1328 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1329
1330 for i in 0..10 {
1332 wheel.schedule_forget(now + ms(1), i);
1333 }
1334 let mut buf = Vec::new();
1335 wheel.poll(now + ms(5), &mut buf);
1336 assert_eq!(buf.len(), 10);
1337 assert!(wheel.is_empty());
1338
1339 buf.clear();
1341 for i in 10..20 {
1342 wheel.schedule_forget(now + ms(100), i);
1343 }
1344 assert_eq!(wheel.len(), 10);
1345
1346 wheel.poll(now + ms(200), &mut buf);
1347 assert_eq!(buf.len(), 10);
1348 assert!(wheel.is_empty());
1349 }
1350
1351 #[test]
1356 fn all_levels_active() {
1357 let now = Instant::now();
1358 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1359
1360 let distances = [10, 100, 1000, 5000, 40_000, 300_000, 3_000_000];
1363 let mut handles: Vec<TimerHandle<u64>> = Vec::new();
1364 for (i, &d) in distances.iter().enumerate() {
1365 handles.push(wheel.schedule(now + ms(d), i as u64));
1366 }
1367 assert_eq!(wheel.len(), 7);
1368
1369 let order = [4, 1, 6, 0, 3, 5, 2];
1371 let mut opt_handles: Vec<Option<TimerHandle<u64>>> =
1374 handles.into_iter().map(Some).collect();
1375
1376 for &idx in &order {
1377 let h = opt_handles[idx].take().unwrap();
1378 let val = wheel.cancel(h);
1379 assert_eq!(val, Some(idx as u64));
1380 }
1381 assert!(wheel.is_empty());
1382 }
1383
1384 #[test]
1389 fn poll_values_match() {
1390 let now = Instant::now();
1391 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1392
1393 let expected: Vec<u64> = (100..110).collect();
1394 for &v in &expected {
1395 wheel.schedule_forget(now + ms(5), v);
1396 }
1397
1398 let mut buf = Vec::new();
1399 wheel.poll(now + ms(10), &mut buf);
1400
1401 buf.sort_unstable();
1402 assert_eq!(buf, expected);
1403 }
1404
1405 #[test]
1410 fn reschedule_moves_deadline() {
1411 let now = Instant::now();
1412 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1413
1414 let h = wheel.schedule(now + ms(100), 42);
1415 assert_eq!(wheel.len(), 1);
1416
1417 let h = wheel.reschedule(h, now + ms(50));
1419 assert_eq!(wheel.len(), 1);
1420
1421 let mut buf = Vec::new();
1423 let fired = wheel.poll(now + ms(40), &mut buf);
1424 assert_eq!(fired, 0);
1425
1426 let fired = wheel.poll(now + ms(55), &mut buf);
1428 assert_eq!(fired, 1);
1429 assert_eq!(buf, vec![42]);
1430
1431 wheel.cancel(h);
1433 }
1434
1435 #[test]
1436 fn reschedule_to_later() {
1437 let now = Instant::now();
1438 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1439
1440 let h = wheel.schedule(now + ms(50), 7);
1441
1442 let h = wheel.reschedule(h, now + ms(200));
1444
1445 let mut buf = Vec::new();
1447 let fired = wheel.poll(now + ms(60), &mut buf);
1448 assert_eq!(fired, 0);
1449
1450 let fired = wheel.poll(now + ms(210), &mut buf);
1452 assert_eq!(fired, 1);
1453 assert_eq!(buf, vec![7]);
1454
1455 wheel.cancel(h);
1456 }
1457
1458 #[test]
1459 #[should_panic(expected = "cannot reschedule a fired timer")]
1460 fn reschedule_panics_on_zombie() {
1461 let now = Instant::now();
1462 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1463
1464 let h = wheel.schedule(now + ms(10), 42);
1465
1466 let mut buf = Vec::new();
1467 wheel.poll(now + ms(20), &mut buf);
1468
1469 let _h = wheel.reschedule(h, now + ms(100));
1471 }
1472
1473 #[test]
1478 fn custom_slots_per_level() {
1479 let now = Instant::now();
1480 let mut wheel: Wheel<u64> = WheelBuilder::default()
1482 .slots_per_level(32)
1483 .unbounded(256)
1484 .build(now);
1485
1486 let h1 = wheel.schedule(now + ms(20), 1);
1489 let h2 = wheel.schedule(now + ms(40), 2);
1491
1492 let mut buf = Vec::new();
1493 wheel.poll(now + ms(25), &mut buf);
1494 assert_eq!(buf, vec![1]);
1495
1496 buf.clear();
1497 wheel.poll(now + ms(50), &mut buf);
1498 assert_eq!(buf, vec![2]);
1499
1500 wheel.cancel(h1);
1501 wheel.cancel(h2);
1502 }
1503
1504 #[test]
1505 fn custom_clk_shift() {
1506 let now = Instant::now();
1507 let mut wheel: Wheel<u64> = WheelBuilder::default()
1509 .clk_shift(2)
1510 .unbounded(256)
1511 .build(now);
1512
1513 let h1 = wheel.schedule(now + ms(50), 1); let h2 = wheel.schedule(now + ms(100), 2); let mut buf = Vec::new();
1519 wheel.poll(now + ms(55), &mut buf);
1520 assert_eq!(buf, vec![1]);
1521
1522 buf.clear();
1523 wheel.poll(now + ms(110), &mut buf);
1524 assert_eq!(buf, vec![2]);
1525
1526 wheel.cancel(h1);
1527 wheel.cancel(h2);
1528 }
1529
1530 #[test]
1531 fn custom_num_levels() {
1532 let now = Instant::now();
1533 let mut wheel: Wheel<u64> = WheelBuilder::default()
1535 .num_levels(3)
1536 .unbounded(256)
1537 .build(now);
1538
1539 let h = wheel.schedule(now + ms(3000), 42);
1542 assert_eq!(wheel.len(), 1);
1543
1544 let mut buf = Vec::new();
1545 wheel.poll(now + ms(3100), &mut buf);
1546 assert_eq!(buf, vec![42]);
1547
1548 wheel.cancel(h);
1549 }
1550
1551 #[test]
1552 fn custom_tick_duration() {
1553 let now = Instant::now();
1554 let mut wheel: Wheel<u64> = WheelBuilder::default()
1556 .tick_duration(Duration::from_micros(100))
1557 .unbounded(256)
1558 .build(now);
1559
1560 wheel.schedule_forget(now + ms(1), 1);
1562 wheel.schedule_forget(now + ms(10), 2);
1564
1565 let mut buf = Vec::new();
1566 wheel.poll(now + ms(2), &mut buf);
1567 assert_eq!(buf, vec![1]);
1568
1569 buf.clear();
1570 wheel.poll(now + ms(15), &mut buf);
1571 assert_eq!(buf, vec![2]);
1572 }
1573
1574 #[test]
1575 fn bounded_custom_config() {
1576 let now = Instant::now();
1577 let mut wheel: BoundedWheel<u64> = WheelBuilder::default()
1578 .slots_per_level(16)
1579 .num_levels(4)
1580 .bounded(8)
1581 .build(now);
1582
1583 let mut handles = Vec::new();
1585 for i in 0..8 {
1586 handles.push(wheel.try_schedule(now + ms(i * 10 + 10), i).unwrap());
1587 }
1588 assert!(wheel.try_schedule(now + ms(100), 99).is_err());
1589
1590 wheel.cancel(handles.remove(0));
1592 let h = wheel.try_schedule(now + ms(100), 99).unwrap();
1593 handles.push(h);
1594
1595 for h in handles {
1597 wheel.cancel(h);
1598 }
1599 }
1600
1601 #[test]
1606 #[should_panic(expected = "slots_per_level must be <= 64")]
1607 fn invalid_config_too_many_slots() {
1608 let now = Instant::now();
1609 WheelBuilder::default()
1610 .slots_per_level(128)
1611 .unbounded(1024)
1612 .build::<u64>(now);
1613 }
1614
1615 #[test]
1616 #[should_panic(expected = "num_levels must be > 0")]
1617 fn invalid_config_zero_levels() {
1618 let now = Instant::now();
1619 WheelBuilder::default()
1620 .num_levels(0)
1621 .unbounded(1024)
1622 .build::<u64>(now);
1623 }
1624
1625 #[test]
1626 #[should_panic(expected = "num_levels must be <= 8")]
1627 fn invalid_config_too_many_levels() {
1628 let now = Instant::now();
1629 WheelBuilder::default()
1630 .num_levels(9)
1631 .unbounded(1024)
1632 .build::<u64>(now);
1633 }
1634
1635 #[test]
1636 #[should_panic(expected = "clk_shift must be > 0")]
1637 fn invalid_config_zero_shift() {
1638 let now = Instant::now();
1639 WheelBuilder::default()
1640 .clk_shift(0)
1641 .unbounded(1024)
1642 .build::<u64>(now);
1643 }
1644
1645 #[test]
1646 #[should_panic(expected = "tick_duration must be non-zero")]
1647 fn invalid_config_zero_tick() {
1648 let now = Instant::now();
1649 WheelBuilder::default()
1650 .tick_duration(Duration::ZERO)
1651 .unbounded(1024)
1652 .build::<u64>(now);
1653 }
1654
/// Combining eight levels with a nine-bit clock shift is expected to panic
/// with a message containing `overflow`.
#[test]
#[should_panic(expected = "overflow")]
fn invalid_config_shift_overflow() {
    let start = Instant::now();
    let config = WheelBuilder::default().num_levels(8).clk_shift(9);
    let _ = config.unbounded(1024).build::<u64>(start);
}
1668
/// Schedules a heap-owning `String` payload and cancels it before any poll.
/// The payload must be handed back intact and the wheel must end up empty.
#[test]
fn miri_schedule_cancel_drop_type() {
    let start = Instant::now();
    let mut timers: Wheel<String> = Wheel::unbounded(64, start);

    let handle = timers.schedule(start + ms(50), "hello".to_string());
    let returned = timers.cancel(handle);

    assert_eq!(returned, Some("hello".to_string()));
    assert!(timers.is_empty());
}
1685
/// Schedules three fire-and-forget `String` timers with the same deadline and
/// polls past it: all three must be delivered and the wheel left empty.
#[test]
fn miri_poll_fires_drop_type() {
    let start = Instant::now();
    let mut timers: Wheel<String> = Wheel::unbounded(64, start);

    let deadline = start + ms(10);
    for payload in ["a", "b", "c"] {
        timers.schedule_forget(deadline, payload.to_string());
    }

    let mut delivered = Vec::new();
    let fired_count = timers.poll(start + ms(20), &mut delivered);

    assert_eq!(fired_count, 3);
    assert_eq!(delivered.len(), 3);
    assert!(timers.is_empty());
}
1701
/// Cancels a handle whose timer has already fired: the payload was delivered
/// by `poll`, so the later `cancel` must yield `None`.
#[test]
fn miri_cancel_zombie_drop_type() {
    let start = Instant::now();
    let mut timers: Wheel<String> = Wheel::unbounded(64, start);

    let handle = timers.schedule(start + ms(10), "zombie".to_string());

    // Poll past the deadline so the timer fires while the handle is still held.
    let mut delivered = Vec::new();
    timers.poll(start + ms(20), &mut delivered);
    assert_eq!(delivered, vec!["zombie".to_string()]);

    let leftover = timers.cancel(handle);
    assert_eq!(leftover, None);
}
1717
/// Exercises `free` on a handle in two situations: before its timer has been
/// polled, and after a poll has been run over its deadline.
#[test]
fn miri_free_active_and_zombie() {
    let start = Instant::now();
    let mut timers: Wheel<String> = Wheel::unbounded(64, start);
    let deadline = start + ms(10);
    let poll_at = start + ms(20);

    // Free the handle while the timer is still pending; the timer itself
    // must still fire on the next poll.
    let pending = timers.schedule(deadline, "active".to_string());
    timers.free(pending);

    let mut delivered = Vec::new();
    timers.poll(poll_at, &mut delivered);
    assert_eq!(delivered, vec!["active".to_string()]);

    // Schedule again, poll, and only then free the handle.
    // NOTE(review): the outcome of this second poll is not asserted here.
    let late = timers.schedule(deadline, "will-fire".to_string());
    delivered.clear();
    timers.poll(poll_at, &mut delivered);
    timers.free(late);
}
1738
/// Moves a `String` timer from +100ms to +50ms via `reschedule`, then checks
/// that a poll at +55ms delivers it. The handle returned by `reschedule` is
/// consumed with `cancel` afterwards.
#[test]
fn miri_reschedule_drop_type() {
    let start = Instant::now();
    let mut timers: Wheel<String> = Wheel::unbounded(64, start);

    let original = timers.schedule(start + ms(100), "moveme".to_string());
    let moved = timers.reschedule(original, start + ms(50));

    let mut delivered = Vec::new();
    timers.poll(start + ms(55), &mut delivered);
    assert_eq!(delivered, vec!["moveme".to_string()]);

    timers.cancel(moved);
}
1753
/// Schedules five `Vec<u8>` timers with the same deadline, cancels two of
/// them out of order, then polls and expects the remaining three to fire.
#[test]
fn miri_dll_multi_entry_same_slot() {
    let start = Instant::now();
    let mut timers: Wheel<Vec<u8>> = Wheel::unbounded(64, start);
    let deadline = start + ms(10);

    let mut pending = Vec::new();
    for fill in 0..5 {
        let handle = timers.schedule(deadline, vec![fill; 32]);
        pending.push(handle);
    }

    // Cancel the entry scheduled third (payload filled with 2s).
    let third = timers.cancel(pending.remove(2));
    assert_eq!(third.unwrap(), vec![2; 32]);

    // Cancel the entry scheduled first (payload filled with 0s).
    let first = timers.cancel(pending.remove(0));
    assert_eq!(first.unwrap(), vec![0; 32]);

    let mut delivered = Vec::new();
    timers.poll(start + ms(20), &mut delivered);
    assert_eq!(delivered.len(), 3);

    // The remaining handles belong to timers that have fired; consume them.
    pending.into_iter().for_each(|handle| {
        timers.cancel(handle);
    });
}
1782
/// Drops a wheel that still holds twenty un-fired `String` timers, spaced
/// 100ms apart starting at the creation instant.
#[test]
fn miri_drop_wheel_with_entries() {
    let start = Instant::now();
    let mut timers: Wheel<String> = Wheel::unbounded(64, start);

    for idx in 0..20 {
        let deadline = start + ms(idx * 100);
        let payload = format!("entry-{idx}");
        timers.schedule_forget(deadline, payload);
    }
    assert_eq!(timers.len(), 20);

    // Explicit drop while all twenty entries are still scheduled.
    drop(timers);
}
1797
/// Walks a capacity-4 bounded wheel through fill, overflow, cancel, refill,
/// poll, and release of every remaining handle.
#[test]
fn miri_bounded_lifecycle() {
    let start = Instant::now();
    let mut timers: BoundedWheel<String> = BoundedWheel::bounded(4, start);

    // Fill the wheel to capacity, in order.
    let [h1, h2, h3, h4] = [(10, "a"), (20, "b"), (30, "c"), (40, "d")].map(|(offset, text)| {
        timers
            .try_schedule(start + ms(offset), text.to_string())
            .unwrap()
    });

    // A fifth timer does not fit.
    let overflow = timers.try_schedule(start + ms(50), "e".to_string());
    assert!(overflow.is_err());

    // Cancelling one timer makes room for the one that was rejected.
    timers.cancel(h1);
    let h5 = timers.try_schedule(start + ms(50), "e".to_string()).unwrap();

    let mut delivered = Vec::new();
    timers.poll(start + ms(25), &mut delivered);

    // Release every outstanding handle.
    timers.cancel(h2);
    timers.free(h3);
    timers.free(h4);
    timers.free(h5);
}
1826}
1827
/// Property-based tests that drive the wheel with randomly generated
/// schedule/cancel sequences and poll times.
#[cfg(test)]
mod proptests {
    use super::*;
    use proptest::prelude::*;
    use std::collections::HashSet;
    use std::mem;
    use std::time::{Duration, Instant};

    /// One step of a randomly generated schedule/cancel sequence.
    #[derive(Debug, Clone)]
    enum Op {
        /// Schedule a new timer `deadline_ms` milliseconds after the start instant.
        Schedule { deadline_ms: u64 },
        /// Cancel one outstanding handle, selected as `idx % handles.len()`.
        Cancel { idx: usize },
    }

    /// Produces either a `Schedule` with a deadline in `1..10_000` ms or a
    /// `Cancel` carrying an arbitrary index.
    fn op_strategy() -> impl Strategy<Value = Op> {
        prop_oneof![
            (1u64..10_000).prop_map(|deadline_ms| Op::Schedule { deadline_ms }),
            any::<usize>().prop_map(|idx| Op::Cancel { idx }),
        ]
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(500))]

        /// Interleaves schedules and cancels, mirroring the expected set of
        /// live timer ids in `active_values`.
        ///
        /// After every operation the wheel's length must equal the model's
        /// size. A final far-future poll must deliver exactly the ids that
        /// were never cancelled and leave the wheel empty.
        #[test]
        fn fuzz_schedule_cancel_interleaving(ops in proptest::collection::vec(op_strategy(), 1..200)) {
            let now = Instant::now();
            let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);

            let mut handles: Vec<TimerHandle<u64>> = Vec::new();
            let mut active_values: HashSet<u64> = HashSet::new();
            let mut next_id: u64 = 0;

            for op in &ops {
                match op {
                    Op::Schedule { deadline_ms } => {
                        // Each timer carries a unique id so fired and
                        // cancelled values can be matched against the model.
                        let id = next_id;
                        next_id += 1;
                        let h = wheel.schedule(now + Duration::from_millis(*deadline_ms), id);
                        handles.push(h);
                        active_values.insert(id);
                    }
                    Op::Cancel { idx } => {
                        // With no outstanding handles a cancel is a no-op.
                        if !handles.is_empty() {
                            let i = idx % handles.len();
                            let h = handles.swap_remove(i);
                            let cancelled = wheel.cancel(h);
                            // Nothing has been polled yet, so every handle
                            // still refers to a pending timer.
                            prop_assert!(
                                cancelled.is_some(),
                                "cancel of a pending timer returned None"
                            );
                            if let Some(v) = cancelled {
                                prop_assert!(
                                    active_values.remove(&v),
                                    "cancel returned {}, which is not tracked as active",
                                    v
                                );
                            }
                        }
                    }
                }
                prop_assert_eq!(wheel.len(), active_values.len());
            }

            // Poll far beyond the largest possible deadline (< 10s) so every
            // remaining timer fires.
            let mut buf = Vec::new();
            wheel.poll(now + Duration::from_secs(100_000), &mut buf);

            // The remaining handles refer to timers that have now fired.
            // NOTE(review): presumably they are forgotten to skip the
            // handle's drop behaviour — confirm against `TimerHandle`.
            for h in handles {
                mem::forget(h);
            }

            let fired_set: HashSet<u64> = buf.into_iter().collect();
            prop_assert_eq!(fired_set, active_values);
            prop_assert!(wheel.is_empty());
        }

        /// Schedules timers at random deadlines and polls at increasing
        /// random times.
        ///
        /// No timer may be delivered by a poll earlier than its deadline, and
        /// after a final far-future poll every timer must have fired exactly
        /// once.
        #[test]
        fn fuzz_poll_timing(
            deadlines in proptest::collection::vec(1u64..5000, 1..100),
            poll_times in proptest::collection::vec(1u64..10_000, 1..20),
        ) {
            let now = Instant::now();
            let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);

            // The payload of each timer is its index into `deadlines`.
            for (i, &d) in deadlines.iter().enumerate() {
                wheel.schedule_forget(now + Duration::from_millis(d), i as u64);
            }

            // Poll times must be strictly increasing.
            let mut sorted_times: Vec<u64> = poll_times;
            sorted_times.sort_unstable();
            sorted_times.dedup();

            let mut all_fired: Vec<u64> = Vec::new();

            for &t in &sorted_times {
                let mut buf = Vec::new();
                wheel.poll(now + Duration::from_millis(t), &mut buf);

                for &id in &buf {
                    // `id` came from `enumerate` above, so it indexes `deadlines`.
                    let deadline_ms = deadlines[id as usize];
                    prop_assert!(deadline_ms <= t,
                        "Timer {} with deadline {}ms fired at {}ms", id, deadline_ms, t);
                }

                all_fired.extend(buf);
            }

            // Flush anything whose deadline lies beyond the last poll time.
            let mut final_buf = Vec::new();
            wheel.poll(now + Duration::from_secs(100_000), &mut final_buf);
            all_fired.extend(final_buf);

            // Sorted fired ids must be exactly 0..n: no loss, no duplicates.
            all_fired.sort_unstable();
            let expected: Vec<u64> = (0..deadlines.len() as u64).collect();
            prop_assert_eq!(all_fired, expected, "Not all timers fired exactly once");
            prop_assert!(wheel.is_empty());
        }
    }
}