1use std::marker::PhantomData;
9use std::mem;
10use std::time::{Duration, Instant};
11
12use nexus_slab::{Full, bounded, unbounded};
13
14use crate::entry::{EntryPtr, WheelEntry, entry_ref};
15use crate::handle::TimerHandle;
16use crate::level::Level;
17use crate::store::{BoundedStore, SlabStore};
18
19#[derive(Debug, Clone, Copy)]
47pub struct WheelBuilder {
48 tick_duration: Duration,
49 slots_per_level: usize,
50 clk_shift: u32,
51 num_levels: usize,
52}
53
54impl Default for WheelBuilder {
55 fn default() -> Self {
56 WheelBuilder {
57 tick_duration: Duration::from_millis(1),
58 slots_per_level: 64,
59 clk_shift: 3,
60 num_levels: 7,
61 }
62 }
63}
64
65impl WheelBuilder {
66 pub fn new() -> Self {
68 Self::default()
69 }
70
71 pub fn tick_duration(mut self, d: Duration) -> Self {
73 self.tick_duration = d;
74 self
75 }
76
77 pub fn slots_per_level(mut self, n: usize) -> Self {
79 self.slots_per_level = n;
80 self
81 }
82
83 pub fn clk_shift(mut self, s: u32) -> Self {
85 self.clk_shift = s;
86 self
87 }
88
89 pub fn num_levels(mut self, n: usize) -> Self {
91 self.num_levels = n;
92 self
93 }
94
95 pub fn unbounded(self, chunk_capacity: usize) -> UnboundedWheelBuilder {
100 UnboundedWheelBuilder {
101 config: self,
102 chunk_capacity,
103 }
104 }
105
106 pub fn bounded(self, capacity: usize) -> BoundedWheelBuilder {
110 BoundedWheelBuilder {
111 config: self,
112 capacity,
113 }
114 }
115
116 fn validate(&self) {
117 assert!(
118 self.slots_per_level.is_power_of_two(),
119 "slots_per_level must be a power of 2, got {}",
120 self.slots_per_level
121 );
122 assert!(
123 self.slots_per_level <= 64,
124 "slots_per_level must be <= 64 (u64 bitmask), got {}",
125 self.slots_per_level
126 );
127 assert!(self.num_levels > 0, "num_levels must be > 0");
128 assert!(
129 self.num_levels <= 8,
130 "num_levels must be <= 8 (u8 bitmask), got {}",
131 self.num_levels
132 );
133 assert!(self.clk_shift > 0, "clk_shift must be > 0");
134 assert!(
135 !self.tick_duration.is_zero(),
136 "tick_duration must be non-zero"
137 );
138 let max_shift = (self.num_levels - 1) as u64 * self.clk_shift as u64;
139 assert!(
140 max_shift < 64,
141 "(num_levels - 1) * clk_shift must be < 64, got {}",
142 max_shift
143 );
144 let slots_log2 = self.slots_per_level.trailing_zeros() as u64;
145 assert!(
146 slots_log2 + max_shift < 64,
147 "slots_per_level << max_shift would overflow u64"
148 );
149 }
150
151 fn tick_ns(&self) -> u64 {
152 self.tick_duration.as_nanos() as u64
153 }
154}
155
156#[derive(Debug)]
160pub struct UnboundedWheelBuilder {
161 config: WheelBuilder,
162 chunk_capacity: usize,
163}
164
165impl UnboundedWheelBuilder {
166 pub fn build<T: 'static>(self, now: Instant) -> Wheel<T> {
173 self.config.validate();
174 let slab = unbounded::Slab::with_chunk_capacity(self.chunk_capacity);
175 let levels = build_levels::<T>(&self.config);
176 TimerWheel {
177 slab,
178 num_levels: self.config.num_levels,
179 levels,
180 current_ticks: 0,
181 tick_ns: self.config.tick_ns(),
182 epoch: now,
183 active_levels: 0,
184 len: 0,
185 _marker: PhantomData,
186 }
187 }
188}
189
190#[derive(Debug)]
194pub struct BoundedWheelBuilder {
195 config: WheelBuilder,
196 capacity: usize,
197}
198
199impl BoundedWheelBuilder {
200 pub fn build<T: 'static>(self, now: Instant) -> BoundedWheel<T> {
207 self.config.validate();
208 let slab = bounded::Slab::with_capacity(self.capacity);
209 let levels = build_levels::<T>(&self.config);
210 TimerWheel {
211 slab,
212 num_levels: self.config.num_levels,
213 levels,
214 current_ticks: 0,
215 tick_ns: self.config.tick_ns(),
216 epoch: now,
217 active_levels: 0,
218 len: 0,
219 _marker: PhantomData,
220 }
221 }
222}
223
224pub struct TimerWheel<
241 T: 'static,
242 S: SlabStore<Item = WheelEntry<T>> = unbounded::Slab<WheelEntry<T>>,
243> {
244 slab: S,
245 levels: Vec<Level<T>>,
246 num_levels: usize,
247 active_levels: u8,
248 current_ticks: u64,
249 tick_ns: u64,
250 epoch: Instant,
251 len: usize,
252 _marker: PhantomData<*const ()>, }
254
255#[allow(clippy::non_send_fields_in_send_ty)]
283unsafe impl<T: Send + 'static, S: SlabStore<Item = WheelEntry<T>>> Send for TimerWheel<T, S> {}
284
285pub type BoundedWheel<T> = TimerWheel<T, bounded::Slab<WheelEntry<T>>>;
287
288pub type Wheel<T> = TimerWheel<T, unbounded::Slab<WheelEntry<T>>>;
290
291impl<T: 'static> Wheel<T> {
296 pub fn unbounded(chunk_capacity: usize, now: Instant) -> Self {
300 WheelBuilder::default().unbounded(chunk_capacity).build(now)
301 }
302}
303
304impl<T: 'static> BoundedWheel<T> {
305 pub fn bounded(capacity: usize, now: Instant) -> Self {
309 WheelBuilder::default().bounded(capacity).build(now)
310 }
311}
312
313fn build_levels<T: 'static>(config: &WheelBuilder) -> Vec<Level<T>> {
314 (0..config.num_levels)
315 .map(|i| Level::new(config.slots_per_level, i, config.clk_shift))
316 .collect()
317}
318
319impl<T: 'static, S: SlabStore<Item = WheelEntry<T>>> TimerWheel<T, S> {
324 pub fn schedule(&mut self, deadline: Instant, value: T) -> TimerHandle<T> {
334 let deadline_ticks = self.instant_to_ticks(deadline);
335 let entry = WheelEntry::new(deadline_ticks, value, 2);
336 let slot = self.slab.alloc(entry);
337 let ptr = slot.into_ptr();
338 self.insert_entry(ptr, deadline_ticks);
339 self.len += 1;
340 TimerHandle::new(ptr)
341 }
342
343 pub fn schedule_forget(&mut self, deadline: Instant, value: T) {
353 let deadline_ticks = self.instant_to_ticks(deadline);
354 let entry = WheelEntry::new(deadline_ticks, value, 1);
355 let slot = self.slab.alloc(entry);
356 let ptr = slot.into_ptr();
357 self.insert_entry(ptr, deadline_ticks);
358 self.len += 1;
359 }
360}
361
362impl<T: 'static, S: BoundedStore<Item = WheelEntry<T>>> TimerWheel<T, S> {
367 pub fn try_schedule(&mut self, deadline: Instant, value: T) -> Result<TimerHandle<T>, Full<T>> {
373 let deadline_ticks = self.instant_to_ticks(deadline);
374 let entry = WheelEntry::new(deadline_ticks, value, 2);
375 match self.slab.try_alloc(entry) {
376 Ok(slot) => {
377 let ptr = slot.into_ptr();
378 self.insert_entry(ptr, deadline_ticks);
379 self.len += 1;
380 Ok(TimerHandle::new(ptr))
381 }
382 Err(full) => {
383 let wheel_entry = full.into_inner();
386 let value = unsafe { wheel_entry.take_value() }
387 .expect("entry was just constructed with Some(value)");
388 Err(Full(value))
389 }
390 }
391 }
392
393 pub fn try_schedule_forget(&mut self, deadline: Instant, value: T) -> Result<(), Full<T>> {
399 let deadline_ticks = self.instant_to_ticks(deadline);
400 let entry = WheelEntry::new(deadline_ticks, value, 1);
401 match self.slab.try_alloc(entry) {
402 Ok(slot) => {
403 let ptr = slot.into_ptr();
404 self.insert_entry(ptr, deadline_ticks);
405 self.len += 1;
406 Ok(())
407 }
408 Err(full) => {
409 let wheel_entry = full.into_inner();
410 let value = unsafe { wheel_entry.take_value() }
411 .expect("entry was just constructed with Some(value)");
412 Err(Full(value))
413 }
414 }
415 }
416}
417
418impl<T: 'static, S: SlabStore<Item = WheelEntry<T>>> TimerWheel<T, S> {
423 pub fn cancel(&mut self, handle: TimerHandle<T>) -> Option<T> {
432 let ptr = handle.ptr;
433 mem::forget(handle);
435
436 let entry = unsafe { entry_ref(ptr) };
438 let refs = entry.refs();
439
440 if refs == 2 {
441 let value = unsafe { entry.take_value() };
443 self.remove_entry(ptr);
444 self.len -= 1;
445 unsafe { self.slab.free_ptr(ptr) };
447 value
448 } else {
449 debug_assert_eq!(refs, 1, "unexpected refcount {refs} in cancel");
452 unsafe { self.slab.free_ptr(ptr) };
454 None
455 }
456 }
457
458 pub fn free(&mut self, handle: TimerHandle<T>) {
466 let ptr = handle.ptr;
467 mem::forget(handle);
468
469 let entry = unsafe { entry_ref(ptr) };
471 let new_refs = entry.dec_refs();
472
473 if new_refs == 0 {
474 unsafe { self.slab.free_ptr(ptr) };
477 }
478 }
480
481 pub fn reschedule(&mut self, handle: TimerHandle<T>, new_deadline: Instant) -> TimerHandle<T> {
493 let ptr = handle.ptr;
494 mem::forget(handle);
495
496 let entry = unsafe { entry_ref(ptr) };
498 assert_eq!(entry.refs(), 2, "cannot reschedule a fired timer");
499
500 self.remove_entry(ptr);
502
503 let new_ticks = self.instant_to_ticks(new_deadline);
505 entry.set_deadline_ticks(new_ticks);
506 self.insert_entry(ptr, new_ticks);
507
508 TimerHandle::new(ptr)
509 }
510
511 pub fn poll(&mut self, now: Instant, buf: &mut Vec<T>) -> usize {
515 self.poll_with_limit(now, usize::MAX, buf)
516 }
517
518 pub fn poll_with_limit(&mut self, now: Instant, limit: usize, buf: &mut Vec<T>) -> usize {
525 let now_ticks = self.instant_to_ticks(now);
526 self.current_ticks = now_ticks;
527
528 let mut fired = 0;
529 let mut mask = self.active_levels;
530
531 while mask != 0 && fired < limit {
532 let lvl_idx = mask.trailing_zeros() as usize;
533 mask &= mask - 1; fired += self.poll_level(lvl_idx, now_ticks, limit - fired, buf);
535 }
536 fired
537 }
538
539 pub fn next_deadline(&self) -> Option<Instant> {
544 let mut min_ticks: Option<u64> = None;
545
546 let mut lvl_mask = self.active_levels;
547 while lvl_mask != 0 {
548 let lvl_idx = lvl_mask.trailing_zeros() as usize;
549 lvl_mask &= lvl_mask - 1;
550
551 let level = &self.levels[lvl_idx];
552 let mut slot_mask = level.active_slots();
553 while slot_mask != 0 {
554 let slot_idx = slot_mask.trailing_zeros() as usize;
555 slot_mask &= slot_mask - 1;
556
557 let slot = level.slot(slot_idx);
558 let mut entry_ptr = slot.entry_head();
559
560 while !entry_ptr.is_null() {
561 let entry = unsafe { entry_ref(entry_ptr) };
563 let dt = entry.deadline_ticks();
564 min_ticks = Some(min_ticks.map_or(dt, |current| current.min(dt)));
565 entry_ptr = entry.next();
566 }
567 }
568 }
569
570 min_ticks.map(|t| self.ticks_to_instant(t))
571 }
572
573 #[inline]
575 pub fn len(&self) -> usize {
576 self.len
577 }
578
579 #[inline]
581 pub fn is_empty(&self) -> bool {
582 self.len == 0
583 }
584
585 #[inline]
590 fn instant_to_ticks(&self, instant: Instant) -> u64 {
591 let dur = instant.saturating_duration_since(self.epoch);
593 dur.as_nanos() as u64 / self.tick_ns
594 }
595
596 #[inline]
597 fn ticks_to_instant(&self, ticks: u64) -> Instant {
598 self.epoch + Duration::from_nanos(ticks * self.tick_ns)
599 }
600
601 #[inline]
611 fn select_level(&self, deadline_ticks: u64) -> usize {
612 let delta = deadline_ticks.saturating_sub(self.current_ticks);
613
614 for (i, level) in self.levels.iter().enumerate() {
615 if delta < level.range() {
616 return i;
617 }
618 }
619
620 self.num_levels - 1
622 }
623
624 #[inline]
633 #[allow(clippy::needless_pass_by_ref_mut)]
634 fn insert_entry(&mut self, entry_ptr: EntryPtr<T>, deadline_ticks: u64) {
635 let lvl_idx = self.select_level(deadline_ticks);
636 let slot_idx = self.levels[lvl_idx].slot_index(deadline_ticks);
637
638 let entry = unsafe { entry_ref(entry_ptr) };
641 entry.set_location(lvl_idx as u8, slot_idx as u16);
642
643 unsafe { self.levels[lvl_idx].slot(slot_idx).push_entry(entry_ptr) };
645
646 self.levels[lvl_idx].activate_slot(slot_idx);
648 self.active_levels |= 1 << lvl_idx;
649 }
650
651 #[inline]
657 #[allow(clippy::needless_pass_by_ref_mut)]
658 fn remove_entry(&mut self, entry_ptr: EntryPtr<T>) {
659 let entry = unsafe { entry_ref(entry_ptr) };
661
662 let lvl_idx = entry.level() as usize;
663 let slot_idx = entry.slot_idx() as usize;
664
665 unsafe { self.levels[lvl_idx].slot(slot_idx).remove_entry(entry_ptr) };
667
668 if self.levels[lvl_idx].slot(slot_idx).is_empty() {
669 self.levels[lvl_idx].deactivate_slot(slot_idx);
670 if !self.levels[lvl_idx].is_active() {
671 self.active_levels &= !(1 << lvl_idx);
672 }
673 }
674 }
675
676 #[inline]
684 fn fire_entry(&mut self, entry_ptr: EntryPtr<T>) -> Option<T> {
685 let entry = unsafe { entry_ref(entry_ptr) };
687
688 let value = unsafe { entry.take_value() };
691
692 let new_refs = entry.dec_refs();
693 if new_refs == 0 {
694 unsafe { self.slab.free_ptr(entry_ptr) };
697 }
698 self.len -= 1;
702 value
703 }
704
705 fn poll_level(
712 &mut self,
713 lvl_idx: usize,
714 now_ticks: u64,
715 limit: usize,
716 buf: &mut Vec<T>,
717 ) -> usize {
718 let mut fired = 0;
719 let mut mask = self.levels[lvl_idx].active_slots();
720
721 while mask != 0 && fired < limit {
722 let slot_idx = mask.trailing_zeros() as usize;
723 mask &= mask - 1;
724
725 let slot_ptr = self.levels[lvl_idx].slot(slot_idx) as *const crate::level::WheelSlot<T>;
726 let slot = unsafe { &*slot_ptr };
730 let mut entry_ptr = slot.entry_head();
731
732 while !entry_ptr.is_null() && fired < limit {
733 let entry = unsafe { entry_ref(entry_ptr) };
735 let next_entry = entry.next();
736
737 if entry.deadline_ticks() <= now_ticks {
738 unsafe { slot.remove_entry(entry_ptr) };
739
740 if let Some(value) = self.fire_entry(entry_ptr) {
741 buf.push(value);
742 }
743 fired += 1;
744 }
745
746 entry_ptr = next_entry;
747 }
748
749 if slot.is_empty() {
750 self.levels[lvl_idx].deactivate_slot(slot_idx);
751 }
752 }
753
754 if !self.levels[lvl_idx].is_active() {
756 self.active_levels &= !(1 << lvl_idx);
757 }
758
759 fired
760 }
761}
762
763impl<T: 'static, S: SlabStore<Item = WheelEntry<T>>> Drop for TimerWheel<T, S> {
768 fn drop(&mut self) {
769 let mut lvl_mask = self.active_levels;
771 while lvl_mask != 0 {
772 let lvl_idx = lvl_mask.trailing_zeros() as usize;
773 lvl_mask &= lvl_mask - 1;
774
775 let level = &self.levels[lvl_idx];
776 let mut slot_mask = level.active_slots();
777 while slot_mask != 0 {
778 let slot_idx = slot_mask.trailing_zeros() as usize;
779 slot_mask &= slot_mask - 1;
780
781 let slot = level.slot(slot_idx);
782 let mut entry_ptr = slot.entry_head();
783 while !entry_ptr.is_null() {
784 let entry = unsafe { entry_ref(entry_ptr) };
786 let next_entry = entry.next();
787
788 unsafe { self.slab.free(nexus_slab::RawSlot::from_ptr(entry_ptr)) };
790
791 entry_ptr = next_entry;
792 }
793 }
794 }
795 }
796}
797
798#[cfg(test)]
803mod tests {
804 use super::*;
805 use std::time::{Duration, Instant};
806
807 fn ms(millis: u64) -> Duration {
808 Duration::from_millis(millis)
809 }
810
811 fn _assert_send<T: Send>() {}
816
817 #[test]
818 fn wheel_is_send() {
819 _assert_send::<Wheel<u64>>();
820 _assert_send::<BoundedWheel<u64>>();
821 }
822
823 #[test]
828 fn default_config() {
829 let now = Instant::now();
830 let wheel: Wheel<u64> = Wheel::unbounded(1024, now);
831 assert!(wheel.is_empty());
832 assert_eq!(wheel.len(), 0);
833 }
834
835 #[test]
836 fn bounded_construction() {
837 let now = Instant::now();
838 let wheel: BoundedWheel<u64> = BoundedWheel::bounded(128, now);
839 assert!(wheel.is_empty());
840 }
841
842 #[test]
843 #[should_panic(expected = "slots_per_level must be a power of 2")]
844 fn invalid_config_non_power_of_two() {
845 let now = Instant::now();
846 WheelBuilder::default()
847 .slots_per_level(65)
848 .unbounded(1024)
849 .build::<u64>(now);
850 }
851
852 #[test]
857 fn schedule_and_cancel() {
858 let now = Instant::now();
859 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
860
861 let h = wheel.schedule(now + ms(50), 42);
862 assert_eq!(wheel.len(), 1);
863
864 let val = wheel.cancel(h);
865 assert_eq!(val, Some(42));
866 assert_eq!(wheel.len(), 0);
867 }
868
869 #[test]
870 fn schedule_forget_fires() {
871 let now = Instant::now();
872 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
873
874 wheel.schedule_forget(now + ms(10), 99);
875 assert_eq!(wheel.len(), 1);
876
877 let mut buf = Vec::new();
878 let fired = wheel.poll(now + ms(20), &mut buf);
879 assert_eq!(fired, 1);
880 assert_eq!(buf, vec![99]);
881 assert_eq!(wheel.len(), 0);
882 }
883
884 #[test]
885 fn cancel_after_fire_returns_none() {
886 let now = Instant::now();
887 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
888
889 let h = wheel.schedule(now + ms(10), 42);
890
891 let mut buf = Vec::new();
892 wheel.poll(now + ms(20), &mut buf);
893 assert_eq!(buf, vec![42]);
894
895 let val = wheel.cancel(h);
897 assert_eq!(val, None);
898 }
899
900 #[test]
901 fn free_active_timer_becomes_fire_and_forget() {
902 let now = Instant::now();
903 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
904
905 let h = wheel.schedule(now + ms(10), 42);
906 wheel.free(h); assert_eq!(wheel.len(), 1);
908
909 let mut buf = Vec::new();
910 wheel.poll(now + ms(20), &mut buf);
911 assert_eq!(buf, vec![42]);
912 assert_eq!(wheel.len(), 0);
913 }
914
915 #[test]
916 fn free_zombie_handle() {
917 let now = Instant::now();
918 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
919
920 let h = wheel.schedule(now + ms(10), 42);
921
922 let mut buf = Vec::new();
923 wheel.poll(now + ms(20), &mut buf);
924
925 wheel.free(h);
927 }
928
929 #[test]
934 fn bounded_full() {
935 let now = Instant::now();
936 let mut wheel: BoundedWheel<u64> = BoundedWheel::bounded(2, now);
937
938 let h1 = wheel.try_schedule(now + ms(10), 1).unwrap();
939 let h2 = wheel.try_schedule(now + ms(20), 2).unwrap();
940
941 let err = wheel.try_schedule(now + ms(30), 3);
942 assert!(err.is_err());
943 let recovered = err.unwrap_err().into_inner();
944 assert_eq!(recovered, 3);
945
946 wheel.cancel(h1);
948 let h3 = wheel.try_schedule(now + ms(30), 3).unwrap();
949
950 wheel.free(h2);
952 wheel.free(h3);
953 }
954
955 #[test]
956 fn bounded_schedule_forget_full() {
957 let now = Instant::now();
958 let mut wheel: BoundedWheel<u64> = BoundedWheel::bounded(1, now);
959
960 wheel.try_schedule_forget(now + ms(10), 1).unwrap();
961 let err = wheel.try_schedule_forget(now + ms(20), 2);
962 assert!(err.is_err());
963 }
964
965 #[test]
970 fn poll_respects_deadline() {
971 let now = Instant::now();
972 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
973
974 wheel.schedule_forget(now + ms(10), 1);
975 wheel.schedule_forget(now + ms(50), 2);
976 wheel.schedule_forget(now + ms(100), 3);
977
978 let mut buf = Vec::new();
979
980 let fired = wheel.poll(now + ms(20), &mut buf);
982 assert_eq!(fired, 1);
983 assert_eq!(buf, vec![1]);
984 assert_eq!(wheel.len(), 2);
985
986 buf.clear();
988 let fired = wheel.poll(now + ms(60), &mut buf);
989 assert_eq!(fired, 1);
990 assert_eq!(buf, vec![2]);
991
992 buf.clear();
994 let fired = wheel.poll(now + ms(200), &mut buf);
995 assert_eq!(fired, 1);
996 assert_eq!(buf, vec![3]);
997
998 assert!(wheel.is_empty());
999 }
1000
1001 #[test]
1002 fn poll_with_limit() {
1003 let now = Instant::now();
1004 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1005
1006 for i in 0..10 {
1007 wheel.schedule_forget(now + ms(1), i);
1008 }
1009
1010 let mut buf = Vec::new();
1011
1012 let fired = wheel.poll_with_limit(now + ms(5), 3, &mut buf);
1014 assert_eq!(fired, 3);
1015 assert_eq!(wheel.len(), 7);
1016
1017 let fired = wheel.poll_with_limit(now + ms(5), 3, &mut buf);
1018 assert_eq!(fired, 3);
1019 assert_eq!(wheel.len(), 4);
1020
1021 let fired = wheel.poll(now + ms(5), &mut buf);
1023 assert_eq!(fired, 4);
1024 assert!(wheel.is_empty());
1025 assert_eq!(buf.len(), 10);
1026 }
1027
1028 #[test]
1033 fn timers_across_levels() {
1034 let now = Instant::now();
1035 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1036
1037 wheel.schedule_forget(now + ms(5), 0);
1039 wheel.schedule_forget(now + ms(200), 1);
1041 wheel.schedule_forget(now + ms(1000), 2);
1043
1044 let mut buf = Vec::new();
1045
1046 wheel.poll(now + ms(10), &mut buf);
1047 assert_eq!(buf, vec![0]);
1048
1049 buf.clear();
1050 wheel.poll(now + ms(250), &mut buf);
1051 assert_eq!(buf, vec![1]);
1052
1053 buf.clear();
1054 wheel.poll(now + ms(1500), &mut buf);
1055 assert_eq!(buf, vec![2]);
1056
1057 assert!(wheel.is_empty());
1058 }
1059
1060 #[test]
1065 fn next_deadline_empty() {
1066 let now = Instant::now();
1067 let wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1068 assert!(wheel.next_deadline().is_none());
1069 }
1070
1071 #[test]
1072 fn next_deadline_returns_earliest() {
1073 let now = Instant::now();
1074 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1075
1076 wheel.schedule_forget(now + ms(100), 1);
1077 wheel.schedule_forget(now + ms(50), 2);
1078 wheel.schedule_forget(now + ms(200), 3);
1079
1080 let next = wheel.next_deadline().unwrap();
1081 let delta = next.duration_since(now);
1083 assert!(delta >= ms(49) && delta <= ms(51));
1084 }
1085
1086 #[test]
1091 fn deadline_in_the_past_fires_immediately() {
1092 let now = Instant::now();
1093 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1094
1095 wheel.schedule_forget(now, 42);
1097
1098 let mut buf = Vec::new();
1099 let fired = wheel.poll(now + ms(1), &mut buf);
1100 assert_eq!(fired, 1);
1101 assert_eq!(buf, vec![42]);
1102 }
1103
1104 #[test]
1109 fn deadline_beyond_max_range_clamped() {
1110 let now = Instant::now();
1111 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1112
1113 let h = wheel.schedule(now + Duration::from_secs(100_000), 99);
1115 assert_eq!(wheel.len(), 1);
1116
1117 let mut buf = Vec::new();
1119 wheel.poll(now + Duration::from_secs(100_001), &mut buf);
1120 assert_eq!(buf, vec![99]);
1121
1122 let val = wheel.cancel(h);
1132 assert_eq!(val, None);
1133 }
1134
1135 #[test]
1140 fn drop_cleans_up_active_entries() {
1141 let now = Instant::now();
1142 let mut wheel: Wheel<String> = Wheel::unbounded(1024, now);
1143
1144 for i in 0..100 {
1145 wheel.schedule_forget(now + ms(i * 10), format!("timer-{i}"));
1146 }
1147
1148 assert_eq!(wheel.len(), 100);
1149 drop(wheel);
1151 }
1152
1153 #[test]
1154 fn drop_with_outstanding_handles() {
1155 let now = Instant::now();
1156 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1157
1158 let h1 = wheel.schedule(now + ms(10), 1);
1160 let h2 = wheel.schedule(now + ms(20), 2);
1161
1162 wheel.free(h1);
1164 wheel.free(h2);
1165
1166 drop(wheel);
1168 }
1169
1170 #[test]
1175 fn level_selection_boundaries() {
1176 let now = Instant::now();
1177 let wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1178
1179 assert_eq!(wheel.select_level(0), 0);
1181 assert_eq!(wheel.select_level(63), 0);
1182
1183 assert_eq!(wheel.select_level(64), 1);
1185 assert_eq!(wheel.select_level(511), 1);
1186
1187 assert_eq!(wheel.select_level(512), 2);
1189 }
1190
1191 #[test]
1196 fn cancel_after_time_advance() {
1197 let now = Instant::now();
1202 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1203
1204 let h = wheel.schedule(now + ms(500), 42);
1205 assert_eq!(wheel.len(), 1);
1206
1207 let mut buf = Vec::new();
1209 let fired = wheel.poll(now + ms(400), &mut buf);
1210 assert_eq!(fired, 0);
1211 assert!(buf.is_empty());
1212
1213 let val = wheel.cancel(h);
1215 assert_eq!(val, Some(42));
1216 assert_eq!(wheel.len(), 0);
1217 }
1218
1219 #[test]
1224 fn multiple_entries_same_slot() {
1225 let now = Instant::now();
1226 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1227
1228 let mut handles = Vec::new();
1230 for i in 0..5 {
1231 handles.push(wheel.schedule(now + ms(10), i));
1232 }
1233 assert_eq!(wheel.len(), 5);
1234
1235 let v2 = wheel.cancel(handles.remove(2));
1237 assert_eq!(v2, Some(2));
1238 let v0 = wheel.cancel(handles.remove(0));
1239 assert_eq!(v0, Some(0));
1240 assert_eq!(wheel.len(), 3);
1241
1242 let mut buf = Vec::new();
1244 let fired = wheel.poll(now + ms(20), &mut buf);
1245 assert_eq!(fired, 3);
1246
1247 for h in handles {
1249 let val = wheel.cancel(h);
1250 assert_eq!(val, None); }
1252 }
1253
1254 #[test]
1259 fn entry_at_level_boundary() {
1260 let now = Instant::now();
1263 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1264
1265 let h = wheel.schedule(now + ms(64), 99);
1266 assert_eq!(wheel.len(), 1);
1267
1268 let mut buf = Vec::new();
1270 let fired = wheel.poll(now + ms(63), &mut buf);
1271 assert_eq!(fired, 0);
1272
1273 let fired = wheel.poll(now + ms(65), &mut buf);
1275 assert_eq!(fired, 1);
1276 assert_eq!(buf, vec![99]);
1277
1278 wheel.cancel(h);
1280 }
1281
1282 #[test]
1287 fn poll_with_limit_mixed_expiry() {
1288 let now = Instant::now();
1289 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1290
1291 wheel.schedule_forget(now + ms(5), 1);
1293 wheel.schedule_forget(now + ms(5), 2);
1294 wheel.schedule_forget(now + ms(5), 3);
1295 wheel.schedule_forget(now + ms(500), 4); wheel.schedule_forget(now + ms(500), 5); assert_eq!(wheel.len(), 5);
1298
1299 let mut buf = Vec::new();
1300
1301 let fired = wheel.poll_with_limit(now + ms(10), 2, &mut buf);
1303 assert_eq!(fired, 2);
1304 assert_eq!(wheel.len(), 3);
1305
1306 let fired = wheel.poll_with_limit(now + ms(10), 5, &mut buf);
1308 assert_eq!(fired, 1);
1309 assert_eq!(wheel.len(), 2);
1310
1311 assert_eq!(buf.len(), 3);
1313 }
1314
1315 #[test]
1320 fn reuse_after_full_drain() {
1321 let now = Instant::now();
1322 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1323
1324 for i in 0..10 {
1326 wheel.schedule_forget(now + ms(1), i);
1327 }
1328 let mut buf = Vec::new();
1329 wheel.poll(now + ms(5), &mut buf);
1330 assert_eq!(buf.len(), 10);
1331 assert!(wheel.is_empty());
1332
1333 buf.clear();
1335 for i in 10..20 {
1336 wheel.schedule_forget(now + ms(100), i);
1337 }
1338 assert_eq!(wheel.len(), 10);
1339
1340 wheel.poll(now + ms(200), &mut buf);
1341 assert_eq!(buf.len(), 10);
1342 assert!(wheel.is_empty());
1343 }
1344
1345 #[test]
1350 fn all_levels_active() {
1351 let now = Instant::now();
1352 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1353
1354 let distances = [10, 100, 1000, 5000, 40_000, 300_000, 3_000_000];
1357 let mut handles: Vec<TimerHandle<u64>> = Vec::new();
1358 for (i, &d) in distances.iter().enumerate() {
1359 handles.push(wheel.schedule(now + ms(d), i as u64));
1360 }
1361 assert_eq!(wheel.len(), 7);
1362
1363 let order = [4, 1, 6, 0, 3, 5, 2];
1365 let mut opt_handles: Vec<Option<TimerHandle<u64>>> =
1368 handles.into_iter().map(Some).collect();
1369
1370 for &idx in &order {
1371 let h = opt_handles[idx].take().unwrap();
1372 let val = wheel.cancel(h);
1373 assert_eq!(val, Some(idx as u64));
1374 }
1375 assert!(wheel.is_empty());
1376 }
1377
1378 #[test]
1383 fn poll_values_match() {
1384 let now = Instant::now();
1385 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1386
1387 let expected: Vec<u64> = (100..110).collect();
1388 for &v in &expected {
1389 wheel.schedule_forget(now + ms(5), v);
1390 }
1391
1392 let mut buf = Vec::new();
1393 wheel.poll(now + ms(10), &mut buf);
1394
1395 buf.sort();
1396 assert_eq!(buf, expected);
1397 }
1398
1399 #[test]
1404 fn reschedule_moves_deadline() {
1405 let now = Instant::now();
1406 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1407
1408 let h = wheel.schedule(now + ms(100), 42);
1409 assert_eq!(wheel.len(), 1);
1410
1411 let h = wheel.reschedule(h, now + ms(50));
1413 assert_eq!(wheel.len(), 1);
1414
1415 let mut buf = Vec::new();
1417 let fired = wheel.poll(now + ms(40), &mut buf);
1418 assert_eq!(fired, 0);
1419
1420 let fired = wheel.poll(now + ms(55), &mut buf);
1422 assert_eq!(fired, 1);
1423 assert_eq!(buf, vec![42]);
1424
1425 wheel.cancel(h);
1427 }
1428
1429 #[test]
1430 fn reschedule_to_later() {
1431 let now = Instant::now();
1432 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1433
1434 let h = wheel.schedule(now + ms(50), 7);
1435
1436 let h = wheel.reschedule(h, now + ms(200));
1438
1439 let mut buf = Vec::new();
1441 let fired = wheel.poll(now + ms(60), &mut buf);
1442 assert_eq!(fired, 0);
1443
1444 let fired = wheel.poll(now + ms(210), &mut buf);
1446 assert_eq!(fired, 1);
1447 assert_eq!(buf, vec![7]);
1448
1449 wheel.cancel(h);
1450 }
1451
1452 #[test]
1453 #[should_panic(expected = "cannot reschedule a fired timer")]
1454 fn reschedule_panics_on_zombie() {
1455 let now = Instant::now();
1456 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1457
1458 let h = wheel.schedule(now + ms(10), 42);
1459
1460 let mut buf = Vec::new();
1461 wheel.poll(now + ms(20), &mut buf);
1462
1463 let _h = wheel.reschedule(h, now + ms(100));
1465 }
1466}
1467
1468#[cfg(test)]
1469mod proptests {
1470 use super::*;
1471 use proptest::prelude::*;
1472 use std::collections::HashSet;
1473 use std::mem;
1474 use std::time::{Duration, Instant};
1475
1476 #[derive(Debug, Clone)]
1478 enum Op {
1479 Schedule { deadline_ms: u64 },
1481 Cancel { idx: usize },
1483 }
1484
1485 fn op_strategy() -> impl Strategy<Value = Op> {
1486 prop_oneof![
1487 (1u64..10_000).prop_map(|deadline_ms| Op::Schedule { deadline_ms }),
1489 any::<usize>().prop_map(|idx| Op::Cancel { idx }),
1491 ]
1492 }
1493
1494 proptest! {
1495 #![proptest_config(ProptestConfig::with_cases(500))]
1496
1497 #[test]
1504 fn fuzz_schedule_cancel_interleaving(ops in proptest::collection::vec(op_strategy(), 1..200)) {
1505 let now = Instant::now();
1506 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1507
1508 let mut handles: Vec<TimerHandle<u64>> = Vec::new();
1509 let mut active_values: HashSet<u64> = HashSet::new();
1510 let mut next_id: u64 = 0;
1511
1512 for op in &ops {
1513 match op {
1514 Op::Schedule { deadline_ms } => {
1515 let id = next_id;
1516 next_id += 1;
1517 let h = wheel.schedule(now + Duration::from_millis(*deadline_ms), id);
1518 handles.push(h);
1519 active_values.insert(id);
1520 }
1521 Op::Cancel { idx } => {
1522 if !handles.is_empty() {
1523 let i = idx % handles.len();
1524 let h = handles.swap_remove(i);
1525 let val = wheel.cancel(h);
1526 let v = val.unwrap();
1528 assert!(active_values.remove(&v));
1529 }
1530 }
1531 }
1532 prop_assert_eq!(wheel.len(), active_values.len());
1534 }
1535
1536 let mut buf = Vec::new();
1538 wheel.poll(now + Duration::from_secs(100_000), &mut buf);
1540
1541 for h in handles {
1543 mem::forget(h);
1544 }
1545
1546 let fired_set: HashSet<u64> = buf.into_iter().collect();
1547 prop_assert_eq!(fired_set, active_values);
1548 prop_assert!(wheel.is_empty());
1549 }
1550
1551 #[test]
1557 fn fuzz_poll_timing(
1558 deadlines in proptest::collection::vec(1u64..5000, 1..100),
1559 poll_times in proptest::collection::vec(1u64..10_000, 1..20),
1560 ) {
1561 let now = Instant::now();
1562 let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1563
1564 for (i, &d) in deadlines.iter().enumerate() {
1566 wheel.schedule_forget(now + Duration::from_millis(d), i as u64);
1567 }
1568
1569 let mut sorted_times: Vec<u64> = poll_times;
1571 sorted_times.sort();
1572 sorted_times.dedup();
1573
1574 let mut all_fired: Vec<u64> = Vec::new();
1575
1576 for &t in &sorted_times {
1577 let mut buf = Vec::new();
1578 wheel.poll(now + Duration::from_millis(t), &mut buf);
1579
1580 for &id in &buf {
1582 let deadline_ms = deadlines[id as usize];
1583 prop_assert!(deadline_ms <= t,
1584 "Timer {} with deadline {}ms fired at {}ms", id, deadline_ms, t);
1585 }
1586
1587 all_fired.extend(buf);
1588 }
1589
1590 let mut final_buf = Vec::new();
1592 wheel.poll(now + Duration::from_secs(100_000), &mut final_buf);
1593 all_fired.extend(final_buf);
1594
1595 all_fired.sort();
1597 let expected: Vec<u64> = (0..deadlines.len() as u64).collect();
1598 prop_assert_eq!(all_fired, expected, "Not all timers fired exactly once");
1599 prop_assert!(wheel.is_empty());
1600 }
1601 }
1602}