1use std::marker::PhantomData;
9use std::mem;
10use std::time::{Duration, Instant};
11
12use nexus_slab::{Full, bounded, unbounded};
13
14use crate::entry::{EntryPtr, WheelEntry, entry_ref};
15use crate::handle::TimerHandle;
16use crate::level::Level;
17use crate::store::{BoundedStore, SlabStore};
18
19#[derive(Debug, Clone, Copy)]
47pub struct WheelBuilder {
48 tick_duration: Duration,
49 slots_per_level: usize,
50 clk_shift: u32,
51 num_levels: usize,
52}
53
54impl Default for WheelBuilder {
55 fn default() -> Self {
56 WheelBuilder {
57 tick_duration: Duration::from_millis(1),
58 slots_per_level: 64,
59 clk_shift: 3,
60 num_levels: 7,
61 }
62 }
63}
64
65impl WheelBuilder {
66 pub fn new() -> Self {
68 Self::default()
69 }
70
71 pub fn tick_duration(mut self, d: Duration) -> Self {
73 self.tick_duration = d;
74 self
75 }
76
77 pub fn slots_per_level(mut self, n: usize) -> Self {
79 self.slots_per_level = n;
80 self
81 }
82
83 pub fn clk_shift(mut self, s: u32) -> Self {
85 self.clk_shift = s;
86 self
87 }
88
89 pub fn num_levels(mut self, n: usize) -> Self {
91 self.num_levels = n;
92 self
93 }
94
95 pub fn unbounded(self, chunk_capacity: usize) -> UnboundedWheelBuilder {
100 UnboundedWheelBuilder {
101 config: self,
102 chunk_capacity,
103 }
104 }
105
106 pub fn bounded(self, capacity: usize) -> BoundedWheelBuilder {
110 BoundedWheelBuilder {
111 config: self,
112 capacity,
113 }
114 }
115
116 fn validate(&self) {
117 assert!(
118 self.slots_per_level.is_power_of_two(),
119 "slots_per_level must be a power of 2, got {}",
120 self.slots_per_level
121 );
122 assert!(
123 self.slots_per_level <= 64,
124 "slots_per_level must be <= 64 (u64 bitmask), got {}",
125 self.slots_per_level
126 );
127 assert!(self.num_levels > 0, "num_levels must be > 0");
128 assert!(
129 self.num_levels <= 8,
130 "num_levels must be <= 8 (u8 bitmask), got {}",
131 self.num_levels
132 );
133 assert!(self.clk_shift > 0, "clk_shift must be > 0");
134 assert!(
135 !self.tick_duration.is_zero(),
136 "tick_duration must be non-zero"
137 );
138 let max_shift = (self.num_levels - 1) as u64 * self.clk_shift as u64;
139 assert!(
140 max_shift < 64,
141 "(num_levels - 1) * clk_shift must be < 64, got {}",
142 max_shift
143 );
144 let slots_log2 = self.slots_per_level.trailing_zeros() as u64;
145 assert!(
146 slots_log2 + max_shift < 64,
147 "slots_per_level << max_shift would overflow u64"
148 );
149 }
150
151 fn tick_ns(&self) -> u64 {
152 self.tick_duration.as_nanos() as u64
153 }
154}
155
156#[derive(Debug)]
160pub struct UnboundedWheelBuilder {
161 config: WheelBuilder,
162 chunk_capacity: usize,
163}
164
165impl UnboundedWheelBuilder {
166 pub fn build<T: 'static>(self, now: Instant) -> Wheel<T> {
173 self.config.validate();
174 let slab = unbounded::Slab::with_chunk_capacity(self.chunk_capacity);
175 let levels = build_levels::<T>(&self.config);
176 TimerWheel {
177 slab,
178 num_levels: self.config.num_levels,
179 levels,
180 current_ticks: 0,
181 tick_ns: self.config.tick_ns(),
182 epoch: now,
183 active_levels: 0,
184 len: 0,
185 _marker: PhantomData,
186 }
187 }
188}
189
190#[derive(Debug)]
194pub struct BoundedWheelBuilder {
195 config: WheelBuilder,
196 capacity: usize,
197}
198
199impl BoundedWheelBuilder {
200 pub fn build<T: 'static>(self, now: Instant) -> BoundedWheel<T> {
207 self.config.validate();
208 let slab = bounded::Slab::with_capacity(self.capacity);
209 let levels = build_levels::<T>(&self.config);
210 TimerWheel {
211 slab,
212 num_levels: self.config.num_levels,
213 levels,
214 current_ticks: 0,
215 tick_ns: self.config.tick_ns(),
216 epoch: now,
217 active_levels: 0,
218 len: 0,
219 _marker: PhantomData,
220 }
221 }
222}
223
224pub struct TimerWheel<
241 T: 'static,
242 S: SlabStore<Item = WheelEntry<T>> = unbounded::Slab<WheelEntry<T>>,
243> {
244 slab: S,
245 levels: Vec<Level<T>>,
246 num_levels: usize,
247 active_levels: u8,
248 current_ticks: u64,
249 tick_ns: u64,
250 epoch: Instant,
251 len: usize,
252 _marker: PhantomData<*const ()>, }
254
255#[allow(clippy::non_send_fields_in_send_ty)]
283unsafe impl<T: Send + 'static, S: SlabStore<Item = WheelEntry<T>>> Send for TimerWheel<T, S> {}
284
285pub type BoundedWheel<T> = TimerWheel<T, bounded::Slab<WheelEntry<T>>>;
287
288pub type Wheel<T> = TimerWheel<T, unbounded::Slab<WheelEntry<T>>>;
290
291impl<T: 'static> Wheel<T> {
296 pub fn unbounded(chunk_capacity: usize, now: Instant) -> Self {
300 WheelBuilder::default().unbounded(chunk_capacity).build(now)
301 }
302}
303
304impl<T: 'static> BoundedWheel<T> {
305 pub fn bounded(capacity: usize, now: Instant) -> Self {
309 WheelBuilder::default().bounded(capacity).build(now)
310 }
311}
312
313fn build_levels<T: 'static>(config: &WheelBuilder) -> Vec<Level<T>> {
314 (0..config.num_levels)
315 .map(|i| Level::new(config.slots_per_level, i, config.clk_shift))
316 .collect()
317}
318
319impl<T: 'static, S: SlabStore<Item = WheelEntry<T>>> TimerWheel<T, S> {
324 pub fn schedule(&mut self, deadline: Instant, value: T) -> TimerHandle<T> {
334 let deadline_ticks = self.instant_to_ticks(deadline);
335 let entry = WheelEntry::new(deadline_ticks, value, 2);
336 let slot = self.slab.alloc(entry);
337 let ptr = slot.into_ptr();
338 self.insert_entry(ptr, deadline_ticks);
339 self.len += 1;
340 TimerHandle::new(ptr)
341 }
342
343 pub fn schedule_forget(&mut self, deadline: Instant, value: T) {
353 let deadline_ticks = self.instant_to_ticks(deadline);
354 let entry = WheelEntry::new(deadline_ticks, value, 1);
355 let slot = self.slab.alloc(entry);
356 let ptr = slot.into_ptr();
357 self.insert_entry(ptr, deadline_ticks);
358 self.len += 1;
359 }
360}
361
362impl<T: 'static, S: BoundedStore<Item = WheelEntry<T>>> TimerWheel<T, S> {
367 pub fn try_schedule(&mut self, deadline: Instant, value: T) -> Result<TimerHandle<T>, Full<T>> {
373 let deadline_ticks = self.instant_to_ticks(deadline);
374 let entry = WheelEntry::new(deadline_ticks, value, 2);
375 match self.slab.try_alloc(entry) {
376 Ok(slot) => {
377 let ptr = slot.into_ptr();
378 self.insert_entry(ptr, deadline_ticks);
379 self.len += 1;
380 Ok(TimerHandle::new(ptr))
381 }
382 Err(full) => {
383 let wheel_entry = full.into_inner();
386 let value = unsafe { wheel_entry.take_value() }
387 .expect("entry was just constructed with Some(value)");
388 Err(Full(value))
389 }
390 }
391 }
392
393 pub fn try_schedule_forget(&mut self, deadline: Instant, value: T) -> Result<(), Full<T>> {
399 let deadline_ticks = self.instant_to_ticks(deadline);
400 let entry = WheelEntry::new(deadline_ticks, value, 1);
401 match self.slab.try_alloc(entry) {
402 Ok(slot) => {
403 let ptr = slot.into_ptr();
404 self.insert_entry(ptr, deadline_ticks);
405 self.len += 1;
406 Ok(())
407 }
408 Err(full) => {
409 let wheel_entry = full.into_inner();
410 let value = unsafe { wheel_entry.take_value() }
411 .expect("entry was just constructed with Some(value)");
412 Err(Full(value))
413 }
414 }
415 }
416}
417
418impl<T: 'static, S: SlabStore<Item = WheelEntry<T>>> TimerWheel<T, S> {
423 pub fn cancel(&mut self, handle: TimerHandle<T>) -> Option<T> {
432 let ptr = handle.ptr;
433 mem::forget(handle);
435
436 let entry = unsafe { entry_ref(ptr) };
438 let refs = entry.refs();
439
440 if refs == 2 {
441 let value = unsafe { entry.take_value() };
443 self.remove_entry(ptr);
444 self.len -= 1;
445 unsafe { self.slab.free_ptr(ptr) };
447 value
448 } else {
449 debug_assert_eq!(refs, 1, "unexpected refcount {refs} in cancel");
452 unsafe { self.slab.free_ptr(ptr) };
454 None
455 }
456 }
457
458 pub fn free(&mut self, handle: TimerHandle<T>) {
466 let ptr = handle.ptr;
467 mem::forget(handle);
468
469 let entry = unsafe { entry_ref(ptr) };
471 let new_refs = entry.dec_refs();
472
473 if new_refs == 0 {
474 unsafe { self.slab.free_ptr(ptr) };
477 }
478 }
480
481 pub fn reschedule(&mut self, handle: TimerHandle<T>, new_deadline: Instant) -> TimerHandle<T> {
493 let ptr = handle.ptr;
494 mem::forget(handle);
495
496 let entry = unsafe { entry_ref(ptr) };
498 assert_eq!(entry.refs(), 2, "cannot reschedule a fired timer");
499
500 self.remove_entry(ptr);
502
503 let new_ticks = self.instant_to_ticks(new_deadline);
505 entry.set_deadline_ticks(new_ticks);
506 self.insert_entry(ptr, new_ticks);
507
508 TimerHandle::new(ptr)
509 }
510
511 pub fn poll(&mut self, now: Instant, buf: &mut Vec<T>) -> usize {
515 self.poll_with_limit(now, usize::MAX, buf)
516 }
517
518 pub fn poll_with_limit(&mut self, now: Instant, limit: usize, buf: &mut Vec<T>) -> usize {
525 let now_ticks = self.instant_to_ticks(now);
526 self.current_ticks = now_ticks;
527
528 let mut fired = 0;
529 let mut mask = self.active_levels;
530
531 while mask != 0 && fired < limit {
532 let lvl_idx = mask.trailing_zeros() as usize;
533 mask &= mask - 1; fired += self.poll_level(lvl_idx, now_ticks, limit - fired, buf);
535 }
536 fired
537 }
538
539 pub fn next_deadline(&self) -> Option<Instant> {
544 let mut min_ticks: Option<u64> = None;
545
546 let mut lvl_mask = self.active_levels;
547 while lvl_mask != 0 {
548 let lvl_idx = lvl_mask.trailing_zeros() as usize;
549 lvl_mask &= lvl_mask - 1;
550
551 let level = &self.levels[lvl_idx];
552 let mut slot_mask = level.active_slots();
553 while slot_mask != 0 {
554 let slot_idx = slot_mask.trailing_zeros() as usize;
555 slot_mask &= slot_mask - 1;
556
557 let slot = level.slot(slot_idx);
558 let mut entry_ptr = slot.entry_head();
559
560 while !entry_ptr.is_null() {
561 let entry = unsafe { entry_ref(entry_ptr) };
563 let dt = entry.deadline_ticks();
564 min_ticks = Some(min_ticks.map_or(dt, |current| current.min(dt)));
565 entry_ptr = entry.next();
566 }
567 }
568 }
569
570 min_ticks.map(|t| self.ticks_to_instant(t))
571 }
572
573 #[inline]
575 pub fn len(&self) -> usize {
576 self.len
577 }
578
579 #[inline]
581 pub fn is_empty(&self) -> bool {
582 self.len == 0
583 }
584
585 #[inline]
590 fn instant_to_ticks(&self, instant: Instant) -> u64 {
591 let dur = instant.saturating_duration_since(self.epoch);
593 dur.as_nanos() as u64 / self.tick_ns
594 }
595
596 #[inline]
597 fn ticks_to_instant(&self, ticks: u64) -> Instant {
598 self.epoch + Duration::from_nanos(ticks * self.tick_ns)
599 }
600
601 #[inline]
611 fn select_level(&self, deadline_ticks: u64) -> usize {
612 let delta = deadline_ticks.saturating_sub(self.current_ticks);
613
614 for (i, level) in self.levels.iter().enumerate() {
615 if delta < level.range() {
616 return i;
617 }
618 }
619
620 self.num_levels - 1
622 }
623
624 #[inline]
633 #[allow(clippy::needless_pass_by_ref_mut)]
634 fn insert_entry(&mut self, entry_ptr: EntryPtr<T>, deadline_ticks: u64) {
635 let lvl_idx = self.select_level(deadline_ticks);
636 let slot_idx = self.levels[lvl_idx].slot_index(deadline_ticks);
637
638 let entry = unsafe { entry_ref(entry_ptr) };
641 entry.set_location(lvl_idx as u8, slot_idx as u16);
642
643 unsafe { self.levels[lvl_idx].slot(slot_idx).push_entry(entry_ptr) };
645
646 self.levels[lvl_idx].activate_slot(slot_idx);
648 self.active_levels |= 1 << lvl_idx;
649 }
650
651 #[inline]
657 #[allow(clippy::needless_pass_by_ref_mut)]
658 fn remove_entry(&mut self, entry_ptr: EntryPtr<T>) {
659 let entry = unsafe { entry_ref(entry_ptr) };
661
662 let lvl_idx = entry.level() as usize;
663 let slot_idx = entry.slot_idx() as usize;
664
665 unsafe { self.levels[lvl_idx].slot(slot_idx).remove_entry(entry_ptr) };
667
668 if self.levels[lvl_idx].slot(slot_idx).is_empty() {
669 self.levels[lvl_idx].deactivate_slot(slot_idx);
670 if !self.levels[lvl_idx].is_active() {
671 self.active_levels &= !(1 << lvl_idx);
672 }
673 }
674 }
675
676 #[inline]
684 fn fire_entry(&mut self, entry_ptr: EntryPtr<T>) -> Option<T> {
685 let entry = unsafe { entry_ref(entry_ptr) };
687
688 let value = unsafe { entry.take_value() };
691
692 let new_refs = entry.dec_refs();
693 if new_refs == 0 {
694 unsafe { self.slab.free_ptr(entry_ptr) };
697 }
698 self.len -= 1;
702 value
703 }
704
705 fn poll_level(
712 &mut self,
713 lvl_idx: usize,
714 now_ticks: u64,
715 limit: usize,
716 buf: &mut Vec<T>,
717 ) -> usize {
718 let mut fired = 0;
719 let mut mask = self.levels[lvl_idx].active_slots();
720
721 while mask != 0 && fired < limit {
722 let slot_idx = mask.trailing_zeros() as usize;
723 mask &= mask - 1;
724
725 let slot_ptr = self.levels[lvl_idx].slot(slot_idx) as *const crate::level::WheelSlot<T>;
726 let slot = unsafe { &*slot_ptr };
730 let mut entry_ptr = slot.entry_head();
731
732 while !entry_ptr.is_null() && fired < limit {
733 let entry = unsafe { entry_ref(entry_ptr) };
735 let next_entry = entry.next();
736
737 if entry.deadline_ticks() <= now_ticks {
738 unsafe { slot.remove_entry(entry_ptr) };
739
740 if let Some(value) = self.fire_entry(entry_ptr) {
741 buf.push(value);
742 }
743 fired += 1;
744 }
745
746 entry_ptr = next_entry;
747 }
748
749 if slot.is_empty() {
750 self.levels[lvl_idx].deactivate_slot(slot_idx);
751 }
752 }
753
754 if !self.levels[lvl_idx].is_active() {
756 self.active_levels &= !(1 << lvl_idx);
757 }
758
759 fired
760 }
761}
762
763impl<T: 'static, S: SlabStore<Item = WheelEntry<T>>> Drop for TimerWheel<T, S> {
768 fn drop(&mut self) {
769 let mut lvl_mask = self.active_levels;
771 while lvl_mask != 0 {
772 let lvl_idx = lvl_mask.trailing_zeros() as usize;
773 lvl_mask &= lvl_mask - 1;
774
775 let level = &self.levels[lvl_idx];
776 let mut slot_mask = level.active_slots();
777 while slot_mask != 0 {
778 let slot_idx = slot_mask.trailing_zeros() as usize;
779 slot_mask &= slot_mask - 1;
780
781 let slot = level.slot(slot_idx);
782 let mut entry_ptr = slot.entry_head();
783 while !entry_ptr.is_null() {
784 let entry = unsafe { entry_ref(entry_ptr) };
786 let next_entry = entry.next();
787
788 unsafe { self.slab.free(nexus_slab::RawSlot::from_ptr(entry_ptr)) };
790
791 entry_ptr = next_entry;
792 }
793 }
794 }
795 }
796}
797
798#[cfg(test)]
803mod tests {
804 use super::*;
805 use std::time::{Duration, Instant};
806
807 fn ms(millis: u64) -> Duration {
808 Duration::from_millis(millis)
809 }
810
811 fn _assert_send<T: Send>() {}
816
817 #[test]
818 fn wheel_is_send() {
819 _assert_send::<Wheel<u64>>();
820 _assert_send::<BoundedWheel<u64>>();
821 }
822
823 #[test]
828 fn default_config() {
829 let now = Instant::now();
830 let wheel: Wheel<u64> = Wheel::unbounded(1024, now);
831 assert!(wheel.is_empty());
832 assert_eq!(wheel.len(), 0);
833 }
834
835 #[test]
836 fn bounded_construction() {
837 let now = Instant::now();
838 let wheel: BoundedWheel<u64> = BoundedWheel::bounded(128, now);
839 assert!(wheel.is_empty());
840 }
841
842 #[test]
843 #[should_panic(expected = "slots_per_level must be a power of 2")]
844 fn invalid_config_non_power_of_two() {
845 let now = Instant::now();
846 WheelBuilder::default()
847 .slots_per_level(65)
848 .unbounded(1024)
849 .build::<u64>(now);
850 }
851
852 #[test]
857 fn schedule_and_cancel() {
858 let now = Instant::now();
859 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
860
861 let h = wheel.schedule(now + ms(50), 42);
862 assert_eq!(wheel.len(), 1);
863
864 let val = wheel.cancel(h);
865 assert_eq!(val, Some(42));
866 assert_eq!(wheel.len(), 0);
867 }
868
869 #[test]
870 fn schedule_forget_fires() {
871 let now = Instant::now();
872 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
873
874 wheel.schedule_forget(now + ms(10), 99);
875 assert_eq!(wheel.len(), 1);
876
877 let mut buf = Vec::new();
878 let fired = wheel.poll(now + ms(20), &mut buf);
879 assert_eq!(fired, 1);
880 assert_eq!(buf, vec![99]);
881 assert_eq!(wheel.len(), 0);
882 }
883
884 #[test]
885 fn cancel_after_fire_returns_none() {
886 let now = Instant::now();
887 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
888
889 let h = wheel.schedule(now + ms(10), 42);
890
891 let mut buf = Vec::new();
892 wheel.poll(now + ms(20), &mut buf);
893 assert_eq!(buf, vec![42]);
894
895 let val = wheel.cancel(h);
897 assert_eq!(val, None);
898 }
899
900 #[test]
901 fn free_active_timer_becomes_fire_and_forget() {
902 let now = Instant::now();
903 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
904
905 let h = wheel.schedule(now + ms(10), 42);
906 wheel.free(h); assert_eq!(wheel.len(), 1);
908
909 let mut buf = Vec::new();
910 wheel.poll(now + ms(20), &mut buf);
911 assert_eq!(buf, vec![42]);
912 assert_eq!(wheel.len(), 0);
913 }
914
915 #[test]
916 fn free_zombie_handle() {
917 let now = Instant::now();
918 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
919
920 let h = wheel.schedule(now + ms(10), 42);
921
922 let mut buf = Vec::new();
923 wheel.poll(now + ms(20), &mut buf);
924
925 wheel.free(h);
927 }
928
929 #[test]
934 fn bounded_full() {
935 let now = Instant::now();
936 let mut wheel: BoundedWheel<u64> = BoundedWheel::bounded(2, now);
937
938 let h1 = wheel.try_schedule(now + ms(10), 1).unwrap();
939 let h2 = wheel.try_schedule(now + ms(20), 2).unwrap();
940
941 let err = wheel.try_schedule(now + ms(30), 3);
942 assert!(err.is_err());
943 let recovered = err.unwrap_err().into_inner();
944 assert_eq!(recovered, 3);
945
946 wheel.cancel(h1);
948 let h3 = wheel.try_schedule(now + ms(30), 3).unwrap();
949
950 wheel.free(h2);
952 wheel.free(h3);
953 }
954
955 #[test]
956 fn bounded_schedule_forget_full() {
957 let now = Instant::now();
958 let mut wheel: BoundedWheel<u64> = BoundedWheel::bounded(1, now);
959
960 wheel.try_schedule_forget(now + ms(10), 1).unwrap();
961 let err = wheel.try_schedule_forget(now + ms(20), 2);
962 assert!(err.is_err());
963 }
964
965 #[test]
970 fn poll_respects_deadline() {
971 let now = Instant::now();
972 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
973
974 wheel.schedule_forget(now + ms(10), 1);
975 wheel.schedule_forget(now + ms(50), 2);
976 wheel.schedule_forget(now + ms(100), 3);
977
978 let mut buf = Vec::new();
979
980 let fired = wheel.poll(now + ms(20), &mut buf);
982 assert_eq!(fired, 1);
983 assert_eq!(buf, vec![1]);
984 assert_eq!(wheel.len(), 2);
985
986 buf.clear();
988 let fired = wheel.poll(now + ms(60), &mut buf);
989 assert_eq!(fired, 1);
990 assert_eq!(buf, vec![2]);
991
992 buf.clear();
994 let fired = wheel.poll(now + ms(200), &mut buf);
995 assert_eq!(fired, 1);
996 assert_eq!(buf, vec![3]);
997
998 assert!(wheel.is_empty());
999 }
1000
1001 #[test]
1002 fn poll_with_limit() {
1003 let now = Instant::now();
1004 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1005
1006 for i in 0..10 {
1007 wheel.schedule_forget(now + ms(1), i);
1008 }
1009
1010 let mut buf = Vec::new();
1011
1012 let fired = wheel.poll_with_limit(now + ms(5), 3, &mut buf);
1014 assert_eq!(fired, 3);
1015 assert_eq!(wheel.len(), 7);
1016
1017 let fired = wheel.poll_with_limit(now + ms(5), 3, &mut buf);
1018 assert_eq!(fired, 3);
1019 assert_eq!(wheel.len(), 4);
1020
1021 let fired = wheel.poll(now + ms(5), &mut buf);
1023 assert_eq!(fired, 4);
1024 assert!(wheel.is_empty());
1025 assert_eq!(buf.len(), 10);
1026 }
1027
1028 #[test]
1033 fn timers_across_levels() {
1034 let now = Instant::now();
1035 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1036
1037 wheel.schedule_forget(now + ms(5), 0);
1039 wheel.schedule_forget(now + ms(200), 1);
1041 wheel.schedule_forget(now + ms(1000), 2);
1043
1044 let mut buf = Vec::new();
1045
1046 wheel.poll(now + ms(10), &mut buf);
1047 assert_eq!(buf, vec![0]);
1048
1049 buf.clear();
1050 wheel.poll(now + ms(250), &mut buf);
1051 assert_eq!(buf, vec![1]);
1052
1053 buf.clear();
1054 wheel.poll(now + ms(1500), &mut buf);
1055 assert_eq!(buf, vec![2]);
1056
1057 assert!(wheel.is_empty());
1058 }
1059
1060 #[test]
1065 fn next_deadline_empty() {
1066 let now = Instant::now();
1067 let wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1068 assert!(wheel.next_deadline().is_none());
1069 }
1070
1071 #[test]
1072 fn next_deadline_returns_earliest() {
1073 let now = Instant::now();
1074 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1075
1076 wheel.schedule_forget(now + ms(100), 1);
1077 wheel.schedule_forget(now + ms(50), 2);
1078 wheel.schedule_forget(now + ms(200), 3);
1079
1080 let next = wheel.next_deadline().unwrap();
1081 let delta = next.duration_since(now);
1083 assert!(delta >= ms(49) && delta <= ms(51));
1084 }
1085
1086 #[test]
1091 fn deadline_in_the_past_fires_immediately() {
1092 let now = Instant::now();
1093 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1094
1095 wheel.schedule_forget(now, 42);
1097
1098 let mut buf = Vec::new();
1099 let fired = wheel.poll(now + ms(1), &mut buf);
1100 assert_eq!(fired, 1);
1101 assert_eq!(buf, vec![42]);
1102 }
1103
1104 #[test]
1109 fn deadline_beyond_max_range_clamped() {
1110 let now = Instant::now();
1111 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1112
1113 let h = wheel.schedule(now + Duration::from_secs(100_000), 99);
1115 assert_eq!(wheel.len(), 1);
1116
1117 let mut buf = Vec::new();
1119 wheel.poll(now + Duration::from_secs(100_001), &mut buf);
1120 assert_eq!(buf, vec![99]);
1121
1122 let val = wheel.cancel(h);
1132 assert_eq!(val, None);
1133 }
1134
1135 #[test]
1140 fn drop_cleans_up_active_entries() {
1141 let now = Instant::now();
1142 let mut wheel: Wheel<String> = Wheel::unbounded(1024, now);
1143
1144 for i in 0..100 {
1145 wheel.schedule_forget(now + ms(i * 10), format!("timer-{i}"));
1146 }
1147
1148 assert_eq!(wheel.len(), 100);
1149 drop(wheel);
1151 }
1152
1153 #[test]
1154 fn drop_with_outstanding_handles() {
1155 let now = Instant::now();
1156 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1157
1158 let h1 = wheel.schedule(now + ms(10), 1);
1160 let h2 = wheel.schedule(now + ms(20), 2);
1161
1162 wheel.free(h1);
1164 wheel.free(h2);
1165
1166 drop(wheel);
1168 }
1169
1170 #[test]
1175 fn level_selection_boundaries() {
1176 let now = Instant::now();
1177 let wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1178
1179 assert_eq!(wheel.select_level(0), 0);
1181 assert_eq!(wheel.select_level(63), 0);
1182
1183 assert_eq!(wheel.select_level(64), 1);
1185 assert_eq!(wheel.select_level(511), 1);
1186
1187 assert_eq!(wheel.select_level(512), 2);
1189 }
1190
1191 #[test]
1196 fn cancel_after_time_advance() {
1197 let now = Instant::now();
1202 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1203
1204 let h = wheel.schedule(now + ms(500), 42);
1205 assert_eq!(wheel.len(), 1);
1206
1207 let mut buf = Vec::new();
1209 let fired = wheel.poll(now + ms(400), &mut buf);
1210 assert_eq!(fired, 0);
1211 assert!(buf.is_empty());
1212
1213 let val = wheel.cancel(h);
1215 assert_eq!(val, Some(42));
1216 assert_eq!(wheel.len(), 0);
1217 }
1218
1219 #[test]
1224 fn multiple_entries_same_slot() {
1225 let now = Instant::now();
1226 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1227
1228 let mut handles = Vec::new();
1230 for i in 0..5 {
1231 handles.push(wheel.schedule(now + ms(10), i));
1232 }
1233 assert_eq!(wheel.len(), 5);
1234
1235 let v2 = wheel.cancel(handles.remove(2));
1237 assert_eq!(v2, Some(2));
1238 let v0 = wheel.cancel(handles.remove(0));
1239 assert_eq!(v0, Some(0));
1240 assert_eq!(wheel.len(), 3);
1241
1242 let mut buf = Vec::new();
1244 let fired = wheel.poll(now + ms(20), &mut buf);
1245 assert_eq!(fired, 3);
1246
1247 for h in handles {
1249 let val = wheel.cancel(h);
1250 assert_eq!(val, None); }
1252 }
1253
1254 #[test]
1259 fn entry_at_level_boundary() {
1260 let now = Instant::now();
1263 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1264
1265 let h = wheel.schedule(now + ms(64), 99);
1266 assert_eq!(wheel.len(), 1);
1267
1268 let mut buf = Vec::new();
1270 let fired = wheel.poll(now + ms(63), &mut buf);
1271 assert_eq!(fired, 0);
1272
1273 let fired = wheel.poll(now + ms(65), &mut buf);
1275 assert_eq!(fired, 1);
1276 assert_eq!(buf, vec![99]);
1277
1278 wheel.cancel(h);
1280 }
1281
1282 #[test]
1287 fn poll_with_limit_mixed_expiry() {
1288 let now = Instant::now();
1289 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1290
1291 wheel.schedule_forget(now + ms(5), 1);
1293 wheel.schedule_forget(now + ms(5), 2);
1294 wheel.schedule_forget(now + ms(5), 3);
1295 wheel.schedule_forget(now + ms(500), 4); wheel.schedule_forget(now + ms(500), 5); assert_eq!(wheel.len(), 5);
1298
1299 let mut buf = Vec::new();
1300
1301 let fired = wheel.poll_with_limit(now + ms(10), 2, &mut buf);
1303 assert_eq!(fired, 2);
1304 assert_eq!(wheel.len(), 3);
1305
1306 let fired = wheel.poll_with_limit(now + ms(10), 5, &mut buf);
1308 assert_eq!(fired, 1);
1309 assert_eq!(wheel.len(), 2);
1310
1311 assert_eq!(buf.len(), 3);
1313 }
1314
1315 #[test]
1320 fn reuse_after_full_drain() {
1321 let now = Instant::now();
1322 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1323
1324 for i in 0..10 {
1326 wheel.schedule_forget(now + ms(1), i);
1327 }
1328 let mut buf = Vec::new();
1329 wheel.poll(now + ms(5), &mut buf);
1330 assert_eq!(buf.len(), 10);
1331 assert!(wheel.is_empty());
1332
1333 buf.clear();
1335 for i in 10..20 {
1336 wheel.schedule_forget(now + ms(100), i);
1337 }
1338 assert_eq!(wheel.len(), 10);
1339
1340 wheel.poll(now + ms(200), &mut buf);
1341 assert_eq!(buf.len(), 10);
1342 assert!(wheel.is_empty());
1343 }
1344
1345 #[test]
1350 fn all_levels_active() {
1351 let now = Instant::now();
1352 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1353
1354 let distances = [10, 100, 1000, 5000, 40_000, 300_000, 3_000_000];
1357 let mut handles: Vec<TimerHandle<u64>> = Vec::new();
1358 for (i, &d) in distances.iter().enumerate() {
1359 handles.push(wheel.schedule(now + ms(d), i as u64));
1360 }
1361 assert_eq!(wheel.len(), 7);
1362
1363 let order = [4, 1, 6, 0, 3, 5, 2];
1365 let mut opt_handles: Vec<Option<TimerHandle<u64>>> =
1368 handles.into_iter().map(Some).collect();
1369
1370 for &idx in &order {
1371 let h = opt_handles[idx].take().unwrap();
1372 let val = wheel.cancel(h);
1373 assert_eq!(val, Some(idx as u64));
1374 }
1375 assert!(wheel.is_empty());
1376 }
1377
1378 #[test]
1383 fn poll_values_match() {
1384 let now = Instant::now();
1385 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1386
1387 let expected: Vec<u64> = (100..110).collect();
1388 for &v in &expected {
1389 wheel.schedule_forget(now + ms(5), v);
1390 }
1391
1392 let mut buf = Vec::new();
1393 wheel.poll(now + ms(10), &mut buf);
1394
1395 buf.sort();
1396 assert_eq!(buf, expected);
1397 }
1398
1399 #[test]
1404 fn reschedule_moves_deadline() {
1405 let now = Instant::now();
1406 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1407
1408 let h = wheel.schedule(now + ms(100), 42);
1409 assert_eq!(wheel.len(), 1);
1410
1411 let h = wheel.reschedule(h, now + ms(50));
1413 assert_eq!(wheel.len(), 1);
1414
1415 let mut buf = Vec::new();
1417 let fired = wheel.poll(now + ms(40), &mut buf);
1418 assert_eq!(fired, 0);
1419
1420 let fired = wheel.poll(now + ms(55), &mut buf);
1422 assert_eq!(fired, 1);
1423 assert_eq!(buf, vec![42]);
1424
1425 wheel.cancel(h);
1427 }
1428
1429 #[test]
1430 fn reschedule_to_later() {
1431 let now = Instant::now();
1432 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1433
1434 let h = wheel.schedule(now + ms(50), 7);
1435
1436 let h = wheel.reschedule(h, now + ms(200));
1438
1439 let mut buf = Vec::new();
1441 let fired = wheel.poll(now + ms(60), &mut buf);
1442 assert_eq!(fired, 0);
1443
1444 let fired = wheel.poll(now + ms(210), &mut buf);
1446 assert_eq!(fired, 1);
1447 assert_eq!(buf, vec![7]);
1448
1449 wheel.cancel(h);
1450 }
1451
1452 #[test]
1453 #[should_panic(expected = "cannot reschedule a fired timer")]
1454 fn reschedule_panics_on_zombie() {
1455 let now = Instant::now();
1456 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1457
1458 let h = wheel.schedule(now + ms(10), 42);
1459
1460 let mut buf = Vec::new();
1461 wheel.poll(now + ms(20), &mut buf);
1462
1463 let _h = wheel.reschedule(h, now + ms(100));
1465 }
1466
1467 #[test]
1472 fn custom_slots_per_level() {
1473 let now = Instant::now();
1474 let mut wheel: Wheel<u64> = WheelBuilder::default()
1476 .slots_per_level(32)
1477 .unbounded(256)
1478 .build(now);
1479
1480 let h1 = wheel.schedule(now + ms(20), 1);
1483 let h2 = wheel.schedule(now + ms(40), 2);
1485
1486 let mut buf = Vec::new();
1487 wheel.poll(now + ms(25), &mut buf);
1488 assert_eq!(buf, vec![1]);
1489
1490 buf.clear();
1491 wheel.poll(now + ms(50), &mut buf);
1492 assert_eq!(buf, vec![2]);
1493
1494 wheel.cancel(h1);
1495 wheel.cancel(h2);
1496 }
1497
1498 #[test]
1499 fn custom_clk_shift() {
1500 let now = Instant::now();
1501 let mut wheel: Wheel<u64> = WheelBuilder::default()
1503 .clk_shift(2)
1504 .unbounded(256)
1505 .build(now);
1506
1507 let h1 = wheel.schedule(now + ms(50), 1); let h2 = wheel.schedule(now + ms(100), 2); let mut buf = Vec::new();
1513 wheel.poll(now + ms(55), &mut buf);
1514 assert_eq!(buf, vec![1]);
1515
1516 buf.clear();
1517 wheel.poll(now + ms(110), &mut buf);
1518 assert_eq!(buf, vec![2]);
1519
1520 wheel.cancel(h1);
1521 wheel.cancel(h2);
1522 }
1523
1524 #[test]
1525 fn custom_num_levels() {
1526 let now = Instant::now();
1527 let mut wheel: Wheel<u64> = WheelBuilder::default()
1529 .num_levels(3)
1530 .unbounded(256)
1531 .build(now);
1532
1533 let h = wheel.schedule(now + ms(3000), 42);
1536 assert_eq!(wheel.len(), 1);
1537
1538 let mut buf = Vec::new();
1539 wheel.poll(now + ms(3100), &mut buf);
1540 assert_eq!(buf, vec![42]);
1541
1542 wheel.cancel(h);
1543 }
1544
1545 #[test]
1546 fn custom_tick_duration() {
1547 let now = Instant::now();
1548 let mut wheel: Wheel<u64> = WheelBuilder::default()
1550 .tick_duration(Duration::from_micros(100))
1551 .unbounded(256)
1552 .build(now);
1553
1554 wheel.schedule_forget(now + ms(1), 1);
1556 wheel.schedule_forget(now + ms(10), 2);
1558
1559 let mut buf = Vec::new();
1560 wheel.poll(now + ms(2), &mut buf);
1561 assert_eq!(buf, vec![1]);
1562
1563 buf.clear();
1564 wheel.poll(now + ms(15), &mut buf);
1565 assert_eq!(buf, vec![2]);
1566 }
1567
1568 #[test]
1569 fn bounded_custom_config() {
1570 let now = Instant::now();
1571 let mut wheel: BoundedWheel<u64> = WheelBuilder::default()
1572 .slots_per_level(16)
1573 .num_levels(4)
1574 .bounded(8)
1575 .build(now);
1576
1577 let mut handles = Vec::new();
1579 for i in 0..8 {
1580 handles.push(wheel.try_schedule(now + ms(i * 10 + 10), i).unwrap());
1581 }
1582 assert!(wheel.try_schedule(now + ms(100), 99).is_err());
1583
1584 wheel.cancel(handles.remove(0));
1586 let h = wheel.try_schedule(now + ms(100), 99).unwrap();
1587 handles.push(h);
1588
1589 for h in handles {
1591 wheel.cancel(h);
1592 }
1593 }
1594
1595 #[test]
1600 #[should_panic(expected = "slots_per_level must be <= 64")]
1601 fn invalid_config_too_many_slots() {
1602 let now = Instant::now();
1603 WheelBuilder::default()
1604 .slots_per_level(128)
1605 .unbounded(1024)
1606 .build::<u64>(now);
1607 }
1608
1609 #[test]
1610 #[should_panic(expected = "num_levels must be > 0")]
1611 fn invalid_config_zero_levels() {
1612 let now = Instant::now();
1613 WheelBuilder::default()
1614 .num_levels(0)
1615 .unbounded(1024)
1616 .build::<u64>(now);
1617 }
1618
1619 #[test]
1620 #[should_panic(expected = "num_levels must be <= 8")]
1621 fn invalid_config_too_many_levels() {
1622 let now = Instant::now();
1623 WheelBuilder::default()
1624 .num_levels(9)
1625 .unbounded(1024)
1626 .build::<u64>(now);
1627 }
1628
1629 #[test]
1630 #[should_panic(expected = "clk_shift must be > 0")]
1631 fn invalid_config_zero_shift() {
1632 let now = Instant::now();
1633 WheelBuilder::default()
1634 .clk_shift(0)
1635 .unbounded(1024)
1636 .build::<u64>(now);
1637 }
1638
1639 #[test]
1640 #[should_panic(expected = "tick_duration must be non-zero")]
1641 fn invalid_config_zero_tick() {
1642 let now = Instant::now();
1643 WheelBuilder::default()
1644 .tick_duration(Duration::ZERO)
1645 .unbounded(1024)
1646 .build::<u64>(now);
1647 }
1648
1649 #[test]
1650 #[should_panic(expected = "overflow")]
1651 fn invalid_config_shift_overflow() {
1652 let now = Instant::now();
1653 WheelBuilder::default()
1657 .num_levels(8)
1658 .clk_shift(9)
1659 .unbounded(1024)
1660 .build::<u64>(now);
1661 }
1662
1663 #[test]
1670 fn miri_schedule_cancel_drop_type() {
1671 let now = Instant::now();
1672 let mut wheel: Wheel<String> = Wheel::unbounded(64, now);
1673
1674 let h = wheel.schedule(now + ms(50), "hello".to_string());
1675 let val = wheel.cancel(h);
1676 assert_eq!(val, Some("hello".to_string()));
1677 assert!(wheel.is_empty());
1678 }
1679
1680 #[test]
1681 fn miri_poll_fires_drop_type() {
1682 let now = Instant::now();
1683 let mut wheel: Wheel<String> = Wheel::unbounded(64, now);
1684
1685 wheel.schedule_forget(now + ms(10), "a".to_string());
1686 wheel.schedule_forget(now + ms(10), "b".to_string());
1687 wheel.schedule_forget(now + ms(10), "c".to_string());
1688
1689 let mut buf = Vec::new();
1690 let fired = wheel.poll(now + ms(20), &mut buf);
1691 assert_eq!(fired, 3);
1692 assert_eq!(buf.len(), 3);
1693 assert!(wheel.is_empty());
1694 }
1695
1696 #[test]
1697 fn miri_cancel_zombie_drop_type() {
1698 let now = Instant::now();
1699 let mut wheel: Wheel<String> = Wheel::unbounded(64, now);
1700
1701 let h = wheel.schedule(now + ms(10), "zombie".to_string());
1702
1703 let mut buf = Vec::new();
1704 wheel.poll(now + ms(20), &mut buf);
1705 assert_eq!(buf, vec!["zombie".to_string()]);
1706
1707 let val = wheel.cancel(h);
1709 assert_eq!(val, None);
1710 }
1711
1712 #[test]
1713 fn miri_free_active_and_zombie() {
1714 let now = Instant::now();
1715 let mut wheel: Wheel<String> = Wheel::unbounded(64, now);
1716
1717 let h1 = wheel.schedule(now + ms(10), "active".to_string());
1719 wheel.free(h1);
1720
1721 let mut buf = Vec::new();
1723 wheel.poll(now + ms(20), &mut buf);
1724 assert_eq!(buf, vec!["active".to_string()]);
1725
1726 let h2 = wheel.schedule(now + ms(10), "will-fire".to_string());
1728 buf.clear();
1729 wheel.poll(now + ms(20), &mut buf);
1730 wheel.free(h2); }
1732
1733 #[test]
1734 fn miri_reschedule_drop_type() {
1735 let now = Instant::now();
1736 let mut wheel: Wheel<String> = Wheel::unbounded(64, now);
1737
1738 let h = wheel.schedule(now + ms(100), "moveme".to_string());
1739 let h = wheel.reschedule(h, now + ms(50));
1740
1741 let mut buf = Vec::new();
1742 wheel.poll(now + ms(55), &mut buf);
1743 assert_eq!(buf, vec!["moveme".to_string()]);
1744
1745 wheel.cancel(h);
1746 }
1747
1748 #[test]
1749 fn miri_dll_multi_entry_same_slot() {
1750 let now = Instant::now();
1752 let mut wheel: Wheel<Vec<u8>> = Wheel::unbounded(64, now);
1753
1754 let mut handles = Vec::new();
1755 for i in 0..5 {
1756 handles.push(wheel.schedule(now + ms(10), vec![i; 32]));
1757 }
1758
1759 let v2 = wheel.cancel(handles.remove(2));
1761 assert_eq!(v2.unwrap(), vec![2; 32]);
1762
1763 let v0 = wheel.cancel(handles.remove(0));
1764 assert_eq!(v0.unwrap(), vec![0; 32]);
1765
1766 let mut buf = Vec::new();
1768 wheel.poll(now + ms(20), &mut buf);
1769 assert_eq!(buf.len(), 3);
1770
1771 for h in handles {
1773 wheel.cancel(h);
1774 }
1775 }
1776
1777 #[test]
1778 fn miri_drop_wheel_with_entries() {
1779 let now = Instant::now();
1780 let mut wheel: Wheel<String> = Wheel::unbounded(64, now);
1781
1782 for i in 0..20 {
1784 wheel.schedule_forget(now + ms(i * 100), format!("entry-{i}"));
1785 }
1786 assert_eq!(wheel.len(), 20);
1787
1788 drop(wheel);
1790 }
1791
1792 #[test]
1793 fn miri_bounded_lifecycle() {
1794 let now = Instant::now();
1795 let mut wheel: BoundedWheel<String> = BoundedWheel::bounded(4, now);
1796
1797 let h1 = wheel.try_schedule(now + ms(10), "a".to_string()).unwrap();
1798 let h2 = wheel.try_schedule(now + ms(20), "b".to_string()).unwrap();
1799 let h3 = wheel.try_schedule(now + ms(30), "c".to_string()).unwrap();
1800 let h4 = wheel.try_schedule(now + ms(40), "d".to_string()).unwrap();
1801
1802 let err = wheel.try_schedule(now + ms(50), "e".to_string());
1804 assert!(err.is_err());
1805
1806 wheel.cancel(h1);
1808 let h5 = wheel.try_schedule(now + ms(50), "e".to_string()).unwrap();
1809
1810 let mut buf = Vec::new();
1812 wheel.poll(now + ms(25), &mut buf);
1813
1814 wheel.cancel(h2);
1816 wheel.free(h3);
1817 wheel.free(h4);
1818 wheel.free(h5);
1819 }
1820}
1821
1822#[cfg(test)]
1823mod proptests {
1824 use super::*;
1825 use proptest::prelude::*;
1826 use std::collections::HashSet;
1827 use std::mem;
1828 use std::time::{Duration, Instant};
1829
1830 #[derive(Debug, Clone)]
1832 enum Op {
1833 Schedule { deadline_ms: u64 },
1835 Cancel { idx: usize },
1837 }
1838
1839 fn op_strategy() -> impl Strategy<Value = Op> {
1840 prop_oneof![
1841 (1u64..10_000).prop_map(|deadline_ms| Op::Schedule { deadline_ms }),
1843 any::<usize>().prop_map(|idx| Op::Cancel { idx }),
1845 ]
1846 }
1847
1848 proptest! {
1849 #![proptest_config(ProptestConfig::with_cases(500))]
1850
1851 #[test]
1858 fn fuzz_schedule_cancel_interleaving(ops in proptest::collection::vec(op_strategy(), 1..200)) {
1859 let now = Instant::now();
1860 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1861
1862 let mut handles: Vec<TimerHandle<u64>> = Vec::new();
1863 let mut active_values: HashSet<u64> = HashSet::new();
1864 let mut next_id: u64 = 0;
1865
1866 for op in &ops {
1867 match op {
1868 Op::Schedule { deadline_ms } => {
1869 let id = next_id;
1870 next_id += 1;
1871 let h = wheel.schedule(now + Duration::from_millis(*deadline_ms), id);
1872 handles.push(h);
1873 active_values.insert(id);
1874 }
1875 Op::Cancel { idx } => {
1876 if !handles.is_empty() {
1877 let i = idx % handles.len();
1878 let h = handles.swap_remove(i);
1879 let val = wheel.cancel(h);
1880 let v = val.unwrap();
1882 assert!(active_values.remove(&v));
1883 }
1884 }
1885 }
1886 prop_assert_eq!(wheel.len(), active_values.len());
1888 }
1889
1890 let mut buf = Vec::new();
1892 wheel.poll(now + Duration::from_secs(100_000), &mut buf);
1894
1895 for h in handles {
1897 mem::forget(h);
1898 }
1899
1900 let fired_set: HashSet<u64> = buf.into_iter().collect();
1901 prop_assert_eq!(fired_set, active_values);
1902 prop_assert!(wheel.is_empty());
1903 }
1904
1905 #[test]
1911 fn fuzz_poll_timing(
1912 deadlines in proptest::collection::vec(1u64..5000, 1..100),
1913 poll_times in proptest::collection::vec(1u64..10_000, 1..20),
1914 ) {
1915 let now = Instant::now();
1916 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1917
1918 for (i, &d) in deadlines.iter().enumerate() {
1920 wheel.schedule_forget(now + Duration::from_millis(d), i as u64);
1921 }
1922
1923 let mut sorted_times: Vec<u64> = poll_times;
1925 sorted_times.sort();
1926 sorted_times.dedup();
1927
1928 let mut all_fired: Vec<u64> = Vec::new();
1929
1930 for &t in &sorted_times {
1931 let mut buf = Vec::new();
1932 wheel.poll(now + Duration::from_millis(t), &mut buf);
1933
1934 for &id in &buf {
1936 let deadline_ms = deadlines[id as usize];
1937 prop_assert!(deadline_ms <= t,
1938 "Timer {} with deadline {}ms fired at {}ms", id, deadline_ms, t);
1939 }
1940
1941 all_fired.extend(buf);
1942 }
1943
1944 let mut final_buf = Vec::new();
1946 wheel.poll(now + Duration::from_secs(100_000), &mut final_buf);
1947 all_fired.extend(final_buf);
1948
1949 all_fired.sort();
1951 let expected: Vec<u64> = (0..deadlines.len() as u64).collect();
1952 prop_assert_eq!(all_fired, expected, "Not all timers fired exactly once");
1953 prop_assert!(wheel.is_empty());
1954 }
1955 }
1956}