1use std::cell::Cell;
9use std::marker::PhantomData;
10use std::mem;
11use std::time::{Duration, Instant};
12
13use nexus_slab::{Full, Slot, bounded, unbounded};
14
15use crate::entry::{EntryPtr, WheelEntry, entry_ref};
16use crate::handle::TimerHandle;
17use crate::level::Level;
18use crate::store::{BoundedStore, SlabStore};
19
20#[derive(Debug, Clone, Copy)]
48pub struct WheelBuilder {
49 tick_duration: Duration,
50 slots_per_level: usize,
51 clk_shift: u32,
52 num_levels: usize,
53}
54
55impl Default for WheelBuilder {
56 fn default() -> Self {
57 WheelBuilder {
58 tick_duration: Duration::from_millis(1),
59 slots_per_level: 64,
60 clk_shift: 3,
61 num_levels: 7,
62 }
63 }
64}
65
66impl WheelBuilder {
67 pub fn new() -> Self {
69 Self::default()
70 }
71
72 pub fn tick_duration(mut self, d: Duration) -> Self {
74 self.tick_duration = d;
75 self
76 }
77
78 pub fn slots_per_level(mut self, n: usize) -> Self {
80 self.slots_per_level = n;
81 self
82 }
83
84 pub fn clk_shift(mut self, s: u32) -> Self {
86 self.clk_shift = s;
87 self
88 }
89
90 pub fn num_levels(mut self, n: usize) -> Self {
92 self.num_levels = n;
93 self
94 }
95
96 pub fn unbounded(self, chunk_capacity: usize) -> UnboundedWheelBuilder {
101 UnboundedWheelBuilder {
102 config: self,
103 chunk_capacity,
104 }
105 }
106
107 pub fn bounded(self, capacity: usize) -> BoundedWheelBuilder {
111 BoundedWheelBuilder {
112 config: self,
113 capacity,
114 }
115 }
116
117 fn validate(&self) {
118 assert!(
119 self.slots_per_level.is_power_of_two(),
120 "slots_per_level must be a power of 2, got {}",
121 self.slots_per_level
122 );
123 assert!(
124 self.slots_per_level <= 64,
125 "slots_per_level must be <= 64 (u64 bitmask), got {}",
126 self.slots_per_level
127 );
128 assert!(self.num_levels > 0, "num_levels must be > 0");
129 assert!(
130 self.num_levels <= 8,
131 "num_levels must be <= 8 (u8 bitmask), got {}",
132 self.num_levels
133 );
134 assert!(self.clk_shift > 0, "clk_shift must be > 0");
135 assert!(
136 !self.tick_duration.is_zero(),
137 "tick_duration must be non-zero"
138 );
139 let max_shift = (self.num_levels - 1) as u64 * self.clk_shift as u64;
140 assert!(
141 max_shift < 64,
142 "(num_levels - 1) * clk_shift must be < 64, got {}",
143 max_shift
144 );
145 let slots_log2 = self.slots_per_level.trailing_zeros() as u64;
146 assert!(
147 slots_log2 + max_shift < 64,
148 "slots_per_level << max_shift would overflow u64"
149 );
150 }
151
152 fn tick_ns(&self) -> u64 {
153 self.tick_duration.as_nanos() as u64
154 }
155}
156
157#[derive(Debug)]
161pub struct UnboundedWheelBuilder {
162 config: WheelBuilder,
163 chunk_capacity: usize,
164}
165
166impl UnboundedWheelBuilder {
167 pub fn build<T: 'static>(self, now: Instant) -> Wheel<T> {
174 self.config.validate();
175 let slab = unsafe { unbounded::Slab::with_chunk_capacity(self.chunk_capacity) };
179 let levels = build_levels::<T>(&self.config);
180 let tick_ns = self.config.tick_ns();
181 TimerWheel {
182 slab,
183 num_levels: self.config.num_levels,
184 levels,
185 current_ticks: 0,
186 tick_ns,
187 inv_tick_ns: (1u128 << 64) / tick_ns as u128,
188 epoch: now,
189 active_levels: 0,
190 len: 0,
191 min_deadline: Cell::new(None),
192 _marker: PhantomData,
193 }
194 }
195}
196
197#[derive(Debug)]
201pub struct BoundedWheelBuilder {
202 config: WheelBuilder,
203 capacity: usize,
204}
205
206impl BoundedWheelBuilder {
207 pub fn build<T: 'static>(self, now: Instant) -> BoundedWheel<T> {
214 self.config.validate();
215 let slab = unsafe { bounded::Slab::with_capacity(self.capacity) };
219 let levels = build_levels::<T>(&self.config);
220 let tick_ns = self.config.tick_ns();
221 TimerWheel {
222 slab,
223 num_levels: self.config.num_levels,
224 levels,
225 current_ticks: 0,
226 tick_ns,
227 inv_tick_ns: (1u128 << 64) / tick_ns as u128,
228 epoch: now,
229 active_levels: 0,
230 len: 0,
231 min_deadline: Cell::new(None),
232 _marker: PhantomData,
233 }
234 }
235}
236
237pub struct TimerWheel<
254 T: 'static,
255 S: SlabStore<Item = WheelEntry<T>> = unbounded::Slab<WheelEntry<T>>,
256> {
257 slab: S,
258 levels: Vec<Level<T>>,
259 num_levels: usize,
260 active_levels: u8,
261 current_ticks: u64,
262 tick_ns: u64,
263 inv_tick_ns: u128,
264 epoch: Instant,
265 len: usize,
266 min_deadline: Cell<Option<u64>>,
267 _marker: PhantomData<*const ()>, }
269
270#[allow(clippy::non_send_fields_in_send_ty)]
298unsafe impl<T: Send + 'static, S: SlabStore<Item = WheelEntry<T>>> Send for TimerWheel<T, S> {}
299
300pub type BoundedWheel<T> = TimerWheel<T, bounded::Slab<WheelEntry<T>>>;
302
303pub type Wheel<T> = TimerWheel<T, unbounded::Slab<WheelEntry<T>>>;
305
306impl<T: 'static> Wheel<T> {
311 pub fn unbounded(chunk_capacity: usize, now: Instant) -> Self {
315 WheelBuilder::default().unbounded(chunk_capacity).build(now)
316 }
317}
318
319impl<T: 'static> BoundedWheel<T> {
320 pub fn bounded(capacity: usize, now: Instant) -> Self {
324 WheelBuilder::default().bounded(capacity).build(now)
325 }
326}
327
328fn build_levels<T: 'static>(config: &WheelBuilder) -> Vec<Level<T>> {
329 (0..config.num_levels)
330 .map(|i| Level::new(config.slots_per_level, i, config.clk_shift))
331 .collect()
332}
333
334impl<T: 'static, S: SlabStore<Item = WheelEntry<T>>> TimerWheel<T, S> {
339 pub fn schedule(&mut self, deadline: Instant, value: T) -> TimerHandle<T> {
349 let deadline_ticks = self.instant_to_ticks(deadline);
350 let entry = WheelEntry::new(deadline_ticks, value, 2);
351 let slot = self.slab.alloc(entry);
352 let ptr = slot.into_raw();
353 self.insert_entry(ptr, deadline_ticks);
354 self.len += 1;
355 TimerHandle::new(ptr)
356 }
357
358 pub fn schedule_forget(&mut self, deadline: Instant, value: T) {
368 let deadline_ticks = self.instant_to_ticks(deadline);
369 let entry = WheelEntry::new(deadline_ticks, value, 1);
370 let slot = self.slab.alloc(entry);
371 let ptr = slot.into_raw();
372 self.insert_entry(ptr, deadline_ticks);
373 self.len += 1;
374 }
375}
376
377impl<T: 'static, S: BoundedStore<Item = WheelEntry<T>>> TimerWheel<T, S> {
382 pub fn try_schedule(&mut self, deadline: Instant, value: T) -> Result<TimerHandle<T>, Full<T>> {
388 let deadline_ticks = self.instant_to_ticks(deadline);
389 let entry = WheelEntry::new(deadline_ticks, value, 2);
390 match self.slab.try_alloc(entry) {
391 Ok(slot) => {
392 let ptr = slot.into_raw();
393 self.insert_entry(ptr, deadline_ticks);
394 self.len += 1;
395 Ok(TimerHandle::new(ptr))
396 }
397 Err(full) => {
398 let wheel_entry = full.into_inner();
401 let value = unsafe { wheel_entry.take_value() }
402 .expect("entry was just constructed with Some(value)");
403 Err(Full(value))
404 }
405 }
406 }
407
408 pub fn try_schedule_forget(&mut self, deadline: Instant, value: T) -> Result<(), Full<T>> {
414 let deadline_ticks = self.instant_to_ticks(deadline);
415 let entry = WheelEntry::new(deadline_ticks, value, 1);
416 match self.slab.try_alloc(entry) {
417 Ok(slot) => {
418 let ptr = slot.into_raw();
419 self.insert_entry(ptr, deadline_ticks);
420 self.len += 1;
421 Ok(())
422 }
423 Err(full) => {
424 let wheel_entry = full.into_inner();
425 let value = unsafe { wheel_entry.take_value() }
426 .expect("entry was just constructed with Some(value)");
427 Err(Full(value))
428 }
429 }
430 }
431}
432
433impl<T: 'static, S: SlabStore<Item = WheelEntry<T>>> TimerWheel<T, S> {
438 pub fn cancel(&mut self, handle: TimerHandle<T>) -> Option<T> {
447 let ptr = handle.ptr;
448 mem::forget(handle);
450
451 let entry = unsafe { entry_ref(ptr) };
453 let refs = entry.refs();
454
455 if refs == 2 {
456 let value = unsafe { entry.take_value() };
458 let cancelled_deadline = entry.deadline_ticks();
459 self.remove_entry(ptr);
460 self.len -= 1;
461 if self.len == 0 {
462 self.min_deadline.set(None);
463 } else if let Some(cur) = self.min_deadline.get() {
464 if cancelled_deadline == cur {
465 self.min_deadline.set(None);
466 }
467 }
468 self.slab.free(unsafe { Slot::from_raw(ptr) });
470 value
471 } else {
472 debug_assert_eq!(refs, 1, "unexpected refcount {refs} in cancel");
475 self.slab.free(unsafe { Slot::from_raw(ptr) });
477 None
478 }
479 }
480
481 pub fn free(&mut self, handle: TimerHandle<T>) {
489 let ptr = handle.ptr;
490 mem::forget(handle);
491
492 let entry = unsafe { entry_ref(ptr) };
494 let new_refs = entry.dec_refs();
495
496 if new_refs == 0 {
497 self.slab.free(unsafe { Slot::from_raw(ptr) });
500 }
501 }
503
504 pub fn reschedule(&mut self, handle: TimerHandle<T>, new_deadline: Instant) -> TimerHandle<T> {
516 let ptr = handle.ptr;
517 mem::forget(handle);
518
519 let entry = unsafe { entry_ref(ptr) };
521 assert_eq!(entry.refs(), 2, "cannot reschedule a fired timer");
522
523 let old_deadline = entry.deadline_ticks();
524
525 self.remove_entry(ptr);
527
528 let new_ticks = self.instant_to_ticks(new_deadline);
530 entry.set_deadline_ticks(new_ticks);
531 self.insert_entry(ptr, new_ticks);
532
533 if let Some(cur) = self.min_deadline.get() {
538 if old_deadline == cur && new_ticks > cur {
539 self.min_deadline.set(None);
540 }
541 }
542
543 TimerHandle::new(ptr)
544 }
545
546 pub fn poll(&mut self, now: Instant, buf: &mut Vec<T>) -> usize {
550 self.poll_with_limit(now, usize::MAX, buf)
551 }
552
553 pub fn poll_with_limit(&mut self, now: Instant, limit: usize, buf: &mut Vec<T>) -> usize {
560 let now_ticks = self.instant_to_ticks(now);
561 self.current_ticks = now_ticks;
562
563 let mut fired = 0;
564 let mut mask = self.active_levels;
565
566 while mask != 0 && fired < limit {
567 let lvl_idx = mask.trailing_zeros() as usize;
568 mask &= mask - 1; fired += self.poll_level(lvl_idx, now_ticks, limit - fired, buf);
570 }
571 fired
572 }
573
574 pub fn next_deadline(&self) -> Option<Instant> {
579 if let Some(ticks) = self.min_deadline.get() {
580 return Some(self.ticks_to_instant(ticks));
581 }
582 if self.len == 0 {
583 return None;
584 }
585 let min_ticks = self.walk_min_deadline();
586 self.min_deadline.set(min_ticks);
587 min_ticks.map(|t| self.ticks_to_instant(t))
588 }
589
590 #[cold]
591 fn walk_min_deadline(&self) -> Option<u64> {
592 let mut min_ticks: Option<u64> = None;
593 let mut lvl_mask = self.active_levels;
594 while lvl_mask != 0 {
595 let lvl_idx = lvl_mask.trailing_zeros() as usize;
596 lvl_mask &= lvl_mask - 1;
597 let level = &self.levels[lvl_idx];
598 let mut slot_mask = level.active_slots();
599 while slot_mask != 0 {
600 let slot_idx = slot_mask.trailing_zeros() as usize;
601 slot_mask &= slot_mask - 1;
602 let slot = level.slot(slot_idx);
603 let mut entry_ptr = slot.entry_head();
604 while !entry_ptr.is_null() {
605 let entry = unsafe { entry_ref(entry_ptr) };
607 let dt = entry.deadline_ticks();
608 min_ticks = Some(min_ticks.map_or(dt, |current| current.min(dt)));
609 entry_ptr = entry.next();
610 }
611 }
612 }
613 min_ticks
614 }
615
616 #[cfg(test)]
617 fn next_deadline_uncached(&self) -> Option<Instant> {
618 self.walk_min_deadline().map(|t| self.ticks_to_instant(t))
619 }
620
621 #[inline]
623 pub fn len(&self) -> usize {
624 self.len
625 }
626
627 #[inline]
629 pub fn is_empty(&self) -> bool {
630 self.len == 0
631 }
632
633 #[inline]
638 fn instant_to_ticks(&self, instant: Instant) -> u64 {
639 let dur = instant.saturating_duration_since(self.epoch);
640 let nanos = dur.as_nanos().min(u64::MAX as u128) as u64;
641 ((nanos as u128 * self.inv_tick_ns) >> 64) as u64
642 }
643
644 #[inline]
645 fn ticks_to_instant(&self, ticks: u64) -> Instant {
646 self.epoch + Duration::from_nanos(ticks.saturating_mul(self.tick_ns))
647 }
648
649 #[inline]
659 fn select_level(&self, deadline_ticks: u64) -> usize {
660 let delta = deadline_ticks.saturating_sub(self.current_ticks);
661
662 for (i, level) in self.levels.iter().enumerate() {
663 if delta < level.range() {
664 return i;
665 }
666 }
667
668 self.num_levels - 1
670 }
671
672 #[inline]
681 #[allow(clippy::needless_pass_by_ref_mut)]
682 fn insert_entry(&mut self, entry_ptr: EntryPtr<T>, deadline_ticks: u64) {
683 let lvl_idx = self.select_level(deadline_ticks);
684 let slot_idx = self.levels[lvl_idx].slot_index(deadline_ticks);
685
686 let entry = unsafe { entry_ref(entry_ptr) };
689 entry.set_location(lvl_idx as u8, slot_idx as u16);
690
691 unsafe { self.levels[lvl_idx].slot(slot_idx).push_entry(entry_ptr) };
693
694 self.levels[lvl_idx].activate_slot(slot_idx);
696 self.active_levels |= 1 << lvl_idx;
697
698 match self.min_deadline.get() {
702 None if self.len == 0 => {
703 self.min_deadline.set(Some(deadline_ticks));
704 }
705 Some(cur) if deadline_ticks < cur => {
706 self.min_deadline.set(Some(deadline_ticks));
707 }
708 _ => {}
709 }
710 }
711
712 #[inline]
718 #[allow(clippy::needless_pass_by_ref_mut)]
719 fn remove_entry(&mut self, entry_ptr: EntryPtr<T>) {
720 let entry = unsafe { entry_ref(entry_ptr) };
722
723 let lvl_idx = entry.level() as usize;
724 let slot_idx = entry.slot_idx() as usize;
725
726 unsafe { self.levels[lvl_idx].slot(slot_idx).remove_entry(entry_ptr) };
728
729 if self.levels[lvl_idx].slot(slot_idx).is_empty() {
730 self.levels[lvl_idx].deactivate_slot(slot_idx);
731 if !self.levels[lvl_idx].is_active() {
732 self.active_levels &= !(1 << lvl_idx);
733 }
734 }
735 }
736
737 #[inline]
745 fn fire_entry(&mut self, entry_ptr: EntryPtr<T>) -> Option<T> {
746 let entry = unsafe { entry_ref(entry_ptr) };
748
749 let fired_deadline = entry.deadline_ticks();
750
751 let value = unsafe { entry.take_value() };
754
755 let new_refs = entry.dec_refs();
756 if new_refs == 0 {
757 self.slab.free(unsafe { Slot::from_raw(entry_ptr) });
760 }
761 self.len -= 1;
765
766 if self.len == 0 {
767 self.min_deadline.set(None);
768 } else if let Some(cur) = self.min_deadline.get() {
769 if fired_deadline == cur {
770 self.min_deadline.set(None);
771 }
772 }
773
774 value
775 }
776
777 fn poll_level(
784 &mut self,
785 lvl_idx: usize,
786 now_ticks: u64,
787 limit: usize,
788 buf: &mut Vec<T>,
789 ) -> usize {
790 let mut fired = 0;
791 let mut mask = self.levels[lvl_idx].active_slots();
792
793 while mask != 0 && fired < limit {
794 let slot_idx = mask.trailing_zeros() as usize;
795 mask &= mask - 1;
796
797 let slot_ptr = self.levels[lvl_idx].slot(slot_idx) as *const crate::level::WheelSlot<T>;
798 let slot = unsafe { &*slot_ptr };
802 let mut entry_ptr = slot.entry_head();
803
804 while !entry_ptr.is_null() && fired < limit {
805 let entry = unsafe { entry_ref(entry_ptr) };
807 let next_entry = entry.next();
808
809 if entry.deadline_ticks() <= now_ticks {
810 unsafe { slot.remove_entry(entry_ptr) };
811
812 if let Some(value) = self.fire_entry(entry_ptr) {
813 buf.push(value);
814 }
815 fired += 1;
816 }
817
818 entry_ptr = next_entry;
819 }
820
821 if slot.is_empty() {
822 self.levels[lvl_idx].deactivate_slot(slot_idx);
823 }
824 }
825
826 if !self.levels[lvl_idx].is_active() {
828 self.active_levels &= !(1 << lvl_idx);
829 }
830
831 fired
832 }
833}
834
835impl<T: 'static, S: SlabStore<Item = WheelEntry<T>>> Drop for TimerWheel<T, S> {
840 fn drop(&mut self) {
841 let mut lvl_mask = self.active_levels;
843 while lvl_mask != 0 {
844 let lvl_idx = lvl_mask.trailing_zeros() as usize;
845 lvl_mask &= lvl_mask - 1;
846
847 let level = &self.levels[lvl_idx];
848 let mut slot_mask = level.active_slots();
849 while slot_mask != 0 {
850 let slot_idx = slot_mask.trailing_zeros() as usize;
851 slot_mask &= slot_mask - 1;
852
853 let slot = level.slot(slot_idx);
854 let mut entry_ptr = slot.entry_head();
855 while !entry_ptr.is_null() {
856 let entry = unsafe { entry_ref(entry_ptr) };
858 let next_entry = entry.next();
859
860 self.slab.free(unsafe { Slot::from_raw(entry_ptr) });
862
863 entry_ptr = next_entry;
864 }
865 }
866 }
867 }
868}
869
870#[cfg(test)]
875mod tests {
876 use super::*;
877 use std::time::{Duration, Instant};
878
879 fn ms(millis: u64) -> Duration {
880 Duration::from_millis(millis)
881 }
882
883 fn assert_send<T: Send>() {}
888
889 #[test]
890 fn wheel_is_send() {
891 assert_send::<Wheel<u64>>();
892 assert_send::<BoundedWheel<u64>>();
893 }
894
895 #[test]
900 fn default_config() {
901 let now = Instant::now();
902 let wheel: Wheel<u64> = Wheel::unbounded(1024, now);
903 assert!(wheel.is_empty());
904 assert_eq!(wheel.len(), 0);
905 }
906
907 #[test]
908 fn bounded_construction() {
909 let now = Instant::now();
910 let wheel: BoundedWheel<u64> = BoundedWheel::bounded(128, now);
911 assert!(wheel.is_empty());
912 }
913
914 #[test]
915 #[should_panic(expected = "slots_per_level must be a power of 2")]
916 fn invalid_config_non_power_of_two() {
917 let now = Instant::now();
918 WheelBuilder::default()
919 .slots_per_level(65)
920 .unbounded(1024)
921 .build::<u64>(now);
922 }
923
924 #[test]
929 fn schedule_and_cancel() {
930 let now = Instant::now();
931 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
932
933 let h = wheel.schedule(now + ms(50), 42);
934 assert_eq!(wheel.len(), 1);
935
936 let val = wheel.cancel(h);
937 assert_eq!(val, Some(42));
938 assert_eq!(wheel.len(), 0);
939 }
940
941 #[test]
942 fn schedule_forget_fires() {
943 let now = Instant::now();
944 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
945
946 wheel.schedule_forget(now + ms(10), 99);
947 assert_eq!(wheel.len(), 1);
948
949 let mut buf = Vec::new();
950 let fired = wheel.poll(now + ms(20), &mut buf);
951 assert_eq!(fired, 1);
952 assert_eq!(buf, vec![99]);
953 assert_eq!(wheel.len(), 0);
954 }
955
956 #[test]
957 fn cancel_after_fire_returns_none() {
958 let now = Instant::now();
959 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
960
961 let h = wheel.schedule(now + ms(10), 42);
962
963 let mut buf = Vec::new();
964 wheel.poll(now + ms(20), &mut buf);
965 assert_eq!(buf, vec![42]);
966
967 let val = wheel.cancel(h);
969 assert_eq!(val, None);
970 }
971
972 #[test]
973 fn free_active_timer_becomes_fire_and_forget() {
974 let now = Instant::now();
975 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
976
977 let h = wheel.schedule(now + ms(10), 42);
978 wheel.free(h); assert_eq!(wheel.len(), 1);
980
981 let mut buf = Vec::new();
982 wheel.poll(now + ms(20), &mut buf);
983 assert_eq!(buf, vec![42]);
984 assert_eq!(wheel.len(), 0);
985 }
986
987 #[test]
988 fn free_zombie_handle() {
989 let now = Instant::now();
990 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
991
992 let h = wheel.schedule(now + ms(10), 42);
993
994 let mut buf = Vec::new();
995 wheel.poll(now + ms(20), &mut buf);
996
997 wheel.free(h);
999 }
1000
1001 #[test]
1006 fn bounded_full() {
1007 let now = Instant::now();
1008 let mut wheel: BoundedWheel<u64> = BoundedWheel::bounded(2, now);
1009
1010 let h1 = wheel.try_schedule(now + ms(10), 1).unwrap();
1011 let h2 = wheel.try_schedule(now + ms(20), 2).unwrap();
1012
1013 let err = wheel.try_schedule(now + ms(30), 3);
1014 assert!(err.is_err());
1015 let recovered = err.unwrap_err().into_inner();
1016 assert_eq!(recovered, 3);
1017
1018 wheel.cancel(h1);
1020 let h3 = wheel.try_schedule(now + ms(30), 3).unwrap();
1021
1022 wheel.free(h2);
1024 wheel.free(h3);
1025 }
1026
1027 #[test]
1028 fn bounded_schedule_forget_full() {
1029 let now = Instant::now();
1030 let mut wheel: BoundedWheel<u64> = BoundedWheel::bounded(1, now);
1031
1032 wheel.try_schedule_forget(now + ms(10), 1).unwrap();
1033 let err = wheel.try_schedule_forget(now + ms(20), 2);
1034 assert!(err.is_err());
1035 }
1036
1037 #[test]
1042 fn poll_respects_deadline() {
1043 let now = Instant::now();
1044 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1045
1046 wheel.schedule_forget(now + ms(10), 1);
1047 wheel.schedule_forget(now + ms(50), 2);
1048 wheel.schedule_forget(now + ms(100), 3);
1049
1050 let mut buf = Vec::new();
1051
1052 let fired = wheel.poll(now + ms(20), &mut buf);
1054 assert_eq!(fired, 1);
1055 assert_eq!(buf, vec![1]);
1056 assert_eq!(wheel.len(), 2);
1057
1058 buf.clear();
1060 let fired = wheel.poll(now + ms(60), &mut buf);
1061 assert_eq!(fired, 1);
1062 assert_eq!(buf, vec![2]);
1063
1064 buf.clear();
1066 let fired = wheel.poll(now + ms(200), &mut buf);
1067 assert_eq!(fired, 1);
1068 assert_eq!(buf, vec![3]);
1069
1070 assert!(wheel.is_empty());
1071 }
1072
1073 #[test]
1074 fn poll_with_limit() {
1075 let now = Instant::now();
1076 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1077
1078 for i in 0..10 {
1079 wheel.schedule_forget(now + ms(1), i);
1080 }
1081
1082 let mut buf = Vec::new();
1083
1084 let fired = wheel.poll_with_limit(now + ms(5), 3, &mut buf);
1086 assert_eq!(fired, 3);
1087 assert_eq!(wheel.len(), 7);
1088
1089 let fired = wheel.poll_with_limit(now + ms(5), 3, &mut buf);
1090 assert_eq!(fired, 3);
1091 assert_eq!(wheel.len(), 4);
1092
1093 let fired = wheel.poll(now + ms(5), &mut buf);
1095 assert_eq!(fired, 4);
1096 assert!(wheel.is_empty());
1097 assert_eq!(buf.len(), 10);
1098 }
1099
1100 #[test]
1105 fn timers_across_levels() {
1106 let now = Instant::now();
1107 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1108
1109 wheel.schedule_forget(now + ms(5), 0);
1111 wheel.schedule_forget(now + ms(200), 1);
1113 wheel.schedule_forget(now + ms(1000), 2);
1115
1116 let mut buf = Vec::new();
1117
1118 wheel.poll(now + ms(10), &mut buf);
1119 assert_eq!(buf, vec![0]);
1120
1121 buf.clear();
1122 wheel.poll(now + ms(250), &mut buf);
1123 assert_eq!(buf, vec![1]);
1124
1125 buf.clear();
1126 wheel.poll(now + ms(1500), &mut buf);
1127 assert_eq!(buf, vec![2]);
1128
1129 assert!(wheel.is_empty());
1130 }
1131
1132 #[test]
1137 fn next_deadline_empty() {
1138 let now = Instant::now();
1139 let wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1140 assert!(wheel.next_deadline().is_none());
1141 }
1142
1143 #[test]
1144 fn next_deadline_returns_earliest() {
1145 let now = Instant::now();
1146 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1147
1148 wheel.schedule_forget(now + ms(100), 1);
1149 wheel.schedule_forget(now + ms(50), 2);
1150 wheel.schedule_forget(now + ms(200), 3);
1151
1152 let next = wheel.next_deadline().unwrap();
1153 let delta = next.duration_since(now);
1155 assert!(delta >= ms(49) && delta <= ms(51));
1156 }
1157
1158 #[test]
1159 fn next_deadline_cache_invalidates_on_cancel() {
1160 let now = Instant::now();
1161 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1162
1163 let h1 = wheel.schedule(now + ms(50), 1);
1164 wheel.schedule_forget(now + ms(100), 2);
1165 wheel.schedule_forget(now + ms(200), 3);
1166
1167 let d1 = wheel.next_deadline().unwrap();
1169 assert!(d1.duration_since(now) >= ms(49) && d1.duration_since(now) <= ms(51));
1170
1171 wheel.cancel(h1);
1173 let d2 = wheel.next_deadline().unwrap();
1174 assert!(d2.duration_since(now) >= ms(99) && d2.duration_since(now) <= ms(101));
1175 }
1176
1177 #[test]
1178 fn next_deadline_cache_invalidates_on_fire() {
1179 let now = Instant::now();
1180 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1181
1182 wheel.schedule_forget(now + ms(50), 1);
1183 wheel.schedule_forget(now + ms(100), 2);
1184 wheel.schedule_forget(now + ms(200), 3);
1185
1186 let _ = wheel.next_deadline();
1188
1189 let mut buf = Vec::new();
1191 wheel.poll(now + ms(60), &mut buf);
1192 assert_eq!(buf, vec![1]);
1193
1194 let d = wheel.next_deadline().unwrap();
1196 assert!(d.duration_since(now) >= ms(99) && d.duration_since(now) <= ms(101));
1197 }
1198
1199 #[test]
1200 fn next_deadline_cache_updates_on_insert() {
1201 let now = Instant::now();
1202 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1203
1204 wheel.schedule_forget(now + ms(100), 1);
1205 let d1 = wheel.next_deadline().unwrap();
1206 assert!(d1.duration_since(now) >= ms(99) && d1.duration_since(now) <= ms(101));
1207
1208 wheel.schedule_forget(now + ms(30), 2);
1210 let d2 = wheel.next_deadline().unwrap();
1211 assert!(d2.duration_since(now) >= ms(29) && d2.duration_since(now) <= ms(31));
1212 }
1213
1214 #[test]
1215 fn next_deadline_after_reschedule_later() {
1216 let now = Instant::now();
1217 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1218
1219 let h_a = wheel.schedule(now + ms(50), 1);
1220 wheel.schedule_forget(now + ms(100), 2);
1221
1222 let d1 = wheel.next_deadline().unwrap();
1223 assert!(d1.duration_since(now) >= ms(49) && d1.duration_since(now) <= ms(51));
1224
1225 let h_a = wheel.reschedule(h_a, now + ms(200));
1227 let d2 = wheel.next_deadline().unwrap();
1228 assert!(d2.duration_since(now) >= ms(99) && d2.duration_since(now) <= ms(101));
1229 mem::forget(h_a);
1230 }
1231
1232 #[test]
1233 fn next_deadline_after_reschedule_earlier() {
1234 let now = Instant::now();
1235 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1236
1237 let h_a = wheel.schedule(now + ms(100), 1);
1238 wheel.schedule_forget(now + ms(50), 2);
1239
1240 let d1 = wheel.next_deadline().unwrap();
1241 assert!(d1.duration_since(now) >= ms(49) && d1.duration_since(now) <= ms(51));
1242
1243 let h_a = wheel.reschedule(h_a, now + ms(25));
1245 let d2 = wheel.next_deadline().unwrap();
1246 assert!(d2.duration_since(now) >= ms(24) && d2.duration_since(now) <= ms(26));
1247 mem::forget(h_a);
1248 }
1249
1250 #[test]
1251 fn next_deadline_repeated_calls_stable() {
1252 let now = Instant::now();
1253 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1254
1255 wheel.schedule_forget(now + ms(50), 1);
1256 wheel.schedule_forget(now + ms(100), 2);
1257 wheel.schedule_forget(now + ms(200), 3);
1258
1259 let d1 = wheel.next_deadline();
1260 let d2 = wheel.next_deadline();
1261 let d3 = wheel.next_deadline();
1262 assert_eq!(d1, d2);
1263 assert_eq!(d2, d3);
1264 }
1265
1266 #[test]
1271 fn reciprocal_instant_to_ticks_precision() {
1272 let now = Instant::now();
1275
1276 for &tick_ns in &[1_000_000u64, 1_000, 100, 999_999, 7_500_000] {
1277 let wheel: Wheel<u64> = WheelBuilder::default()
1278 .tick_duration(Duration::from_nanos(tick_ns))
1279 .unbounded(64)
1280 .build(now);
1281
1282 for n in 0..500u64 {
1283 let nanos = n * tick_ns;
1284 let instant = now + Duration::from_nanos(nanos);
1285 let got = wheel.instant_to_ticks(instant);
1286 let exact = nanos / tick_ns;
1287 let diff = exact as i64 - got as i64;
1288 assert!(
1289 diff >= 0 && diff <= 1,
1290 "tick_ns={tick_ns}, n={n}: exact={exact}, got={got}, diff={diff}",
1291 );
1292 }
1293 }
1294 }
1295
1296 #[test]
1301 fn deadline_in_the_past_fires_immediately() {
1302 let now = Instant::now();
1303 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1304
1305 wheel.schedule_forget(now, 42);
1307
1308 let mut buf = Vec::new();
1309 let fired = wheel.poll(now + ms(1), &mut buf);
1310 assert_eq!(fired, 1);
1311 assert_eq!(buf, vec![42]);
1312 }
1313
1314 #[test]
1319 fn deadline_beyond_max_range_clamped() {
1320 let now = Instant::now();
1321 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1322
1323 let h = wheel.schedule(now + Duration::from_secs(100_000), 99);
1325 assert_eq!(wheel.len(), 1);
1326
1327 let mut buf = Vec::new();
1329 wheel.poll(now + Duration::from_secs(100_001), &mut buf);
1330 assert_eq!(buf, vec![99]);
1331
1332 let val = wheel.cancel(h);
1342 assert_eq!(val, None);
1343 }
1344
1345 #[test]
1350 fn drop_cleans_up_active_entries() {
1351 let now = Instant::now();
1352 let mut wheel: Wheel<String> = Wheel::unbounded(1024, now);
1353
1354 for i in 0..100 {
1355 wheel.schedule_forget(now + ms(i * 10), format!("timer-{i}"));
1356 }
1357
1358 assert_eq!(wheel.len(), 100);
1359 drop(wheel);
1361 }
1362
1363 #[test]
1364 fn drop_with_outstanding_handles() {
1365 let now = Instant::now();
1366 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1367
1368 let h1 = wheel.schedule(now + ms(10), 1);
1370 let h2 = wheel.schedule(now + ms(20), 2);
1371
1372 wheel.free(h1);
1374 wheel.free(h2);
1375
1376 drop(wheel);
1378 }
1379
1380 #[test]
1385 fn level_selection_boundaries() {
1386 let now = Instant::now();
1387 let wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1388
1389 assert_eq!(wheel.select_level(0), 0);
1391 assert_eq!(wheel.select_level(63), 0);
1392
1393 assert_eq!(wheel.select_level(64), 1);
1395 assert_eq!(wheel.select_level(511), 1);
1396
1397 assert_eq!(wheel.select_level(512), 2);
1399 }
1400
1401 #[test]
1406 fn cancel_after_time_advance() {
1407 let now = Instant::now();
1412 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1413
1414 let h = wheel.schedule(now + ms(500), 42);
1415 assert_eq!(wheel.len(), 1);
1416
1417 let mut buf = Vec::new();
1419 let fired = wheel.poll(now + ms(400), &mut buf);
1420 assert_eq!(fired, 0);
1421 assert!(buf.is_empty());
1422
1423 let val = wheel.cancel(h);
1425 assert_eq!(val, Some(42));
1426 assert_eq!(wheel.len(), 0);
1427 }
1428
1429 #[test]
1434 fn multiple_entries_same_slot() {
1435 let now = Instant::now();
1436 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1437
1438 let mut handles = Vec::new();
1440 for i in 0..5 {
1441 handles.push(wheel.schedule(now + ms(10), i));
1442 }
1443 assert_eq!(wheel.len(), 5);
1444
1445 let v2 = wheel.cancel(handles.remove(2));
1447 assert_eq!(v2, Some(2));
1448 let v0 = wheel.cancel(handles.remove(0));
1449 assert_eq!(v0, Some(0));
1450 assert_eq!(wheel.len(), 3);
1451
1452 let mut buf = Vec::new();
1454 let fired = wheel.poll(now + ms(20), &mut buf);
1455 assert_eq!(fired, 3);
1456
1457 for h in handles {
1459 let val = wheel.cancel(h);
1460 assert_eq!(val, None); }
1462 }
1463
1464 #[test]
1469 fn entry_at_level_boundary() {
1470 let now = Instant::now();
1473 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1474
1475 let h = wheel.schedule(now + ms(64), 99);
1476 assert_eq!(wheel.len(), 1);
1477
1478 let mut buf = Vec::new();
1480 let fired = wheel.poll(now + ms(63), &mut buf);
1481 assert_eq!(fired, 0);
1482
1483 let fired = wheel.poll(now + ms(65), &mut buf);
1485 assert_eq!(fired, 1);
1486 assert_eq!(buf, vec![99]);
1487
1488 wheel.cancel(h);
1490 }
1491
1492 #[test]
1497 fn poll_with_limit_mixed_expiry() {
1498 let now = Instant::now();
1499 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1500
1501 wheel.schedule_forget(now + ms(5), 1);
1503 wheel.schedule_forget(now + ms(5), 2);
1504 wheel.schedule_forget(now + ms(5), 3);
1505 wheel.schedule_forget(now + ms(500), 4); wheel.schedule_forget(now + ms(500), 5); assert_eq!(wheel.len(), 5);
1508
1509 let mut buf = Vec::new();
1510
1511 let fired = wheel.poll_with_limit(now + ms(10), 2, &mut buf);
1513 assert_eq!(fired, 2);
1514 assert_eq!(wheel.len(), 3);
1515
1516 let fired = wheel.poll_with_limit(now + ms(10), 5, &mut buf);
1518 assert_eq!(fired, 1);
1519 assert_eq!(wheel.len(), 2);
1520
1521 assert_eq!(buf.len(), 3);
1523 }
1524
1525 #[test]
1530 fn reuse_after_full_drain() {
1531 let now = Instant::now();
1532 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1533
1534 for i in 0..10 {
1536 wheel.schedule_forget(now + ms(1), i);
1537 }
1538 let mut buf = Vec::new();
1539 wheel.poll(now + ms(5), &mut buf);
1540 assert_eq!(buf.len(), 10);
1541 assert!(wheel.is_empty());
1542
1543 buf.clear();
1545 for i in 10..20 {
1546 wheel.schedule_forget(now + ms(100), i);
1547 }
1548 assert_eq!(wheel.len(), 10);
1549
1550 wheel.poll(now + ms(200), &mut buf);
1551 assert_eq!(buf.len(), 10);
1552 assert!(wheel.is_empty());
1553 }
1554
1555 #[test]
1560 fn all_levels_active() {
1561 let now = Instant::now();
1562 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1563
1564 let distances = [10, 100, 1000, 5000, 40_000, 300_000, 3_000_000];
1567 let mut handles: Vec<TimerHandle<u64>> = Vec::new();
1568 for (i, &d) in distances.iter().enumerate() {
1569 handles.push(wheel.schedule(now + ms(d), i as u64));
1570 }
1571 assert_eq!(wheel.len(), 7);
1572
1573 let order = [4, 1, 6, 0, 3, 5, 2];
1575 let mut opt_handles: Vec<Option<TimerHandle<u64>>> =
1578 handles.into_iter().map(Some).collect();
1579
1580 for &idx in &order {
1581 let h = opt_handles[idx].take().unwrap();
1582 let val = wheel.cancel(h);
1583 assert_eq!(val, Some(idx as u64));
1584 }
1585 assert!(wheel.is_empty());
1586 }
1587
1588 #[test]
1593 fn poll_values_match() {
1594 let now = Instant::now();
1595 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1596
1597 let expected: Vec<u64> = (100..110).collect();
1598 for &v in &expected {
1599 wheel.schedule_forget(now + ms(5), v);
1600 }
1601
1602 let mut buf = Vec::new();
1603 wheel.poll(now + ms(10), &mut buf);
1604
1605 buf.sort_unstable();
1606 assert_eq!(buf, expected);
1607 }
1608
1609 #[test]
1614 fn reschedule_moves_deadline() {
1615 let now = Instant::now();
1616 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1617
1618 let h = wheel.schedule(now + ms(100), 42);
1619 assert_eq!(wheel.len(), 1);
1620
1621 let h = wheel.reschedule(h, now + ms(50));
1623 assert_eq!(wheel.len(), 1);
1624
1625 let mut buf = Vec::new();
1627 let fired = wheel.poll(now + ms(40), &mut buf);
1628 assert_eq!(fired, 0);
1629
1630 let fired = wheel.poll(now + ms(55), &mut buf);
1632 assert_eq!(fired, 1);
1633 assert_eq!(buf, vec![42]);
1634
1635 wheel.cancel(h);
1637 }
1638
1639 #[test]
1640 fn reschedule_to_later() {
1641 let now = Instant::now();
1642 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1643
1644 let h = wheel.schedule(now + ms(50), 7);
1645
1646 let h = wheel.reschedule(h, now + ms(200));
1648
1649 let mut buf = Vec::new();
1651 let fired = wheel.poll(now + ms(60), &mut buf);
1652 assert_eq!(fired, 0);
1653
1654 let fired = wheel.poll(now + ms(210), &mut buf);
1656 assert_eq!(fired, 1);
1657 assert_eq!(buf, vec![7]);
1658
1659 wheel.cancel(h);
1660 }
1661
1662 #[test]
1663 #[should_panic(expected = "cannot reschedule a fired timer")]
1664 fn reschedule_panics_on_zombie() {
1665 let now = Instant::now();
1666 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1667
1668 let h = wheel.schedule(now + ms(10), 42);
1669
1670 let mut buf = Vec::new();
1671 wheel.poll(now + ms(20), &mut buf);
1672
1673 let _h = wheel.reschedule(h, now + ms(100));
1675 }
1676
1677 #[test]
1682 fn custom_slots_per_level() {
1683 let now = Instant::now();
1684 let mut wheel: Wheel<u64> = WheelBuilder::default()
1686 .slots_per_level(32)
1687 .unbounded(256)
1688 .build(now);
1689
1690 let h1 = wheel.schedule(now + ms(20), 1);
1693 let h2 = wheel.schedule(now + ms(40), 2);
1695
1696 let mut buf = Vec::new();
1697 wheel.poll(now + ms(25), &mut buf);
1698 assert_eq!(buf, vec![1]);
1699
1700 buf.clear();
1701 wheel.poll(now + ms(50), &mut buf);
1702 assert_eq!(buf, vec![2]);
1703
1704 wheel.cancel(h1);
1705 wheel.cancel(h2);
1706 }
1707
1708 #[test]
1709 fn custom_clk_shift() {
1710 let now = Instant::now();
1711 let mut wheel: Wheel<u64> = WheelBuilder::default()
1713 .clk_shift(2)
1714 .unbounded(256)
1715 .build(now);
1716
1717 let h1 = wheel.schedule(now + ms(50), 1); let h2 = wheel.schedule(now + ms(100), 2); let mut buf = Vec::new();
1723 wheel.poll(now + ms(55), &mut buf);
1724 assert_eq!(buf, vec![1]);
1725
1726 buf.clear();
1727 wheel.poll(now + ms(110), &mut buf);
1728 assert_eq!(buf, vec![2]);
1729
1730 wheel.cancel(h1);
1731 wheel.cancel(h2);
1732 }
1733
1734 #[test]
1735 fn custom_num_levels() {
1736 let now = Instant::now();
1737 let mut wheel: Wheel<u64> = WheelBuilder::default()
1739 .num_levels(3)
1740 .unbounded(256)
1741 .build(now);
1742
1743 let h = wheel.schedule(now + ms(3000), 42);
1746 assert_eq!(wheel.len(), 1);
1747
1748 let mut buf = Vec::new();
1749 wheel.poll(now + ms(3100), &mut buf);
1750 assert_eq!(buf, vec![42]);
1751
1752 wheel.cancel(h);
1753 }
1754
1755 #[test]
1756 fn custom_tick_duration() {
1757 let now = Instant::now();
1758 let mut wheel: Wheel<u64> = WheelBuilder::default()
1760 .tick_duration(Duration::from_micros(100))
1761 .unbounded(256)
1762 .build(now);
1763
1764 wheel.schedule_forget(now + ms(1), 1);
1766 wheel.schedule_forget(now + ms(10), 2);
1768
1769 let mut buf = Vec::new();
1770 wheel.poll(now + ms(2), &mut buf);
1771 assert_eq!(buf, vec![1]);
1772
1773 buf.clear();
1774 wheel.poll(now + ms(15), &mut buf);
1775 assert_eq!(buf, vec![2]);
1776 }
1777
1778 #[test]
1779 fn bounded_custom_config() {
1780 let now = Instant::now();
1781 let mut wheel: BoundedWheel<u64> = WheelBuilder::default()
1782 .slots_per_level(16)
1783 .num_levels(4)
1784 .bounded(8)
1785 .build(now);
1786
1787 let mut handles = Vec::new();
1789 for i in 0..8 {
1790 handles.push(wheel.try_schedule(now + ms(i * 10 + 10), i).unwrap());
1791 }
1792 assert!(wheel.try_schedule(now + ms(100), 99).is_err());
1793
1794 wheel.cancel(handles.remove(0));
1796 let h = wheel.try_schedule(now + ms(100), 99).unwrap();
1797 handles.push(h);
1798
1799 for h in handles {
1801 wheel.cancel(h);
1802 }
1803 }
1804
1805 #[test]
1810 #[should_panic(expected = "slots_per_level must be <= 64")]
1811 fn invalid_config_too_many_slots() {
1812 let now = Instant::now();
1813 WheelBuilder::default()
1814 .slots_per_level(128)
1815 .unbounded(1024)
1816 .build::<u64>(now);
1817 }
1818
1819 #[test]
1820 #[should_panic(expected = "num_levels must be > 0")]
1821 fn invalid_config_zero_levels() {
1822 let now = Instant::now();
1823 WheelBuilder::default()
1824 .num_levels(0)
1825 .unbounded(1024)
1826 .build::<u64>(now);
1827 }
1828
1829 #[test]
1830 #[should_panic(expected = "num_levels must be <= 8")]
1831 fn invalid_config_too_many_levels() {
1832 let now = Instant::now();
1833 WheelBuilder::default()
1834 .num_levels(9)
1835 .unbounded(1024)
1836 .build::<u64>(now);
1837 }
1838
1839 #[test]
1840 #[should_panic(expected = "clk_shift must be > 0")]
1841 fn invalid_config_zero_shift() {
1842 let now = Instant::now();
1843 WheelBuilder::default()
1844 .clk_shift(0)
1845 .unbounded(1024)
1846 .build::<u64>(now);
1847 }
1848
1849 #[test]
1850 #[should_panic(expected = "tick_duration must be non-zero")]
1851 fn invalid_config_zero_tick() {
1852 let now = Instant::now();
1853 WheelBuilder::default()
1854 .tick_duration(Duration::ZERO)
1855 .unbounded(1024)
1856 .build::<u64>(now);
1857 }
1858
1859 #[test]
1860 #[should_panic(expected = "overflow")]
1861 fn invalid_config_shift_overflow() {
1862 let now = Instant::now();
1863 WheelBuilder::default()
1867 .num_levels(8)
1868 .clk_shift(9)
1869 .unbounded(1024)
1870 .build::<u64>(now);
1871 }
1872
1873 #[test]
1880 fn miri_schedule_cancel_drop_type() {
1881 let now = Instant::now();
1882 let mut wheel: Wheel<String> = Wheel::unbounded(64, now);
1883
1884 let h = wheel.schedule(now + ms(50), "hello".to_string());
1885 let val = wheel.cancel(h);
1886 assert_eq!(val, Some("hello".to_string()));
1887 assert!(wheel.is_empty());
1888 }
1889
1890 #[test]
1891 fn miri_poll_fires_drop_type() {
1892 let now = Instant::now();
1893 let mut wheel: Wheel<String> = Wheel::unbounded(64, now);
1894
1895 wheel.schedule_forget(now + ms(10), "a".to_string());
1896 wheel.schedule_forget(now + ms(10), "b".to_string());
1897 wheel.schedule_forget(now + ms(10), "c".to_string());
1898
1899 let mut buf = Vec::new();
1900 let fired = wheel.poll(now + ms(20), &mut buf);
1901 assert_eq!(fired, 3);
1902 assert_eq!(buf.len(), 3);
1903 assert!(wheel.is_empty());
1904 }
1905
1906 #[test]
1907 fn miri_cancel_zombie_drop_type() {
1908 let now = Instant::now();
1909 let mut wheel: Wheel<String> = Wheel::unbounded(64, now);
1910
1911 let h = wheel.schedule(now + ms(10), "zombie".to_string());
1912
1913 let mut buf = Vec::new();
1914 wheel.poll(now + ms(20), &mut buf);
1915 assert_eq!(buf, vec!["zombie".to_string()]);
1916
1917 let val = wheel.cancel(h);
1919 assert_eq!(val, None);
1920 }
1921
1922 #[test]
1923 fn miri_free_active_and_zombie() {
1924 let now = Instant::now();
1925 let mut wheel: Wheel<String> = Wheel::unbounded(64, now);
1926
1927 let h1 = wheel.schedule(now + ms(10), "active".to_string());
1929 wheel.free(h1);
1930
1931 let mut buf = Vec::new();
1933 wheel.poll(now + ms(20), &mut buf);
1934 assert_eq!(buf, vec!["active".to_string()]);
1935
1936 let h2 = wheel.schedule(now + ms(10), "will-fire".to_string());
1938 buf.clear();
1939 wheel.poll(now + ms(20), &mut buf);
1940 wheel.free(h2); }
1942
1943 #[test]
1944 fn miri_reschedule_drop_type() {
1945 let now = Instant::now();
1946 let mut wheel: Wheel<String> = Wheel::unbounded(64, now);
1947
1948 let h = wheel.schedule(now + ms(100), "moveme".to_string());
1949 let h = wheel.reschedule(h, now + ms(50));
1950
1951 let mut buf = Vec::new();
1952 wheel.poll(now + ms(55), &mut buf);
1953 assert_eq!(buf, vec!["moveme".to_string()]);
1954
1955 wheel.cancel(h);
1956 }
1957
1958 #[test]
1959 fn miri_dll_multi_entry_same_slot() {
1960 let now = Instant::now();
1962 let mut wheel: Wheel<Vec<u8>> = Wheel::unbounded(64, now);
1963
1964 let mut handles = Vec::new();
1965 for i in 0..5 {
1966 handles.push(wheel.schedule(now + ms(10), vec![i; 32]));
1967 }
1968
1969 let v2 = wheel.cancel(handles.remove(2));
1971 assert_eq!(v2.unwrap(), vec![2; 32]);
1972
1973 let v0 = wheel.cancel(handles.remove(0));
1974 assert_eq!(v0.unwrap(), vec![0; 32]);
1975
1976 let mut buf = Vec::new();
1978 wheel.poll(now + ms(20), &mut buf);
1979 assert_eq!(buf.len(), 3);
1980
1981 for h in handles {
1983 wheel.cancel(h);
1984 }
1985 }
1986
1987 #[test]
1988 fn miri_drop_wheel_with_entries() {
1989 let now = Instant::now();
1990 let mut wheel: Wheel<String> = Wheel::unbounded(64, now);
1991
1992 for i in 0..20 {
1994 wheel.schedule_forget(now + ms(i * 100), format!("entry-{i}"));
1995 }
1996 assert_eq!(wheel.len(), 20);
1997
1998 drop(wheel);
2000 }
2001
2002 #[test]
2003 fn miri_bounded_lifecycle() {
2004 let now = Instant::now();
2005 let mut wheel: BoundedWheel<String> = BoundedWheel::bounded(4, now);
2006
2007 let h1 = wheel.try_schedule(now + ms(10), "a".to_string()).unwrap();
2008 let h2 = wheel.try_schedule(now + ms(20), "b".to_string()).unwrap();
2009 let h3 = wheel.try_schedule(now + ms(30), "c".to_string()).unwrap();
2010 let h4 = wheel.try_schedule(now + ms(40), "d".to_string()).unwrap();
2011
2012 let err = wheel.try_schedule(now + ms(50), "e".to_string());
2014 assert!(err.is_err());
2015
2016 wheel.cancel(h1);
2018 let h5 = wheel.try_schedule(now + ms(50), "e".to_string()).unwrap();
2019
2020 let mut buf = Vec::new();
2022 wheel.poll(now + ms(25), &mut buf);
2023
2024 wheel.cancel(h2);
2026 wheel.free(h3);
2027 wheel.free(h4);
2028 wheel.free(h5);
2029 }
2030}
2031
2032#[cfg(test)]
2033mod proptests {
2034 use super::*;
2035 use proptest::prelude::*;
2036 use std::collections::HashSet;
2037 use std::mem;
2038 use std::time::{Duration, Instant};
2039
2040 #[derive(Debug, Clone)]
2042 enum Op {
2043 Schedule { deadline_ms: u64 },
2045 Cancel { idx: usize },
2047 }
2048
2049 fn op_strategy() -> impl Strategy<Value = Op> {
2050 prop_oneof![
2051 (1u64..10_000).prop_map(|deadline_ms| Op::Schedule { deadline_ms }),
2053 any::<usize>().prop_map(|idx| Op::Cancel { idx }),
2055 ]
2056 }
2057
2058 proptest! {
2059 #![proptest_config(ProptestConfig::with_cases(500))]
2060
2061 #[test]
2068 fn fuzz_schedule_cancel_interleaving(ops in proptest::collection::vec(op_strategy(), 1..200)) {
2069 let now = Instant::now();
2070 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
2071
2072 let mut handles: Vec<TimerHandle<u64>> = Vec::new();
2073 let mut active_values: HashSet<u64> = HashSet::new();
2074 let mut next_id: u64 = 0;
2075
2076 for op in &ops {
2077 match op {
2078 Op::Schedule { deadline_ms } => {
2079 let id = next_id;
2080 next_id += 1;
2081 let h = wheel.schedule(now + Duration::from_millis(*deadline_ms), id);
2082 handles.push(h);
2083 active_values.insert(id);
2084 }
2085 Op::Cancel { idx } => {
2086 if !handles.is_empty() {
2087 let i = idx % handles.len();
2088 let h = handles.swap_remove(i);
2089 let val = wheel.cancel(h);
2090 let v = val.unwrap();
2092 assert!(active_values.remove(&v));
2093 }
2094 }
2095 }
2096 prop_assert_eq!(wheel.len(), active_values.len());
2098 }
2099
2100 let mut buf = Vec::new();
2102 wheel.poll(now + Duration::from_secs(100_000), &mut buf);
2104
2105 for h in handles {
2107 mem::forget(h);
2108 }
2109
2110 let fired_set: HashSet<u64> = buf.into_iter().collect();
2111 prop_assert_eq!(fired_set, active_values);
2112 prop_assert!(wheel.is_empty());
2113 }
2114
2115 #[test]
2121 fn fuzz_poll_timing(
2122 deadlines in proptest::collection::vec(1u64..5000, 1..100),
2123 poll_times in proptest::collection::vec(1u64..10_000, 1..20),
2124 ) {
2125 let now = Instant::now();
2126 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
2127
2128 for (i, &d) in deadlines.iter().enumerate() {
2130 wheel.schedule_forget(now + Duration::from_millis(d), i as u64);
2131 }
2132
2133 let mut sorted_times: Vec<u64> = poll_times;
2135 sorted_times.sort_unstable();
2136 sorted_times.dedup();
2137
2138 let mut all_fired: Vec<u64> = Vec::new();
2139
2140 for &t in &sorted_times {
2141 let mut buf = Vec::new();
2142 wheel.poll(now + Duration::from_millis(t), &mut buf);
2143
2144 for &id in &buf {
2146 let deadline_ms = deadlines[id as usize];
2147 prop_assert!(deadline_ms <= t,
2148 "Timer {} with deadline {}ms fired at {}ms", id, deadline_ms, t);
2149 }
2150
2151 all_fired.extend(buf);
2152 }
2153
2154 let mut final_buf = Vec::new();
2156 wheel.poll(now + Duration::from_secs(100_000), &mut final_buf);
2157 all_fired.extend(final_buf);
2158
2159 all_fired.sort_unstable();
2161 let expected: Vec<u64> = (0..deadlines.len() as u64).collect();
2162 prop_assert_eq!(all_fired, expected, "Not all timers fired exactly once");
2163 prop_assert!(wheel.is_empty());
2164 }
2165
2166 #[test]
2172 fn fuzz_next_deadline_cache(
2173 ops in proptest::collection::vec(deadline_op_strategy(), 1..300),
2174 ) {
2175 let now = Instant::now();
2176 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
2177 let mut handles: Vec<(TimerHandle<u64>, u64)> = Vec::new();
2178 let mut next_id: u64 = 0;
2179
2180 for op in &ops {
2181 match op {
2182 DeadlineOp::Schedule { deadline_ms } => {
2183 let h = wheel.schedule(now + Duration::from_millis(*deadline_ms), next_id);
2184 handles.push((h, *deadline_ms));
2185 next_id += 1;
2186 }
2187 DeadlineOp::ScheduleForget { deadline_ms } => {
2188 wheel.schedule_forget(now + Duration::from_millis(*deadline_ms), next_id);
2189 next_id += 1;
2190 }
2191 DeadlineOp::Cancel { idx } => {
2192 if !handles.is_empty() {
2193 let i = idx % handles.len();
2194 let (h, _) = handles.swap_remove(i);
2195 wheel.cancel(h);
2196 }
2197 }
2198 DeadlineOp::Reschedule { idx, new_deadline_ms } => {
2199 if !handles.is_empty() {
2200 let i = idx % handles.len();
2201 let (h, _) = handles.swap_remove(i);
2202 let new_h = wheel.reschedule(h, now + Duration::from_millis(*new_deadline_ms));
2203 handles.push((new_h, *new_deadline_ms));
2204 }
2205 }
2206 DeadlineOp::Poll { at_ms } => {
2207 let mut buf = Vec::new();
2208 wheel.poll(now + Duration::from_millis(*at_ms), &mut buf);
2209 let at = *at_ms;
2212 let mut i = 0;
2213 while i < handles.len() {
2214 if handles[i].1 <= at {
2215 let (h, _) = handles.swap_remove(i);
2216 wheel.free(h);
2217 } else {
2218 i += 1;
2219 }
2220 }
2221 }
2222 }
2223
2224 let cached = wheel.next_deadline();
2225 let uncached = wheel.next_deadline_uncached();
2226 prop_assert_eq!(cached, uncached, "cache disagrees with walk after {:?}", op);
2227 }
2228
2229 for (h, _) in handles {
2230 mem::forget(h);
2231 }
2232 }
2233 }
2234
2235 #[derive(Debug, Clone)]
2236 enum DeadlineOp {
2237 Schedule { deadline_ms: u64 },
2238 ScheduleForget { deadline_ms: u64 },
2239 Cancel { idx: usize },
2240 Reschedule { idx: usize, new_deadline_ms: u64 },
2241 Poll { at_ms: u64 },
2242 }
2243
2244 fn deadline_op_strategy() -> impl Strategy<Value = DeadlineOp> {
2245 prop_oneof![
2246 (1u64..10_000).prop_map(|deadline_ms| DeadlineOp::Schedule { deadline_ms }),
2247 (1u64..10_000).prop_map(|deadline_ms| DeadlineOp::ScheduleForget { deadline_ms }),
2248 any::<usize>().prop_map(|idx| DeadlineOp::Cancel { idx }),
2249 (any::<usize>(), 1u64..10_000).prop_map(|(idx, new_deadline_ms)| {
2250 DeadlineOp::Reschedule {
2251 idx,
2252 new_deadline_ms,
2253 }
2254 }),
2255 (1u64..10_000).prop_map(|at_ms| DeadlineOp::Poll { at_ms }),
2256 ]
2257 }
2258}