1use std::marker::PhantomData;
9use std::mem;
10use std::time::{Duration, Instant};
11
12use nexus_slab::{Full, bounded, unbounded};
13
14use crate::entry::{EntryPtr, WheelEntry, entry_ref};
15use crate::handle::TimerHandle;
16use crate::level::Level;
17use crate::store::{BoundedStore, SlabStore};
18
19#[derive(Debug, Clone, Copy)]
47pub struct WheelBuilder {
48 tick_duration: Duration,
49 slots_per_level: usize,
50 clk_shift: u32,
51 num_levels: usize,
52}
53
54impl Default for WheelBuilder {
55 fn default() -> Self {
56 WheelBuilder {
57 tick_duration: Duration::from_millis(1),
58 slots_per_level: 64,
59 clk_shift: 3,
60 num_levels: 7,
61 }
62 }
63}
64
65impl WheelBuilder {
66 pub fn new() -> Self {
68 Self::default()
69 }
70
71 pub fn tick_duration(mut self, d: Duration) -> Self {
73 self.tick_duration = d;
74 self
75 }
76
77 pub fn slots_per_level(mut self, n: usize) -> Self {
79 self.slots_per_level = n;
80 self
81 }
82
83 pub fn clk_shift(mut self, s: u32) -> Self {
85 self.clk_shift = s;
86 self
87 }
88
89 pub fn num_levels(mut self, n: usize) -> Self {
91 self.num_levels = n;
92 self
93 }
94
95 pub fn unbounded(self, chunk_capacity: usize) -> UnboundedWheelBuilder {
100 UnboundedWheelBuilder {
101 config: self,
102 chunk_capacity,
103 }
104 }
105
106 pub fn bounded(self, capacity: usize) -> BoundedWheelBuilder {
110 BoundedWheelBuilder {
111 config: self,
112 capacity,
113 }
114 }
115
116 fn validate(&self) {
117 assert!(
118 self.slots_per_level.is_power_of_two(),
119 "slots_per_level must be a power of 2, got {}",
120 self.slots_per_level
121 );
122 assert!(
123 self.slots_per_level <= 64,
124 "slots_per_level must be <= 64 (u64 bitmask), got {}",
125 self.slots_per_level
126 );
127 assert!(self.num_levels > 0, "num_levels must be > 0");
128 assert!(
129 self.num_levels <= 8,
130 "num_levels must be <= 8 (u8 bitmask), got {}",
131 self.num_levels
132 );
133 assert!(self.clk_shift > 0, "clk_shift must be > 0");
134 assert!(
135 !self.tick_duration.is_zero(),
136 "tick_duration must be non-zero"
137 );
138 let max_shift = (self.num_levels - 1) as u64 * self.clk_shift as u64;
139 assert!(
140 max_shift < 64,
141 "(num_levels - 1) * clk_shift must be < 64, got {}",
142 max_shift
143 );
144 let slots_log2 = self.slots_per_level.trailing_zeros() as u64;
145 assert!(
146 slots_log2 + max_shift < 64,
147 "slots_per_level << max_shift would overflow u64"
148 );
149 }
150
151 fn tick_ns(&self) -> u64 {
152 self.tick_duration.as_nanos() as u64
153 }
154}
155
156#[derive(Debug)]
160pub struct UnboundedWheelBuilder {
161 config: WheelBuilder,
162 chunk_capacity: usize,
163}
164
165impl UnboundedWheelBuilder {
166 pub fn build<T: 'static>(self, now: Instant) -> Wheel<T> {
173 self.config.validate();
174 let slab = unbounded::Slab::with_chunk_capacity(self.chunk_capacity);
175 let levels = build_levels::<T>(&self.config);
176 TimerWheel {
177 slab,
178 num_levels: self.config.num_levels,
179 levels,
180 current_ticks: 0,
181 tick_ns: self.config.tick_ns(),
182 epoch: now,
183 active_levels: 0,
184 len: 0,
185 _marker: PhantomData,
186 }
187 }
188}
189
190#[derive(Debug)]
194pub struct BoundedWheelBuilder {
195 config: WheelBuilder,
196 capacity: usize,
197}
198
199impl BoundedWheelBuilder {
200 pub fn build<T: 'static>(self, now: Instant) -> BoundedWheel<T> {
207 self.config.validate();
208 let slab = bounded::Slab::with_capacity(self.capacity);
209 let levels = build_levels::<T>(&self.config);
210 TimerWheel {
211 slab,
212 num_levels: self.config.num_levels,
213 levels,
214 current_ticks: 0,
215 tick_ns: self.config.tick_ns(),
216 epoch: now,
217 active_levels: 0,
218 len: 0,
219 _marker: PhantomData,
220 }
221 }
222}
223
224pub struct TimerWheel<
241 T: 'static,
242 S: SlabStore<Item = WheelEntry<T>> = unbounded::Slab<WheelEntry<T>>,
243> {
244 slab: S,
245 levels: Vec<Level<T>>,
246 num_levels: usize,
247 active_levels: u8,
248 current_ticks: u64,
249 tick_ns: u64,
250 epoch: Instant,
251 len: usize,
252 _marker: PhantomData<*const ()>, }
254
255#[allow(clippy::non_send_fields_in_send_ty)]
283unsafe impl<T: Send + 'static, S: SlabStore<Item = WheelEntry<T>>> Send for TimerWheel<T, S> {}
284
285pub type BoundedWheel<T> = TimerWheel<T, bounded::Slab<WheelEntry<T>>>;
287
288pub type Wheel<T> = TimerWheel<T, unbounded::Slab<WheelEntry<T>>>;
290
291impl<T: 'static> Wheel<T> {
296 pub fn unbounded(chunk_capacity: usize, now: Instant) -> Self {
300 WheelBuilder::default().unbounded(chunk_capacity).build(now)
301 }
302}
303
304impl<T: 'static> BoundedWheel<T> {
305 pub fn bounded(capacity: usize, now: Instant) -> Self {
309 WheelBuilder::default().bounded(capacity).build(now)
310 }
311}
312
313fn build_levels<T: 'static>(config: &WheelBuilder) -> Vec<Level<T>> {
314 (0..config.num_levels)
315 .map(|i| Level::new(config.slots_per_level, i, config.clk_shift))
316 .collect()
317}
318
319impl<T: 'static, S: SlabStore<Item = WheelEntry<T>>> TimerWheel<T, S> {
324 pub fn schedule(&mut self, deadline: Instant, value: T) -> TimerHandle<T> {
334 let deadline_ticks = self.instant_to_ticks(deadline);
335 let entry = WheelEntry::new(deadline_ticks, value, 2);
336 let slot = self.slab.alloc(entry);
337 let ptr = slot.as_ptr();
338 self.insert_entry(ptr, deadline_ticks);
339 self.len += 1;
340 TimerHandle::new(ptr)
341 }
342
343 pub fn schedule_forget(&mut self, deadline: Instant, value: T) {
353 let deadline_ticks = self.instant_to_ticks(deadline);
354 let entry = WheelEntry::new(deadline_ticks, value, 1);
355 let slot = self.slab.alloc(entry);
356 let ptr = slot.as_ptr();
357 self.insert_entry(ptr, deadline_ticks);
358 self.len += 1;
359 }
360}
361
362impl<T: 'static, S: BoundedStore<Item = WheelEntry<T>>>
367 TimerWheel<T, S>
368{
369 pub fn try_schedule(&mut self, deadline: Instant, value: T) -> Result<TimerHandle<T>, Full<T>> {
375 let deadline_ticks = self.instant_to_ticks(deadline);
376 let entry = WheelEntry::new(deadline_ticks, value, 2);
377 match self.slab.try_alloc(entry) {
378 Ok(slot) => {
379 let ptr = slot.as_ptr();
380 self.insert_entry(ptr, deadline_ticks);
381 self.len += 1;
382 Ok(TimerHandle::new(ptr))
383 }
384 Err(full) => {
385 let wheel_entry = full.into_inner();
388 let value = unsafe { wheel_entry.take_value() }
389 .expect("entry was just constructed with Some(value)");
390 Err(Full(value))
391 }
392 }
393 }
394
395 pub fn try_schedule_forget(&mut self, deadline: Instant, value: T) -> Result<(), Full<T>> {
401 let deadline_ticks = self.instant_to_ticks(deadline);
402 let entry = WheelEntry::new(deadline_ticks, value, 1);
403 match self.slab.try_alloc(entry) {
404 Ok(slot) => {
405 let ptr = slot.as_ptr();
406 self.insert_entry(ptr, deadline_ticks);
407 self.len += 1;
408 Ok(())
409 }
410 Err(full) => {
411 let wheel_entry = full.into_inner();
412 let value = unsafe { wheel_entry.take_value() }
413 .expect("entry was just constructed with Some(value)");
414 Err(Full(value))
415 }
416 }
417 }
418}
419
420impl<T: 'static, S: SlabStore<Item = WheelEntry<T>>> TimerWheel<T, S> {
425 pub fn cancel(&mut self, handle: TimerHandle<T>) -> Option<T> {
434 let ptr = handle.ptr;
435 mem::forget(handle);
437
438 let entry = unsafe { entry_ref(ptr) };
440 let refs = entry.refs();
441
442 if refs == 2 {
443 let value = unsafe { entry.take_value() };
445 self.remove_entry(ptr);
446 self.len -= 1;
447 unsafe { self.slab.free_ptr(ptr) };
449 value
450 } else {
451 debug_assert_eq!(refs, 1, "unexpected refcount {refs} in cancel");
454 unsafe { self.slab.free_ptr(ptr) };
456 None
457 }
458 }
459
460 pub fn free(&mut self, handle: TimerHandle<T>) {
468 let ptr = handle.ptr;
469 mem::forget(handle);
470
471 let entry = unsafe { entry_ref(ptr) };
473 let new_refs = entry.dec_refs();
474
475 if new_refs == 0 {
476 unsafe { self.slab.free_ptr(ptr) };
479 }
480 }
482
483 pub fn reschedule(&mut self, handle: TimerHandle<T>, new_deadline: Instant) -> TimerHandle<T> {
495 let ptr = handle.ptr;
496 mem::forget(handle);
497
498 let entry = unsafe { entry_ref(ptr) };
500 assert_eq!(entry.refs(), 2, "cannot reschedule a fired timer");
501
502 self.remove_entry(ptr);
504
505 let new_ticks = self.instant_to_ticks(new_deadline);
507 entry.set_deadline_ticks(new_ticks);
508 self.insert_entry(ptr, new_ticks);
509
510 TimerHandle::new(ptr)
511 }
512
513 pub fn poll(&mut self, now: Instant, buf: &mut Vec<T>) -> usize {
517 self.poll_with_limit(now, usize::MAX, buf)
518 }
519
520 pub fn poll_with_limit(&mut self, now: Instant, limit: usize, buf: &mut Vec<T>) -> usize {
527 let now_ticks = self.instant_to_ticks(now);
528 self.current_ticks = now_ticks;
529
530 let mut fired = 0;
531 let mut mask = self.active_levels;
532
533 while mask != 0 && fired < limit {
534 let lvl_idx = mask.trailing_zeros() as usize;
535 mask &= mask - 1; fired += self.poll_level(lvl_idx, now_ticks, limit - fired, buf);
537 }
538 fired
539 }
540
541 pub fn next_deadline(&self) -> Option<Instant> {
546 let mut min_ticks: Option<u64> = None;
547
548 let mut lvl_mask = self.active_levels;
549 while lvl_mask != 0 {
550 let lvl_idx = lvl_mask.trailing_zeros() as usize;
551 lvl_mask &= lvl_mask - 1;
552
553 let level = &self.levels[lvl_idx];
554 let mut slot_mask = level.active_slots();
555 while slot_mask != 0 {
556 let slot_idx = slot_mask.trailing_zeros() as usize;
557 slot_mask &= slot_mask - 1;
558
559 let slot = level.slot(slot_idx);
560 let mut entry_ptr = slot.entry_head();
561
562 while !entry_ptr.is_null() {
563 let entry = unsafe { entry_ref(entry_ptr) };
565 let dt = entry.deadline_ticks();
566 min_ticks = Some(min_ticks.map_or(dt, |current| current.min(dt)));
567 entry_ptr = entry.next();
568 }
569 }
570 }
571
572 min_ticks.map(|t| self.ticks_to_instant(t))
573 }
574
575 #[inline]
577 pub fn len(&self) -> usize {
578 self.len
579 }
580
581 #[inline]
583 pub fn is_empty(&self) -> bool {
584 self.len == 0
585 }
586
587 #[inline]
592 fn instant_to_ticks(&self, instant: Instant) -> u64 {
593 let dur = instant.saturating_duration_since(self.epoch);
595 dur.as_nanos() as u64 / self.tick_ns
596 }
597
598 #[inline]
599 fn ticks_to_instant(&self, ticks: u64) -> Instant {
600 self.epoch + Duration::from_nanos(ticks * self.tick_ns)
601 }
602
603 #[inline]
613 fn select_level(&self, deadline_ticks: u64) -> usize {
614 let delta = deadline_ticks.saturating_sub(self.current_ticks);
615
616 for (i, level) in self.levels.iter().enumerate() {
617 if delta < level.range() {
618 return i;
619 }
620 }
621
622 self.num_levels - 1
624 }
625
626 #[inline]
635 #[allow(clippy::needless_pass_by_ref_mut)]
636 fn insert_entry(&mut self, entry_ptr: EntryPtr<T>, deadline_ticks: u64) {
637 let lvl_idx = self.select_level(deadline_ticks);
638 let slot_idx = self.levels[lvl_idx].slot_index(deadline_ticks);
639
640 let entry = unsafe { entry_ref(entry_ptr) };
643 entry.set_location(lvl_idx as u8, slot_idx as u16);
644
645 unsafe { self.levels[lvl_idx].slot(slot_idx).push_entry(entry_ptr) };
647
648 self.levels[lvl_idx].activate_slot(slot_idx);
650 self.active_levels |= 1 << lvl_idx;
651 }
652
653 #[inline]
659 #[allow(clippy::needless_pass_by_ref_mut)]
660 fn remove_entry(&mut self, entry_ptr: EntryPtr<T>) {
661 let entry = unsafe { entry_ref(entry_ptr) };
663
664 let lvl_idx = entry.level() as usize;
665 let slot_idx = entry.slot_idx() as usize;
666
667 unsafe { self.levels[lvl_idx].slot(slot_idx).remove_entry(entry_ptr) };
669
670 if self.levels[lvl_idx].slot(slot_idx).is_empty() {
671 self.levels[lvl_idx].deactivate_slot(slot_idx);
672 if !self.levels[lvl_idx].is_active() {
673 self.active_levels &= !(1 << lvl_idx);
674 }
675 }
676 }
677
678 #[inline]
686 fn fire_entry(&mut self, entry_ptr: EntryPtr<T>) -> Option<T> {
687 let entry = unsafe { entry_ref(entry_ptr) };
689
690 let value = unsafe { entry.take_value() };
693
694 let new_refs = entry.dec_refs();
695 if new_refs == 0 {
696 unsafe { self.slab.free_ptr(entry_ptr) };
699 }
700 self.len -= 1;
704 value
705 }
706
707 fn poll_level(
714 &mut self,
715 lvl_idx: usize,
716 now_ticks: u64,
717 limit: usize,
718 buf: &mut Vec<T>,
719 ) -> usize {
720 let mut fired = 0;
721 let mut mask = self.levels[lvl_idx].active_slots();
722
723 while mask != 0 && fired < limit {
724 let slot_idx = mask.trailing_zeros() as usize;
725 mask &= mask - 1;
726
727 let slot_ptr = self.levels[lvl_idx].slot(slot_idx) as *const crate::level::WheelSlot<T>;
728 let slot = unsafe { &*slot_ptr };
732 let mut entry_ptr = slot.entry_head();
733
734 while !entry_ptr.is_null() && fired < limit {
735 let entry = unsafe { entry_ref(entry_ptr) };
737 let next_entry = entry.next();
738
739 if entry.deadline_ticks() <= now_ticks {
740 unsafe { slot.remove_entry(entry_ptr) };
741
742 if let Some(value) = self.fire_entry(entry_ptr) {
743 buf.push(value);
744 }
745 fired += 1;
746 }
747
748 entry_ptr = next_entry;
749 }
750
751 if slot.is_empty() {
752 self.levels[lvl_idx].deactivate_slot(slot_idx);
753 }
754 }
755
756 if !self.levels[lvl_idx].is_active() {
758 self.active_levels &= !(1 << lvl_idx);
759 }
760
761 fired
762 }
763}
764
765impl<T: 'static, S: SlabStore<Item = WheelEntry<T>>> Drop for TimerWheel<T, S> {
770 fn drop(&mut self) {
771 let mut lvl_mask = self.active_levels;
773 while lvl_mask != 0 {
774 let lvl_idx = lvl_mask.trailing_zeros() as usize;
775 lvl_mask &= lvl_mask - 1;
776
777 let level = &self.levels[lvl_idx];
778 let mut slot_mask = level.active_slots();
779 while slot_mask != 0 {
780 let slot_idx = slot_mask.trailing_zeros() as usize;
781 slot_mask &= slot_mask - 1;
782
783 let slot = level.slot(slot_idx);
784 let mut entry_ptr = slot.entry_head();
785 while !entry_ptr.is_null() {
786 let entry = unsafe { entry_ref(entry_ptr) };
788 let next_entry = entry.next();
789
790 unsafe { self.slab.free(nexus_slab::Slot::from_ptr(entry_ptr)) };
792
793 entry_ptr = next_entry;
794 }
795 }
796 }
797 }
798}
799
800#[cfg(test)]
805mod tests {
806 use super::*;
807 use std::time::{Duration, Instant};
808
809 fn ms(millis: u64) -> Duration {
810 Duration::from_millis(millis)
811 }
812
813 fn _assert_send<T: Send>() {}
818
819 #[test]
820 fn wheel_is_send() {
821 _assert_send::<Wheel<u64>>();
822 _assert_send::<BoundedWheel<u64>>();
823 }
824
825 #[test]
830 fn default_config() {
831 let now = Instant::now();
832 let wheel: Wheel<u64> = Wheel::unbounded(1024, now);
833 assert!(wheel.is_empty());
834 assert_eq!(wheel.len(), 0);
835 }
836
837 #[test]
838 fn bounded_construction() {
839 let now = Instant::now();
840 let wheel: BoundedWheel<u64> = BoundedWheel::bounded(128, now);
841 assert!(wheel.is_empty());
842 }
843
844 #[test]
845 #[should_panic(expected = "slots_per_level must be a power of 2")]
846 fn invalid_config_non_power_of_two() {
847 let now = Instant::now();
848 WheelBuilder::default()
849 .slots_per_level(65)
850 .unbounded(1024)
851 .build::<u64>(now);
852 }
853
854 #[test]
859 fn schedule_and_cancel() {
860 let now = Instant::now();
861 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
862
863 let h = wheel.schedule(now + ms(50), 42);
864 assert_eq!(wheel.len(), 1);
865
866 let val = wheel.cancel(h);
867 assert_eq!(val, Some(42));
868 assert_eq!(wheel.len(), 0);
869 }
870
871 #[test]
872 fn schedule_forget_fires() {
873 let now = Instant::now();
874 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
875
876 wheel.schedule_forget(now + ms(10), 99);
877 assert_eq!(wheel.len(), 1);
878
879 let mut buf = Vec::new();
880 let fired = wheel.poll(now + ms(20), &mut buf);
881 assert_eq!(fired, 1);
882 assert_eq!(buf, vec![99]);
883 assert_eq!(wheel.len(), 0);
884 }
885
886 #[test]
887 fn cancel_after_fire_returns_none() {
888 let now = Instant::now();
889 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
890
891 let h = wheel.schedule(now + ms(10), 42);
892
893 let mut buf = Vec::new();
894 wheel.poll(now + ms(20), &mut buf);
895 assert_eq!(buf, vec![42]);
896
897 let val = wheel.cancel(h);
899 assert_eq!(val, None);
900 }
901
902 #[test]
903 fn free_active_timer_becomes_fire_and_forget() {
904 let now = Instant::now();
905 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
906
907 let h = wheel.schedule(now + ms(10), 42);
908 wheel.free(h); assert_eq!(wheel.len(), 1);
910
911 let mut buf = Vec::new();
912 wheel.poll(now + ms(20), &mut buf);
913 assert_eq!(buf, vec![42]);
914 assert_eq!(wheel.len(), 0);
915 }
916
917 #[test]
918 fn free_zombie_handle() {
919 let now = Instant::now();
920 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
921
922 let h = wheel.schedule(now + ms(10), 42);
923
924 let mut buf = Vec::new();
925 wheel.poll(now + ms(20), &mut buf);
926
927 wheel.free(h);
929 }
930
931 #[test]
936 fn bounded_full() {
937 let now = Instant::now();
938 let mut wheel: BoundedWheel<u64> = BoundedWheel::bounded(2, now);
939
940 let h1 = wheel.try_schedule(now + ms(10), 1).unwrap();
941 let h2 = wheel.try_schedule(now + ms(20), 2).unwrap();
942
943 let err = wheel.try_schedule(now + ms(30), 3);
944 assert!(err.is_err());
945 let recovered = err.unwrap_err().into_inner();
946 assert_eq!(recovered, 3);
947
948 wheel.cancel(h1);
950 let h3 = wheel.try_schedule(now + ms(30), 3).unwrap();
951
952 wheel.free(h2);
954 wheel.free(h3);
955 }
956
957 #[test]
958 fn bounded_schedule_forget_full() {
959 let now = Instant::now();
960 let mut wheel: BoundedWheel<u64> = BoundedWheel::bounded(1, now);
961
962 wheel.try_schedule_forget(now + ms(10), 1).unwrap();
963 let err = wheel.try_schedule_forget(now + ms(20), 2);
964 assert!(err.is_err());
965 }
966
967 #[test]
972 fn poll_respects_deadline() {
973 let now = Instant::now();
974 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
975
976 wheel.schedule_forget(now + ms(10), 1);
977 wheel.schedule_forget(now + ms(50), 2);
978 wheel.schedule_forget(now + ms(100), 3);
979
980 let mut buf = Vec::new();
981
982 let fired = wheel.poll(now + ms(20), &mut buf);
984 assert_eq!(fired, 1);
985 assert_eq!(buf, vec![1]);
986 assert_eq!(wheel.len(), 2);
987
988 buf.clear();
990 let fired = wheel.poll(now + ms(60), &mut buf);
991 assert_eq!(fired, 1);
992 assert_eq!(buf, vec![2]);
993
994 buf.clear();
996 let fired = wheel.poll(now + ms(200), &mut buf);
997 assert_eq!(fired, 1);
998 assert_eq!(buf, vec![3]);
999
1000 assert!(wheel.is_empty());
1001 }
1002
1003 #[test]
1004 fn poll_with_limit() {
1005 let now = Instant::now();
1006 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1007
1008 for i in 0..10 {
1009 wheel.schedule_forget(now + ms(1), i);
1010 }
1011
1012 let mut buf = Vec::new();
1013
1014 let fired = wheel.poll_with_limit(now + ms(5), 3, &mut buf);
1016 assert_eq!(fired, 3);
1017 assert_eq!(wheel.len(), 7);
1018
1019 let fired = wheel.poll_with_limit(now + ms(5), 3, &mut buf);
1020 assert_eq!(fired, 3);
1021 assert_eq!(wheel.len(), 4);
1022
1023 let fired = wheel.poll(now + ms(5), &mut buf);
1025 assert_eq!(fired, 4);
1026 assert!(wheel.is_empty());
1027 assert_eq!(buf.len(), 10);
1028 }
1029
1030 #[test]
1035 fn timers_across_levels() {
1036 let now = Instant::now();
1037 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1038
1039 wheel.schedule_forget(now + ms(5), 0);
1041 wheel.schedule_forget(now + ms(200), 1);
1043 wheel.schedule_forget(now + ms(1000), 2);
1045
1046 let mut buf = Vec::new();
1047
1048 wheel.poll(now + ms(10), &mut buf);
1049 assert_eq!(buf, vec![0]);
1050
1051 buf.clear();
1052 wheel.poll(now + ms(250), &mut buf);
1053 assert_eq!(buf, vec![1]);
1054
1055 buf.clear();
1056 wheel.poll(now + ms(1500), &mut buf);
1057 assert_eq!(buf, vec![2]);
1058
1059 assert!(wheel.is_empty());
1060 }
1061
1062 #[test]
1067 fn next_deadline_empty() {
1068 let now = Instant::now();
1069 let wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1070 assert!(wheel.next_deadline().is_none());
1071 }
1072
1073 #[test]
1074 fn next_deadline_returns_earliest() {
1075 let now = Instant::now();
1076 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1077
1078 wheel.schedule_forget(now + ms(100), 1);
1079 wheel.schedule_forget(now + ms(50), 2);
1080 wheel.schedule_forget(now + ms(200), 3);
1081
1082 let next = wheel.next_deadline().unwrap();
1083 let delta = next.duration_since(now);
1085 assert!(delta >= ms(49) && delta <= ms(51));
1086 }
1087
1088 #[test]
1093 fn deadline_in_the_past_fires_immediately() {
1094 let now = Instant::now();
1095 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1096
1097 wheel.schedule_forget(now, 42);
1099
1100 let mut buf = Vec::new();
1101 let fired = wheel.poll(now + ms(1), &mut buf);
1102 assert_eq!(fired, 1);
1103 assert_eq!(buf, vec![42]);
1104 }
1105
1106 #[test]
1111 fn deadline_beyond_max_range_clamped() {
1112 let now = Instant::now();
1113 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1114
1115 let h = wheel.schedule(now + Duration::from_secs(100_000), 99);
1117 assert_eq!(wheel.len(), 1);
1118
1119 let mut buf = Vec::new();
1121 wheel.poll(now + Duration::from_secs(100_001), &mut buf);
1122 assert_eq!(buf, vec![99]);
1123
1124 let val = wheel.cancel(h);
1134 assert_eq!(val, None);
1135 }
1136
1137 #[test]
1142 fn drop_cleans_up_active_entries() {
1143 let now = Instant::now();
1144 let mut wheel: Wheel<String> = Wheel::unbounded(1024, now);
1145
1146 for i in 0..100 {
1147 wheel.schedule_forget(now + ms(i * 10), format!("timer-{i}"));
1148 }
1149
1150 assert_eq!(wheel.len(), 100);
1151 drop(wheel);
1153 }
1154
1155 #[test]
1156 fn drop_with_outstanding_handles() {
1157 let now = Instant::now();
1158 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1159
1160 let h1 = wheel.schedule(now + ms(10), 1);
1162 let h2 = wheel.schedule(now + ms(20), 2);
1163
1164 wheel.free(h1);
1166 wheel.free(h2);
1167
1168 drop(wheel);
1170 }
1171
1172 #[test]
1177 fn level_selection_boundaries() {
1178 let now = Instant::now();
1179 let wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1180
1181 assert_eq!(wheel.select_level(0), 0);
1183 assert_eq!(wheel.select_level(63), 0);
1184
1185 assert_eq!(wheel.select_level(64), 1);
1187 assert_eq!(wheel.select_level(511), 1);
1188
1189 assert_eq!(wheel.select_level(512), 2);
1191 }
1192
1193 #[test]
1198 fn cancel_after_time_advance() {
1199 let now = Instant::now();
1204 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1205
1206 let h = wheel.schedule(now + ms(500), 42);
1207 assert_eq!(wheel.len(), 1);
1208
1209 let mut buf = Vec::new();
1211 let fired = wheel.poll(now + ms(400), &mut buf);
1212 assert_eq!(fired, 0);
1213 assert!(buf.is_empty());
1214
1215 let val = wheel.cancel(h);
1217 assert_eq!(val, Some(42));
1218 assert_eq!(wheel.len(), 0);
1219 }
1220
1221 #[test]
1226 fn multiple_entries_same_slot() {
1227 let now = Instant::now();
1228 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1229
1230 let mut handles = Vec::new();
1232 for i in 0..5 {
1233 handles.push(wheel.schedule(now + ms(10), i));
1234 }
1235 assert_eq!(wheel.len(), 5);
1236
1237 let v2 = wheel.cancel(handles.remove(2));
1239 assert_eq!(v2, Some(2));
1240 let v0 = wheel.cancel(handles.remove(0));
1241 assert_eq!(v0, Some(0));
1242 assert_eq!(wheel.len(), 3);
1243
1244 let mut buf = Vec::new();
1246 let fired = wheel.poll(now + ms(20), &mut buf);
1247 assert_eq!(fired, 3);
1248
1249 for h in handles {
1251 let val = wheel.cancel(h);
1252 assert_eq!(val, None); }
1254 }
1255
1256 #[test]
1261 fn entry_at_level_boundary() {
1262 let now = Instant::now();
1265 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1266
1267 let h = wheel.schedule(now + ms(64), 99);
1268 assert_eq!(wheel.len(), 1);
1269
1270 let mut buf = Vec::new();
1272 let fired = wheel.poll(now + ms(63), &mut buf);
1273 assert_eq!(fired, 0);
1274
1275 let fired = wheel.poll(now + ms(65), &mut buf);
1277 assert_eq!(fired, 1);
1278 assert_eq!(buf, vec![99]);
1279
1280 wheel.cancel(h);
1282 }
1283
1284 #[test]
1289 fn poll_with_limit_mixed_expiry() {
1290 let now = Instant::now();
1291 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1292
1293 wheel.schedule_forget(now + ms(5), 1);
1295 wheel.schedule_forget(now + ms(5), 2);
1296 wheel.schedule_forget(now + ms(5), 3);
1297 wheel.schedule_forget(now + ms(500), 4); wheel.schedule_forget(now + ms(500), 5); assert_eq!(wheel.len(), 5);
1300
1301 let mut buf = Vec::new();
1302
1303 let fired = wheel.poll_with_limit(now + ms(10), 2, &mut buf);
1305 assert_eq!(fired, 2);
1306 assert_eq!(wheel.len(), 3);
1307
1308 let fired = wheel.poll_with_limit(now + ms(10), 5, &mut buf);
1310 assert_eq!(fired, 1);
1311 assert_eq!(wheel.len(), 2);
1312
1313 assert_eq!(buf.len(), 3);
1315 }
1316
1317 #[test]
1322 fn reuse_after_full_drain() {
1323 let now = Instant::now();
1324 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1325
1326 for i in 0..10 {
1328 wheel.schedule_forget(now + ms(1), i);
1329 }
1330 let mut buf = Vec::new();
1331 wheel.poll(now + ms(5), &mut buf);
1332 assert_eq!(buf.len(), 10);
1333 assert!(wheel.is_empty());
1334
1335 buf.clear();
1337 for i in 10..20 {
1338 wheel.schedule_forget(now + ms(100), i);
1339 }
1340 assert_eq!(wheel.len(), 10);
1341
1342 wheel.poll(now + ms(200), &mut buf);
1343 assert_eq!(buf.len(), 10);
1344 assert!(wheel.is_empty());
1345 }
1346
1347 #[test]
1352 fn all_levels_active() {
1353 let now = Instant::now();
1354 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1355
1356 let distances = [10, 100, 1000, 5000, 40_000, 300_000, 3_000_000];
1359 let mut handles: Vec<TimerHandle<u64>> = Vec::new();
1360 for (i, &d) in distances.iter().enumerate() {
1361 handles.push(wheel.schedule(now + ms(d), i as u64));
1362 }
1363 assert_eq!(wheel.len(), 7);
1364
1365 let order = [4, 1, 6, 0, 3, 5, 2];
1367 let mut opt_handles: Vec<Option<TimerHandle<u64>>> =
1370 handles.into_iter().map(Some).collect();
1371
1372 for &idx in &order {
1373 let h = opt_handles[idx].take().unwrap();
1374 let val = wheel.cancel(h);
1375 assert_eq!(val, Some(idx as u64));
1376 }
1377 assert!(wheel.is_empty());
1378 }
1379
1380 #[test]
1385 fn poll_values_match() {
1386 let now = Instant::now();
1387 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1388
1389 let expected: Vec<u64> = (100..110).collect();
1390 for &v in &expected {
1391 wheel.schedule_forget(now + ms(5), v);
1392 }
1393
1394 let mut buf = Vec::new();
1395 wheel.poll(now + ms(10), &mut buf);
1396
1397 buf.sort();
1398 assert_eq!(buf, expected);
1399 }
1400
1401 #[test]
1406 fn reschedule_moves_deadline() {
1407 let now = Instant::now();
1408 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1409
1410 let h = wheel.schedule(now + ms(100), 42);
1411 assert_eq!(wheel.len(), 1);
1412
1413 let h = wheel.reschedule(h, now + ms(50));
1415 assert_eq!(wheel.len(), 1);
1416
1417 let mut buf = Vec::new();
1419 let fired = wheel.poll(now + ms(40), &mut buf);
1420 assert_eq!(fired, 0);
1421
1422 let fired = wheel.poll(now + ms(55), &mut buf);
1424 assert_eq!(fired, 1);
1425 assert_eq!(buf, vec![42]);
1426
1427 wheel.cancel(h);
1429 }
1430
1431 #[test]
1432 fn reschedule_to_later() {
1433 let now = Instant::now();
1434 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1435
1436 let h = wheel.schedule(now + ms(50), 7);
1437
1438 let h = wheel.reschedule(h, now + ms(200));
1440
1441 let mut buf = Vec::new();
1443 let fired = wheel.poll(now + ms(60), &mut buf);
1444 assert_eq!(fired, 0);
1445
1446 let fired = wheel.poll(now + ms(210), &mut buf);
1448 assert_eq!(fired, 1);
1449 assert_eq!(buf, vec![7]);
1450
1451 wheel.cancel(h);
1452 }
1453
1454 #[test]
1455 #[should_panic(expected = "cannot reschedule a fired timer")]
1456 fn reschedule_panics_on_zombie() {
1457 let now = Instant::now();
1458 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1459
1460 let h = wheel.schedule(now + ms(10), 42);
1461
1462 let mut buf = Vec::new();
1463 wheel.poll(now + ms(20), &mut buf);
1464
1465 let _h = wheel.reschedule(h, now + ms(100));
1467 }
1468}
1469
1470#[cfg(test)]
1471mod proptests {
1472 use super::*;
1473 use proptest::prelude::*;
1474 use std::collections::HashSet;
1475 use std::mem;
1476 use std::time::{Duration, Instant};
1477
1478 #[derive(Debug, Clone)]
1480 enum Op {
1481 Schedule { deadline_ms: u64 },
1483 Cancel { idx: usize },
1485 }
1486
1487 fn op_strategy() -> impl Strategy<Value = Op> {
1488 prop_oneof![
1489 (1u64..10_000).prop_map(|deadline_ms| Op::Schedule { deadline_ms }),
1491 any::<usize>().prop_map(|idx| Op::Cancel { idx }),
1493 ]
1494 }
1495
1496 proptest! {
1497 #![proptest_config(ProptestConfig::with_cases(500))]
1498
1499 #[test]
1506 fn fuzz_schedule_cancel_interleaving(ops in proptest::collection::vec(op_strategy(), 1..200)) {
1507 let now = Instant::now();
1508 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1509
1510 let mut handles: Vec<TimerHandle<u64>> = Vec::new();
1511 let mut active_values: HashSet<u64> = HashSet::new();
1512 let mut next_id: u64 = 0;
1513
1514 for op in &ops {
1515 match op {
1516 Op::Schedule { deadline_ms } => {
1517 let id = next_id;
1518 next_id += 1;
1519 let h = wheel.schedule(now + Duration::from_millis(*deadline_ms), id);
1520 handles.push(h);
1521 active_values.insert(id);
1522 }
1523 Op::Cancel { idx } => {
1524 if !handles.is_empty() {
1525 let i = idx % handles.len();
1526 let h = handles.swap_remove(i);
1527 let val = wheel.cancel(h);
1528 let v = val.unwrap();
1530 assert!(active_values.remove(&v));
1531 }
1532 }
1533 }
1534 prop_assert_eq!(wheel.len(), active_values.len());
1536 }
1537
1538 let mut buf = Vec::new();
1540 wheel.poll(now + Duration::from_secs(100_000), &mut buf);
1542
1543 for h in handles {
1545 mem::forget(h);
1546 }
1547
1548 let fired_set: HashSet<u64> = buf.into_iter().collect();
1549 prop_assert_eq!(fired_set, active_values);
1550 prop_assert!(wheel.is_empty());
1551 }
1552
1553 #[test]
1559 fn fuzz_poll_timing(
1560 deadlines in proptest::collection::vec(1u64..5000, 1..100),
1561 poll_times in proptest::collection::vec(1u64..10_000, 1..20),
1562 ) {
1563 let now = Instant::now();
1564 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1565
1566 for (i, &d) in deadlines.iter().enumerate() {
1568 wheel.schedule_forget(now + Duration::from_millis(d), i as u64);
1569 }
1570
1571 let mut sorted_times: Vec<u64> = poll_times;
1573 sorted_times.sort();
1574 sorted_times.dedup();
1575
1576 let mut all_fired: Vec<u64> = Vec::new();
1577
1578 for &t in &sorted_times {
1579 let mut buf = Vec::new();
1580 wheel.poll(now + Duration::from_millis(t), &mut buf);
1581
1582 for &id in &buf {
1584 let deadline_ms = deadlines[id as usize];
1585 prop_assert!(deadline_ms <= t,
1586 "Timer {} with deadline {}ms fired at {}ms", id, deadline_ms, t);
1587 }
1588
1589 all_fired.extend(buf);
1590 }
1591
1592 let mut final_buf = Vec::new();
1594 wheel.poll(now + Duration::from_secs(100_000), &mut final_buf);
1595 all_fired.extend(final_buf);
1596
1597 all_fired.sort();
1599 let expected: Vec<u64> = (0..deadlines.len() as u64).collect();
1600 prop_assert_eq!(all_fired, expected, "Not all timers fired exactly once");
1601 prop_assert!(wheel.is_empty());
1602 }
1603 }
1604}