1use crate::CallbackWrapper;
2use crate::config::{BatchConfig, WheelConfig};
3use crate::task::{TaskCompletion, TaskId, TaskLocation, TaskTypeWithCompletionNotifier, TimerTaskForWheel, TimerTaskWithCompletionNotifier};
4use rustc_hash::FxHashMap;
5use std::time::Duration;
6
7pub struct WheelAdvanceResult {
8 pub id: TaskId,
9 pub callback: Option<CallbackWrapper>
10}
11
12struct WheelLayer {
16 slots: Vec<Vec<TimerTaskForWheel>>,
20
21 current_tick: u64,
25
26 slot_count: usize,
30
31 tick_duration: Duration,
35
36 tick_duration_ms: u64,
40
41 slot_mask: usize,
45}
46
47impl WheelLayer {
48 fn new(slot_count: usize, tick_duration: Duration) -> Self {
52 let mut slots = Vec::with_capacity(slot_count);
53 for _ in 0..slot_count {
59 slots.push(Vec::with_capacity(4));
60 }
61
62 let tick_duration_ms = tick_duration.as_millis() as u64;
63 let slot_mask = slot_count - 1;
64
65 Self {
66 slots,
67 current_tick: 0,
68 slot_count,
69 tick_duration,
70 tick_duration_ms,
71 slot_mask,
72 }
73 }
74
75 fn delay_to_ticks(&self, delay: Duration) -> u64 {
79 let ticks = delay.as_millis() as u64 / self.tick_duration.as_millis() as u64;
80 ticks.max(1) }
82}
83
84pub struct Wheel {
88 l0: WheelLayer,
92
93 l1: WheelLayer,
97
98 l1_tick_ratio: u64,
102
103 task_index: FxHashMap<TaskId, TaskLocation>,
107
108 batch_config: BatchConfig,
112
113 l0_capacity_ms: u64,
117
118 l1_capacity_ticks: u64,
122}
123
124impl Wheel {
125 pub fn new(config: WheelConfig, batch_config: BatchConfig) -> Self {
143 let l0 = WheelLayer::new(config.l0_slot_count, config.l0_tick_duration);
144 let l1 = WheelLayer::new(config.l1_slot_count, config.l1_tick_duration);
145
146 let l1_tick_ratio = l1.tick_duration_ms / l0.tick_duration_ms;
149
150 let l0_capacity_ms = (l0.slot_count as u64) * l0.tick_duration_ms;
153 let l1_capacity_ticks = l1.slot_count as u64;
154
155 Self {
156 l0,
157 l1,
158 l1_tick_ratio,
159 task_index: FxHashMap::default(),
160 batch_config,
161 l0_capacity_ms,
162 l1_capacity_ticks,
163 }
164 }
165
166 #[allow(dead_code)]
170 pub fn current_tick(&self) -> u64 {
171 self.l0.current_tick
172 }
173
174 #[allow(dead_code)]
178 pub fn tick_duration(&self) -> Duration {
179 self.l0.tick_duration
180 }
181
182 #[allow(dead_code)]
186 pub fn slot_count(&self) -> usize {
187 self.l0.slot_count
188 }
189
190 #[allow(dead_code)]
194 pub fn delay_to_ticks(&self, delay: Duration) -> u64 {
195 self.l0.delay_to_ticks(delay)
196 }
197
198 #[inline(always)]
214 fn determine_layer(&self, delay: Duration) -> (u8, u64, u32) {
215 let delay_ms = delay.as_millis() as u64;
216
217 if delay_ms < self.l0_capacity_ms {
220 let l0_ticks = (delay_ms / self.l0.tick_duration_ms).max(1);
221 return (0, l0_ticks, 0);
222 }
223
224 let l1_ticks = (delay_ms / self.l1.tick_duration_ms).max(1);
227
228 if l1_ticks < self.l1_capacity_ticks {
229 (1, l1_ticks, 0)
230 } else {
231 let rounds = (l1_ticks / self.l1_capacity_ticks) as u32;
232 (1, l1_ticks, rounds)
233 }
234 }
235
236 #[inline]
266 pub fn insert(&mut self, task: TimerTaskWithCompletionNotifier) -> TaskId {
267 let (level, ticks, rounds) = self.determine_layer(task.delay);
268
269 let (current_tick, slot_mask, slots) = match level {
272 0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
273 _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
274 };
275
276 let total_ticks = current_tick + ticks;
277 let slot_index = (total_ticks as usize) & slot_mask;
278
279 let task = TimerTaskForWheel::new(task, total_ticks, rounds);
280
281 let task_id = task.get_id();
282
283 let vec_index = slots[slot_index].len();
286 let location = TaskLocation::new(level, slot_index, vec_index);
287
288 slots[slot_index].push(task);
291
292 self.task_index.insert(task_id, location);
295
296 task_id
297 }
298
299 #[inline]
323 pub fn insert_batch(&mut self, tasks: Vec<TimerTaskWithCompletionNotifier>) -> Vec<TaskId> {
324 let task_count = tasks.len();
325
326 self.task_index.reserve(task_count);
329
330 let mut task_ids = Vec::with_capacity(task_count);
331
332 for task in tasks {
333 let (level, ticks, rounds) = self.determine_layer(task.delay);
334
335 let (current_tick, slot_mask, slots) = match level {
338 0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
339 _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
340 };
341
342 let total_ticks = current_tick + ticks;
343 let slot_index = (total_ticks as usize) & slot_mask;
344
345 let task = TimerTaskForWheel::new(task, total_ticks, rounds);
346
347 let task_id = task.get_id();
348
349 let vec_index = slots[slot_index].len();
352 let location = TaskLocation::new(level, slot_index, vec_index);
353
354 slots[slot_index].push(task);
357
358 self.task_index.insert(task_id, location);
361
362 task_ids.push(task_id);
363 }
364
365 task_ids
366 }
367
368 #[inline]
384 pub fn cancel(&mut self, task_id: TaskId) -> bool {
385 let location = match self.task_index.remove(&task_id) {
388 Some(loc) => loc,
389 None => return false,
390 };
391
392 let slot = match location.level {
395 0 => &mut self.l0.slots[location.slot_index],
396 _ => &mut self.l1.slots[location.slot_index],
397 };
398
399 if location.vec_index >= slot.len() || slot[location.vec_index].get_id() != task_id {
402 self.task_index.insert(task_id, location);
405 return false;
406 }
407 let removed_task = slot.swap_remove(location.vec_index);
410
411 debug_assert_eq!(removed_task.get_id(), task_id);
414
415 match removed_task.into_task_type() {
416 TaskTypeWithCompletionNotifier::OneShot { completion_notifier } => {
417 completion_notifier.notify(crate::task::TaskCompletion::Cancelled);
420 }
421 TaskTypeWithCompletionNotifier::Periodic { completion_notifier, .. } => {
422 let _ = completion_notifier.0.try_send(TaskCompletion::Cancelled);
425 }
426 }
427
428 if location.vec_index < slot.len() {
431 let swapped_task_id = slot[location.vec_index].get_id();
432 if let Some(swapped_location) = self.task_index.get_mut(&swapped_task_id) {
435 swapped_location.vec_index = location.vec_index;
436 }
437 }
438
439 true
440 }
441
442 #[inline]
470 pub fn cancel_batch(&mut self, task_ids: &[TaskId]) -> usize {
471 let mut cancelled_count = 0;
472
473 if task_ids.len() <= self.batch_config.small_batch_threshold {
476 for &task_id in task_ids {
477 if self.cancel(task_id) {
478 cancelled_count += 1;
479 }
480 }
481 return cancelled_count;
482 }
483
484 let l0_slot_count = self.l0.slot_count;
489 let l1_slot_count = self.l1.slot_count;
490
491 let mut l0_tasks_by_slot: Vec<Vec<(TaskId, usize)>> = vec![Vec::new(); l0_slot_count];
492 let mut l1_tasks_by_slot: Vec<Vec<(TaskId, usize)>> = vec![Vec::new(); l1_slot_count];
493
494 for &task_id in task_ids {
497 if let Some(location) = self.task_index.get(&task_id) {
498 if location.level == 0 {
499 l0_tasks_by_slot[location.slot_index].push((task_id, location.vec_index));
500 } else {
501 l1_tasks_by_slot[location.slot_index].push((task_id, location.vec_index));
502 }
503 }
504 }
505
506 for (slot_index, tasks) in l0_tasks_by_slot.iter_mut().enumerate() {
509 if tasks.is_empty() {
510 continue;
511 }
512
513 tasks.sort_unstable_by(|a, b| b.1.cmp(&a.1));
516
517 let slot = &mut self.l0.slots[slot_index];
518
519 for &(task_id, vec_index) in tasks.iter() {
520 if vec_index < slot.len() && slot[vec_index].get_id() == task_id {
521 let removed_task = slot.swap_remove(vec_index);
522 match removed_task.into_task_type() {
523 TaskTypeWithCompletionNotifier::OneShot { completion_notifier } => {
524 completion_notifier.notify(crate::task::TaskCompletion::Cancelled);
527 }
528 TaskTypeWithCompletionNotifier::Periodic { completion_notifier, .. } => {
529 let _ = completion_notifier.0.try_send(TaskCompletion::Cancelled);
532 }
533 }
534
535
536 if vec_index < slot.len() {
537 let swapped_task_id = slot[vec_index].get_id();
538 if let Some(swapped_location) = self.task_index.get_mut(&swapped_task_id) {
539 swapped_location.vec_index = vec_index;
540 }
541 }
542
543 self.task_index.remove(&task_id);
544 cancelled_count += 1;
545 }
546 }
547 }
548
549 for (slot_index, tasks) in l1_tasks_by_slot.iter_mut().enumerate() {
552 if tasks.is_empty() {
553 continue;
554 }
555
556 tasks.sort_unstable_by(|a, b| b.1.cmp(&a.1));
557
558 let slot = &mut self.l1.slots[slot_index];
559
560 for &(task_id, vec_index) in tasks.iter() {
561 if vec_index < slot.len() && slot[vec_index].get_id() == task_id {
562
563 let removed_task = slot.swap_remove(vec_index);
564
565 match removed_task.into_task_type() {
566 TaskTypeWithCompletionNotifier::OneShot { completion_notifier } => {
567 completion_notifier.notify(crate::task::TaskCompletion::Cancelled);
570 }
571 TaskTypeWithCompletionNotifier::Periodic { completion_notifier, .. } => {
572 let _ = completion_notifier.0.try_send(TaskCompletion::Cancelled);
575 }
576 }
577
578 if vec_index < slot.len() {
579 let swapped_task_id = slot[vec_index].get_id();
580 if let Some(swapped_location) = self.task_index.get_mut(&swapped_task_id) {
581 swapped_location.vec_index = vec_index;
582 }
583 }
584
585 self.task_index.remove(&task_id);
586 cancelled_count += 1;
587 }
588 }
589 }
590
591 cancelled_count
592 }
593
594 fn reinsert_periodic_task(
602 &mut self,
603 periodic_task: TimerTaskWithCompletionNotifier,
604 ) {
605 let task_id = periodic_task.get_id();
606
607 let (level, ticks, rounds) = self.determine_layer(periodic_task.get_interval().unwrap());
610
611 let (current_tick, slot_mask, slots) = match level {
614 0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
615 _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
616 };
617
618 let total_ticks = current_tick + ticks;
619 let slot_index = (total_ticks as usize) & slot_mask;
620
621 let task = TimerTaskForWheel::new(periodic_task, total_ticks, rounds);
624
625 let vec_index = slots[slot_index].len();
628 let location = TaskLocation::new(level, slot_index, vec_index);
629
630 slots[slot_index].push(task);
633
634 self.task_index.insert(task_id, location);
637 }
638
639 pub fn advance(&mut self) -> Vec<WheelAdvanceResult> {
659 self.l0.current_tick += 1;
662
663 let mut expired_tasks = Vec::new();
664
665 let l0_slot_index = (self.l0.current_tick as usize) & self.l0.slot_mask;
670
671 let mut periodic_tasks_to_reinsert = Vec::new();
674
675 {
676 let l0_slot = &mut self.l0.slots[l0_slot_index];
677
678 let i = 0;
679 while i < l0_slot.len() {
680 let task_id = l0_slot[i].get_id();
683 self.task_index.remove(&task_id);
684
685 let task_with_notifier = l0_slot.swap_remove(i);
688
689 if i < l0_slot.len() {
692 let swapped_task_id = l0_slot[i].get_id();
693 if let Some(swapped_location) = self.task_index.get_mut(&swapped_task_id) {
694 swapped_location.vec_index = i;
695 }
696 }
697
698
699 let TimerTaskForWheel { task, .. } = task_with_notifier;
700
701 match task.task_type {
702 TaskTypeWithCompletionNotifier::Periodic { interval, completion_notifier } => {
703 let _ = completion_notifier.0.try_send(TaskCompletion::Called);
706
707 periodic_tasks_to_reinsert.push(TimerTaskWithCompletionNotifier {
708 id: task.id,
709 task_type: TaskTypeWithCompletionNotifier::Periodic { interval, completion_notifier },
710 delay: task.delay,
711 callback: task.callback.clone(),
712 });
713 }
714 TaskTypeWithCompletionNotifier::OneShot { completion_notifier } => {
715 completion_notifier.notify(crate::task::TaskCompletion::Called);
718 }
719 }
720
721 expired_tasks.push(WheelAdvanceResult {
722 id: task_id,
723 callback: task.callback,
724 });
725
726 }
729 }
730
731 for task_type in periodic_tasks_to_reinsert {
734 self.reinsert_periodic_task(task_type);
735 }
736
737 if self.l0.current_tick % self.l1_tick_ratio == 0 {
742 self.l1.current_tick += 1;
743 let l1_slot_index = (self.l1.current_tick as usize) & self.l1.slot_mask;
744 let l1_slot = &mut self.l1.slots[l1_slot_index];
745
746 let mut tasks_to_demote = Vec::new();
749 let mut i = 0;
750 while i < l1_slot.len() {
751 let task = &mut l1_slot[i];
752
753 if task.rounds > 0 {
754 task.rounds -= 1;
757 if let Some(location) = self.task_index.get_mut(&task.get_id()) {
758 location.vec_index = i;
759 }
760 i += 1;
761 } else {
762 self.task_index.remove(&task.get_id());
765 let task_to_demote = l1_slot.swap_remove(i);
766
767 if i < l1_slot.len() {
768 let swapped_task_id = l1_slot[i].get_id();
769 if let Some(swapped_location) = self.task_index.get_mut(&swapped_task_id) {
770 swapped_location.vec_index = i;
771 }
772 }
773
774 tasks_to_demote.push(task_to_demote);
775 }
776 }
777
778 self.demote_tasks(tasks_to_demote);
781 }
782
783 expired_tasks
784 }
785
786 fn demote_tasks(&mut self, tasks: Vec<TimerTaskForWheel>) {
794 for task in tasks {
795 let l1_tick_ratio = self.l1_tick_ratio;
800
801 let l1_deadline = task.deadline_tick;
804
805 let l0_deadline_tick = l1_deadline * l1_tick_ratio;
808 let l0_current_tick = self.l0.current_tick;
809
810 let remaining_l0_ticks = if l0_deadline_tick > l0_current_tick {
813 l0_deadline_tick - l0_current_tick
814 } else {
815 1 };
817
818 let target_l0_tick = l0_current_tick + remaining_l0_ticks;
821 let l0_slot_index = (target_l0_tick as usize) & self.l0.slot_mask;
822
823 let task_id = task.get_id();
824 let vec_index = self.l0.slots[l0_slot_index].len();
825 let location = TaskLocation::new(0, l0_slot_index, vec_index);
826
827 self.l0.slots[l0_slot_index].push(task);
830 self.task_index.insert(task_id, location);
831 }
832 }
833
834 #[allow(dead_code)]
838 pub fn is_empty(&self) -> bool {
839 self.task_index.is_empty()
840 }
841
842 #[inline]
878 pub fn postpone(
879 &mut self,
880 task_id: TaskId,
881 new_delay: Duration,
882 new_callback: Option<crate::task::CallbackWrapper>,
883 ) -> bool {
884 let old_location = match self.task_index.remove(&task_id) {
887 Some(loc) => loc,
888 None => return false,
889 };
890
891 let slot = match old_location.level {
894 0 => &mut self.l0.slots[old_location.slot_index],
895 _ => &mut self.l1.slots[old_location.slot_index],
896 };
897
898 if old_location.vec_index >= slot.len() || slot[old_location.vec_index].get_id() != task_id {
901 self.task_index.insert(task_id, old_location);
904 return false;
905 }
906
907 let mut task = slot.swap_remove(old_location.vec_index);
910
911 if old_location.vec_index < slot.len() {
914 let swapped_task_id = slot[old_location.vec_index].get_id();
915 if let Some(swapped_location) = self.task_index.get_mut(&swapped_task_id) {
916 swapped_location.vec_index = old_location.vec_index;
917 }
918 }
919
920 task.update_delay(new_delay);
923 if let Some(callback) = new_callback {
924 task.update_callback(callback);
925 }
926
927 let (new_level, ticks, new_rounds) = self.determine_layer(new_delay);
930
931 let (current_tick, slot_mask, slots) = match new_level {
934 0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
935 _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
936 };
937
938 let total_ticks = current_tick + ticks;
939 let new_slot_index = (total_ticks as usize) & slot_mask;
940
941 task.deadline_tick = total_ticks;
944 task.rounds = new_rounds;
945
946 let new_vec_index = slots[new_slot_index].len();
949 let new_location = TaskLocation::new(new_level, new_slot_index, new_vec_index);
950
951 slots[new_slot_index].push(task);
952 self.task_index.insert(task_id, new_location);
953
954 true
955 }
956
957 #[inline]
987 pub fn postpone_batch(
988 &mut self,
989 updates: Vec<(TaskId, Duration)>,
990 ) -> usize {
991 let mut postponed_count = 0;
992
993 for (task_id, new_delay) in updates {
994 if self.postpone(task_id, new_delay, None) {
995 postponed_count += 1;
996 }
997 }
998
999 postponed_count
1000 }
1001
1002 pub fn postpone_batch_with_callbacks(
1018 &mut self,
1019 updates: Vec<(TaskId, Duration, Option<crate::task::CallbackWrapper>)>,
1020 ) -> usize {
1021 let mut postponed_count = 0;
1022
1023 for (task_id, new_delay, new_callback) in updates {
1024 if self.postpone(task_id, new_delay, new_callback) {
1025 postponed_count += 1;
1026 }
1027 }
1028
1029 postponed_count
1030 }
1031}
1032
1033#[cfg(test)]
1034mod tests {
1035 use super::*;
1036 use crate::task::{CallbackWrapper, TimerTask, TimerTaskWithCompletionNotifier};
1037
/// A freshly built default wheel is empty, at tick 0, with 512 L0 slots.
#[test]
fn test_wheel_creation() {
    let fresh = Wheel::new(WheelConfig::default(), BatchConfig::default());

    assert_eq!(fresh.slot_count(), 512);
    assert_eq!(fresh.current_tick(), 0);
    assert!(fresh.is_empty());
}
1045
/// The default configuration yields 512 L0 slots, 64 L1 slots and a 100:1 tick ratio.
#[test]
fn test_hierarchical_wheel_creation() {
    let wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    // L0 view exposed through the public accessors.
    assert_eq!(wheel.slot_count(), 512);
    assert_eq!(wheel.current_tick(), 0);
    assert!(wheel.is_empty());

    // L1 layout and the derived tick ratio.
    assert_eq!(wheel.l1.slot_count, 64);
    assert_eq!(wheel.l1_tick_ratio, 100);
}
1058
/// The builder rejects a 15 ms L1 tick over a 10 ms L0 tick and accepts a 1 s L1 tick.
#[test]
fn test_hierarchical_config_validation() {
    let rejected = WheelConfig::builder()
        .l0_tick_duration(Duration::from_millis(10))
        .l0_slot_count(512)
        .l1_tick_duration(Duration::from_millis(15))
        .l1_slot_count(64)
        .build();
    assert!(rejected.is_err());

    let accepted = WheelConfig::builder()
        .l0_tick_duration(Duration::from_millis(10))
        .l0_slot_count(512)
        .l1_tick_duration(Duration::from_secs(1))
        .l1_slot_count(64)
        .build();
    assert!(accepted.is_ok());
}
1081
/// Checks layer and round selection for a short, a medium and a long delay.
#[test]
fn test_layer_determination() {
    let wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    // 100 ms: L0, no rounds.
    let (short_level, _, short_rounds) = wheel.determine_layer(Duration::from_millis(100));
    assert_eq!(short_level, 0);
    assert_eq!(short_rounds, 0);

    // 10 s: L1, no rounds.
    let (medium_level, _, medium_rounds) = wheel.determine_layer(Duration::from_secs(10));
    assert_eq!(medium_level, 1);
    assert_eq!(medium_rounds, 0);

    // 120 s: L1 with at least one extra round.
    let (long_level, _, long_rounds) = wheel.determine_layer(Duration::from_secs(120));
    assert_eq!(long_level, 1);
    assert!(long_rounds > 0);
}
1106
/// A 100 ms one-shot task is placed in L0 and expires within ten advances.
#[test]
fn test_hierarchical_insert_and_advance() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    let callback = CallbackWrapper::new(|| async {});
    let task = TimerTask::new_oneshot(Duration::from_millis(100), Some(callback));
    let (task_with_notifier, _completion_rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
    let task_id = wheel.insert(task_with_notifier);

    assert_eq!(wheel.task_index.get(&task_id).unwrap().level, 0);

    // Stop advancing at the first tick that reports anything.
    let first_batch = (0..10)
        .map(|_| wheel.advance())
        .find(|batch| !batch.is_empty());

    match first_batch {
        Some(batch) => {
            assert_eq!(batch.len(), 1);
            assert_eq!(batch[0].id, task_id);
        }
        None => panic!("Task should have expired"),
    }
}
1134
/// A 6 s task starts in L1 and is observed in L0 at some point during 610 advances.
#[test]
fn test_hierarchical_l1_to_l0_demotion() {
    let config = WheelConfig::builder()
        .l0_tick_duration(Duration::from_millis(10))
        .l0_slot_count(512)
        .l1_tick_duration(Duration::from_millis(100))
        .l1_slot_count(64)
        .build()
        .unwrap();

    let mut wheel = Wheel::new(config, BatchConfig::default());
    assert_eq!(wheel.l1_tick_ratio, 10);

    let callback = CallbackWrapper::new(|| async {});
    let task = TimerTask::new_oneshot(Duration::from_millis(6000), Some(callback));
    let (task_with_notifier, _completion_rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
    let task_id = wheel.insert(task_with_notifier);

    assert_eq!(wheel.task_index.get(&task_id).unwrap().level, 1);

    let mut demoted = false;
    for tick in 0..610 {
        wheel.advance();

        let seen_in_l0 = wheel
            .task_index
            .get(&task_id)
            .map_or(false, |location| location.level == 0);

        // Only the first sighting in L0 is logged.
        if seen_in_l0 && !demoted {
            demoted = true;
            println!("Task demoted to L0 at L0 tick {}", tick);
        }
    }

    assert!(demoted, "Task should have been demoted from L1 to L0");
}
1176
/// Cancelling works for a task in L0 as well as for a task in L1.
#[test]
fn test_cross_layer_cancel() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    // 100 ms task: expected in L0.
    let short_callback = CallbackWrapper::new(|| async {});
    let short_task = TimerTask::new_oneshot(Duration::from_millis(100), Some(short_callback));
    let (short_with_notifier, _rx1) = TimerTaskWithCompletionNotifier::from_timer_task(short_task);
    let short_id = wheel.insert(short_with_notifier);

    // 10 s task: expected in L1.
    let long_callback = CallbackWrapper::new(|| async {});
    let long_task = TimerTask::new_oneshot(Duration::from_secs(10), Some(long_callback));
    let (long_with_notifier, _rx2) = TimerTaskWithCompletionNotifier::from_timer_task(long_task);
    let long_id = wheel.insert(long_with_notifier);

    assert_eq!(wheel.task_index.get(&short_id).unwrap().level, 0);
    assert_eq!(wheel.task_index.get(&long_id).unwrap().level, 1);

    assert!(wheel.cancel(short_id));
    assert!(!wheel.task_index.contains_key(&short_id));

    assert!(wheel.cancel(long_id));
    assert!(!wheel.task_index.contains_key(&long_id));

    assert!(wheel.is_empty());
}
1209
/// Postponing moves a task from L0 to L1 and back again, following the new delay.
#[test]
fn test_cross_layer_postpone() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    let callback = CallbackWrapper::new(|| async {});
    let task = TimerTask::new_oneshot(Duration::from_millis(100), Some(callback));
    let (task_with_notifier, _rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
    let task_id = wheel.insert(task_with_notifier);

    let starting_level = wheel.task_index.get(&task_id).unwrap().level;
    assert_eq!(starting_level, 0);

    // 10 s no longer fits into L0.
    assert!(wheel.postpone(task_id, Duration::from_secs(10), None));
    let level_after_long = wheel.task_index.get(&task_id).unwrap().level;
    assert_eq!(level_after_long, 1);

    // 200 ms fits into L0 again.
    assert!(wheel.postpone(task_id, Duration::from_millis(200), None));
    let level_after_short = wheel.task_index.get(&task_id).unwrap().level;
    assert_eq!(level_after_short, 0);
}
1237
/// With the default configuration: 100 ms -> 10 ticks, 50 ms -> 5 ticks, 1 ms -> 1 tick.
#[test]
fn test_delay_to_ticks() {
    let wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    let ticks_for = |millis: u64| wheel.delay_to_ticks(Duration::from_millis(millis));

    assert_eq!(ticks_for(100), 10);
    assert_eq!(ticks_for(50), 5);
    // Sub-tick delays are rounded up to a single tick.
    assert_eq!(ticks_for(1), 1);
}
1245
/// An L0 slot count of 100 is rejected with `InvalidSlotCount`.
#[test]
fn test_wheel_invalid_slot_count() {
    let outcome = WheelConfig::builder()
        .l0_slot_count(100)
        .build();
    assert!(outcome.is_err());

    match outcome {
        Err(crate::error::TimerError::InvalidSlotCount { slot_count, reason }) => {
            assert_eq!(slot_count, 100);
            assert_eq!(reason, "L0 layer slot count must be power of 2");
        }
        _ => panic!("Expected InvalidSlotCount error"),
    }
}
1259
/// Batch-inserting ten tasks returns ten ids and leaves the wheel non-empty.
#[test]
fn test_insert_batch() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    // Delays run from 100 ms to 190 ms in 10 ms steps.
    let mut tasks: Vec<TimerTaskWithCompletionNotifier> = Vec::new();
    for i in 0..10 {
        let callback = CallbackWrapper::new(|| async {});
        let task = TimerTask::new_oneshot(Duration::from_millis(100 + i * 10), Some(callback));
        let (task_with_notifier, _completion_rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
        tasks.push(task_with_notifier);
    }

    let task_ids = wheel.insert_batch(tasks);

    assert_eq!(task_ids.len(), 10);
    assert!(!wheel.is_empty());
}
1279
/// `cancel_batch` counts only tasks that are still pending.
#[test]
fn test_cancel_batch() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    // Ten tasks with delays from 100 ms to 190 ms.
    let task_ids: Vec<TaskId> = (0..10)
        .map(|i| {
            let callback = CallbackWrapper::new(|| async {});
            let task = TimerTask::new_oneshot(Duration::from_millis(100 + i * 10), Some(callback));
            let (task_with_notifier, _completion_rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
            wheel.insert(task_with_notifier)
        })
        .collect();
    assert_eq!(task_ids.len(), 10);

    let (first_half, second_half) = (&task_ids[0..5], &task_ids[5..10]);

    assert_eq!(wheel.cancel_batch(first_half), 5);

    // A second attempt on the same ids cancels nothing.
    assert_eq!(wheel.cancel_batch(first_half), 0);

    assert_eq!(wheel.cancel_batch(second_half), 5);

    assert!(wheel.is_empty());
}
1313
/// Twenty tasks with the same 100 ms delay can all be cancelled in one batch.
#[test]
fn test_batch_operations_same_slot() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    let task_ids: Vec<TaskId> = (0..20)
        .map(|_| {
            let callback = CallbackWrapper::new(|| async {});
            let task = TimerTask::new_oneshot(Duration::from_millis(100), Some(callback));
            let (task_with_notifier, _completion_rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
            wheel.insert(task_with_notifier)
        })
        .collect();

    let cancelled_count = wheel.cancel_batch(&task_ids);

    assert_eq!(cancelled_count, 20);
    assert!(wheel.is_empty());
}
1333
/// A task postponed from 100 ms to 200 ms stays silent for ten ticks and fires within the next ten.
#[test]
fn test_postpone_single_task() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    let callback = CallbackWrapper::new(|| async {});
    let task = TimerTask::new_oneshot(Duration::from_millis(100), Some(callback));
    let (task_with_notifier, _completion_rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
    let task_id = wheel.insert(task_with_notifier);

    assert!(wheel.postpone(task_id, Duration::from_millis(200), None));
    assert!(!wheel.is_empty());

    // Nothing may fire at the original 100 ms deadline.
    for _ in 0..10 {
        assert!(wheel.advance().is_empty());
    }

    // Stop at the first tick that reports anything.
    let first_batch = (0..10)
        .map(|_| wheel.advance())
        .find(|batch| !batch.is_empty());

    if let Some(batch) = &first_batch {
        assert_eq!(batch.len(), 1);
        assert_eq!(batch[0].id, task_id);
    }
    let triggered = first_batch.is_some();
    assert!(triggered);
}
1370
/// Postponing to 50 ms with a replacement callback makes the task fire within five ticks.
#[test]
fn test_postpone_with_new_callback() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    let old_callback = CallbackWrapper::new(|| async {});
    let task = TimerTask::new_oneshot(Duration::from_millis(100), Some(old_callback.clone()));
    let (task_with_notifier, _completion_rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
    let task_id = wheel.insert(task_with_notifier);

    let new_callback = CallbackWrapper::new(|| async {});
    assert!(wheel.postpone(task_id, Duration::from_millis(50), Some(new_callback)));

    // Stop at the first tick that reports anything, remembering which one it was.
    let first_batch = (0..5)
        .map(|i| (i, wheel.advance()))
        .find(|(_, batch)| !batch.is_empty());

    if let Some((i, batch)) = &first_batch {
        assert_eq!(batch.len(), 1, "On the {}th advance, there should be 1 task triggered", i + 1);
        assert_eq!(batch[0].id, task_id);
    }
    let triggered = first_batch.is_some();
    assert!(triggered, "Task should be triggered within 5 ticks");
}
1400
/// Postponing an id that was never inserted reports failure.
#[test]
fn test_postpone_nonexistent_task() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    let unknown_id = TaskId::new();

    assert!(!wheel.postpone(unknown_id, Duration::from_millis(100), None));
}
1410
/// Five tasks postponed from 50 ms to 150 ms skip their old deadline and all fire by tick 15.
#[test]
fn test_postpone_batch() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    let task_ids: Vec<TaskId> = (0..5)
        .map(|_| {
            let callback = CallbackWrapper::new(|| async {});
            let task = TimerTask::new_oneshot(Duration::from_millis(50), Some(callback));
            let (task_with_notifier, _completion_rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
            wheel.insert(task_with_notifier)
        })
        .collect();

    let mut updates = Vec::with_capacity(task_ids.len());
    for &id in &task_ids {
        updates.push((id, Duration::from_millis(150)));
    }
    assert_eq!(wheel.postpone_batch(updates), 5);

    // The original 50 ms deadline must pass without any task firing.
    for _ in 0..5 {
        let expired = wheel.advance();
        assert!(expired.is_empty(), "The first 5 ticks should not have tasks triggered");
    }

    let total_triggered: usize = (0..10).map(|_| wheel.advance().len()).sum();
    assert_eq!(total_triggered, 5, "There should be 5 tasks triggered on the 15th tick");
}
1447
/// Postponing half of the tasks (plus one unknown id) leaves the other half on the old deadline.
#[test]
fn test_postpone_batch_partial() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    let task_ids: Vec<TaskId> = (0..10)
        .map(|_| {
            let callback = CallbackWrapper::new(|| async {});
            let task = TimerTask::new_oneshot(Duration::from_millis(50), Some(callback));
            let (task_with_notifier, _completion_rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
            wheel.insert(task_with_notifier)
        })
        .collect();

    // Five real ids followed by one id the wheel has never seen.
    let fake_task_id = TaskId::new();
    let new_delay = Duration::from_millis(150);
    let mut updates = Vec::new();
    for &id in &task_ids[0..5] {
        updates.push((id, new_delay));
    }
    updates.push((fake_task_id, new_delay));

    let postponed_count = wheel.postpone_batch(updates);
    assert_eq!(postponed_count, 5, "There should be 5 tasks successfully postponed (fake_task_id failed)");

    let triggered_at_50ms: usize = (0..5).map(|_| wheel.advance().len()).sum();
    assert_eq!(triggered_at_50ms, 5, "There should be 5 tasks that were not postponed triggered on the 5th tick");

    let triggered_at_150ms: usize = (0..10).map(|_| wheel.advance().len()).sum();
    assert_eq!(triggered_at_150ms, 5, "There should be 5 tasks that were postponed triggered on the 15th tick");
}
1489
/// A 120 s task stays in L1 for the first 6400 L0 ticks and fires within the next 6000.
#[test]
fn test_multi_round_tasks() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    let callback = CallbackWrapper::new(|| async {});
    let task = TimerTask::new_oneshot(Duration::from_secs(120), Some(callback));
    let (task_with_notifier, _completion_rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
    let task_id = wheel.insert(task_with_notifier);

    assert_eq!(wheel.task_index.get(&task_id).unwrap().level, 1);

    for _ in 0..6400 {
        let _expired = wheel.advance();
    }

    // If the task is still indexed at this point it must still be in L1.
    match wheel.task_index.get(&task_id) {
        Some(loc) => assert_eq!(loc.level, 1),
        None => {}
    }

    // Stop advancing as soon as the task is reported.
    let triggered = (0..6000).any(|_| wheel.advance().iter().any(|t| t.id == task_id));
    assert!(triggered, "Task should be triggered in the second round of L1");
}
1534
/// A 1 ms delay is rounded up to one tick, so the task fires on the first advance.
#[test]
fn test_minimum_delay() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    let callback = CallbackWrapper::new(|| async {});
    let task = TimerTask::new_oneshot(Duration::from_millis(1), Some(callback));
    let (task_with_notifier, _completion_rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
    let task_id: TaskId = wheel.insert(task_with_notifier);

    let first_tick = wheel.advance();

    assert_eq!(first_tick.len(), 1, "Minimum delay task should be triggered after 1 tick");
    assert_eq!(first_tick[0].id, task_id);
}
1550
/// Every batch operation accepts empty input and reports zero work done.
#[test]
fn test_empty_batch_operations() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    let inserted = wheel.insert_batch(Vec::new());
    assert_eq!(inserted.len(), 0);

    let no_ids: [TaskId; 0] = [];
    assert_eq!(wheel.cancel_batch(&no_ids), 0);

    assert_eq!(wheel.postpone_batch(Vec::new()), 0);
}
1567
/// After three consecutive postpones only the last delay (50 ms) decides when the task fires.
#[test]
fn test_postpone_same_task_multiple_times() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    let callback = CallbackWrapper::new(|| async {});
    let task = TimerTask::new_oneshot(Duration::from_millis(100), Some(callback));
    let (task_with_notifier, _completion_rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
    let task_id = wheel.insert(task_with_notifier);

    assert!(
        wheel.postpone(task_id, Duration::from_millis(200), None),
        "First postpone should succeed"
    );
    assert!(
        wheel.postpone(task_id, Duration::from_millis(300), None),
        "Second postpone should succeed"
    );
    assert!(
        wheel.postpone(task_id, Duration::from_millis(50), None),
        "Third postpone should succeed"
    );

    // Stop at the first tick that reports anything.
    let first_batch = (0..5)
        .map(|_| wheel.advance())
        .find(|batch| !batch.is_empty());

    if let Some(batch) = &first_batch {
        assert_eq!(batch.len(), 1);
        assert_eq!(batch[0].id, task_id);
    }
    let triggered = first_batch.is_some();
    assert!(triggered, "Task should be triggered at the last postpone time");
}
1603
#[test]
fn test_advance_empty_slots() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    // Ticking a wheel with no tasks must never yield anything.
    (0..100).for_each(|_| {
        assert!(wheel.advance().is_empty(), "Empty slots should not return any tasks");
    });

    // The tick counter is still expected to move forward by one per advance.
    assert_eq!(wheel.current_tick(), 100, "current_tick should correctly increment");
}
1616
#[test]
fn test_cancel_after_postpone() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    let (notified_task, _completion_rx) = TimerTaskWithCompletionNotifier::from_timer_task(
        TimerTask::new_oneshot(Duration::from_millis(100), Some(CallbackWrapper::new(|| async {}))),
    );
    let task_id = wheel.insert(notified_task);

    // Move the deadline first, then cancel: both operations must succeed.
    assert!(
        wheel.postpone(task_id, Duration::from_millis(200), None),
        "Postpone should succeed"
    );
    assert!(wheel.cancel(task_id), "Cancel should succeed");

    // Nothing may expire on any of the next 20 ticks.
    (0..20).for_each(|_| {
        assert!(wheel.advance().is_empty(), "Cancelled task should not trigger");
    });

    assert!(wheel.is_empty(), "Wheel should be empty");
}
1643
#[test]
fn test_slot_boundary() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    // Near task: 10 ms, expected by this test to fire on the first tick.
    let (near_task, _rx1) = TimerTaskWithCompletionNotifier::from_timer_task(
        TimerTask::new_oneshot(Duration::from_millis(10), Some(CallbackWrapper::new(|| async {}))),
    );
    let near_id = wheel.insert(near_task);

    // Far task: 5110 ms, expected by this test to fire on tick 511.
    let (far_task, _rx2) = TimerTaskWithCompletionNotifier::from_timer_task(
        TimerTask::new_oneshot(Duration::from_millis(5110), Some(CallbackWrapper::new(|| async {}))),
    );
    let far_id = wheel.insert(far_task);

    let first_batch = wheel.advance();
    assert_eq!(first_batch.len(), 1, "First task should trigger on tick 1");
    assert_eq!(first_batch[0].id, near_id);

    // Keep ticking (at most 510 more times) until something expires.
    let hit = (0..510)
        .map(|i| (i, wheel.advance()))
        .find(|(_, batch)| !batch.is_empty());

    match hit {
        Some((i, batch)) => {
            // `i` is zero-based and one advance already happened above, hence `i + 2`.
            assert_eq!(batch.len(), 1, "The {}th advance should trigger the second task", i + 2);
            assert_eq!(batch[0].id, far_id);
        }
        None => panic!("Second task should trigger on tick 511"),
    }

    assert!(wheel.is_empty(), "All tasks should have been triggered");
}
1681
#[test]
fn test_batch_cancel_small_threshold() {
    // Threshold of 5: the two cancel calls below use one batch smaller and one
    // larger than it (presumably to exercise both batch strategies — confirm
    // against `cancel_batch`).
    let mut wheel = Wheel::new(
        WheelConfig::default(),
        BatchConfig {
            small_batch_threshold: 5,
        },
    );

    // Insert ten identical 100 ms one-shot tasks and remember their ids.
    let task_ids: Vec<_> = (0..10)
        .map(|_| {
            let callback = CallbackWrapper::new(|| async {});
            let task = TimerTask::new_oneshot(Duration::from_millis(100), Some(callback));
            let (notified_task, _rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
            wheel.insert(notified_task)
        })
        .collect();

    let (small_batch, large_batch) = task_ids.split_at(3);

    // 3 ids: below the threshold.
    assert_eq!(wheel.cancel_batch(small_batch), 3);

    // 7 ids: above the threshold.
    assert_eq!(wheel.cancel_batch(large_batch), 7);

    assert!(wheel.is_empty());
}
1710
#[test]
fn test_task_id_uniqueness() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    // Every id handed out by `insert` is recorded; a repeat makes `HashSet::insert` return false.
    let mut seen_ids = std::collections::HashSet::new();

    for _ in 0..100 {
        let (notified_task, _rx) = TimerTaskWithCompletionNotifier::from_timer_task(
            TimerTask::new_oneshot(Duration::from_millis(100), Some(CallbackWrapper::new(|| async {}))),
        );
        let fresh_id = wheel.insert(notified_task);

        let is_new = seen_ids.insert(fresh_id);
        assert!(is_new, "TaskId should be unique");
    }

    assert_eq!(seen_ids.len(), 100);
}
1728
#[test]
fn test_periodic_task_basic() {
    use crate::task::CompletionReceiver;

    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    // Periodic task built with two 50 ms durations.
    let callback = CallbackWrapper::new(|| async {});
    let periodic = TimerTask::new_periodic(
        Duration::from_millis(50),
        Duration::from_millis(50),
        Some(callback),
        None,
    );
    let (notified_task, completion_receiver) =
        TimerTaskWithCompletionNotifier::from_timer_task(periodic);
    let task_id = wheel.insert(notified_task);

    let mut rx = if let CompletionReceiver::Periodic(receiver) = completion_receiver {
        receiver
    } else {
        panic!("Expected periodic completion receiver")
    };

    // First period: five ticks.
    (0..5).for_each(|_| {
        wheel.advance();
    });

    assert!(rx.try_recv().is_ok(), "Should receive first notification");

    // After firing, the task must have been put back rather than dropped.
    assert!(!wheel.is_empty(), "Periodic task should be reinserted");
    assert!(wheel.task_index.contains_key(&task_id), "Task should still be in index");

    // Second period: another five ticks.
    (0..5).for_each(|_| {
        wheel.advance();
    });

    assert!(rx.try_recv().is_ok(), "Should receive second notification");
    assert!(!wheel.is_empty(), "Periodic task should still be in the wheel");

    // Cancelling is the only way this test removes the task.
    assert!(wheel.cancel(task_id), "Should be able to cancel periodic task");
    assert!(wheel.is_empty(), "Wheel should be empty after cancellation");
}
1782
#[test]
fn test_periodic_task_cancel() {
    use crate::task::{TaskCompletion, CompletionReceiver};

    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    let callback = CallbackWrapper::new(|| async {});
    let periodic = TimerTask::new_periodic(
        Duration::from_millis(100),
        Duration::from_millis(100),
        Some(callback),
        None,
    );
    let (notified_task, completion_receiver) =
        TimerTaskWithCompletionNotifier::from_timer_task(periodic);
    let task_id = wheel.insert(notified_task);

    let mut rx = if let CompletionReceiver::Periodic(receiver) = completion_receiver {
        receiver
    } else {
        panic!("Expected periodic completion receiver")
    };

    // Cancel before the wheel has ticked even once.
    assert!(wheel.cancel(task_id), "Should successfully cancel");

    // The receiver must already hold a `Cancelled` completion.
    match rx.try_recv() {
        Ok(reason) => {
            assert_eq!(reason, TaskCompletion::Cancelled, "Should receive Cancelled notification")
        }
        Err(_) => panic!("Should receive cancellation notification"),
    }

    assert!(wheel.is_empty(), "Wheel should be empty");

    // Ticking afterwards must not expire anything.
    (0..20).for_each(|_| {
        assert!(wheel.advance().is_empty(), "No tasks should expire after cancellation");
    });
}
1827
#[test]
fn test_periodic_task_multiple_triggers() {
    use crate::task::CompletionReceiver;

    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    // Periodic task built with two 30 ms durations.
    let callback = CallbackWrapper::new(|| async {});
    let task = TimerTask::new_periodic(
        Duration::from_millis(30),
        Duration::from_millis(30),
        Some(callback),
        None,
    );
    let (task_with_notifier, completion_receiver) =
        TimerTaskWithCompletionNotifier::from_timer_task(task);
    let task_id = wheel.insert(task_with_notifier);

    let mut rx = match completion_receiver {
        CompletionReceiver::Periodic(receiver) => receiver,
        _ => panic!("Expected periodic completion receiver"),
    };

    let mut trigger_count = 0;
    for _ in 0..100 {
        wheel.advance();

        // Drain every notification produced so far. Only success/failure of
        // `try_recv` matters here, so `is_ok()` replaces the value-discarding
        // `while let Ok(_)` pattern.
        while rx.try_recv().is_ok() {
            trigger_count += 1;
        }
    }

    assert!(trigger_count >= 3, "Should trigger at least 3 times, got {}", trigger_count);

    // Clean up; the result is intentionally not asserted in this test.
    wheel.cancel(task_id);
}
1870
#[test]
fn test_periodic_task_cross_layer() {
    use crate::task::CompletionReceiver;

    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    // Periodic task built with two 10 s durations; the test expects it in layer 1.
    let periodic = TimerTask::new_periodic(
        Duration::from_secs(10),
        Duration::from_secs(10),
        Some(CallbackWrapper::new(|| async {})),
        None,
    );
    let (notified_task, completion_receiver) =
        TimerTaskWithCompletionNotifier::from_timer_task(periodic);
    let task_id = wheel.insert(notified_task);

    let mut rx = if let CompletionReceiver::Periodic(receiver) = completion_receiver {
        receiver
    } else {
        panic!("Expected periodic completion receiver")
    };

    let initial_location = wheel.task_index.get(&task_id).unwrap();
    assert_eq!(initial_location.level, 1, "Long interval task should be in L1");

    // 1001 ticks: presumably 10 s at 10 ms per tick plus one extra — confirm
    // against the default `WheelConfig`.
    (0..1001).for_each(|_| {
        wheel.advance();
    });

    assert!(rx.try_recv().is_ok(), "Should receive notification");

    // The task must be back in the index, and again at level 1.
    assert!(wheel.task_index.contains_key(&task_id), "Task should be reinserted");
    let reinserted_location = wheel.task_index.get(&task_id).unwrap();
    assert_eq!(reinserted_location.level, 1, "Reinserted task should still be in L1");

    wheel.cancel(task_id);
}
1920
#[test]
fn test_periodic_task_batch_cancel() {
    use crate::task::{TaskCompletion, CompletionReceiver};

    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    let mut task_ids = Vec::new();
    let mut receivers = Vec::new();

    // Five periodic tasks with staggered first durations (100, 110, ... 140 ms).
    for i in 0..5 {
        let callback = CallbackWrapper::new(|| async {});
        let task = TimerTask::new_periodic(
            Duration::from_millis(100 + i * 10),
            Duration::from_millis(100),
            Some(callback),
            None,
        );
        let (task_with_notifier, completion_receiver) =
            TimerTaskWithCompletionNotifier::from_timer_task(task);
        let task_id = wheel.insert(task_with_notifier);
        task_ids.push(task_id);

        // Fail loudly on an unexpected receiver kind. Silently skipping it
        // would let the cancellation check below pass with fewer than five
        // receivers.
        match completion_receiver {
            CompletionReceiver::Periodic(rx) => receivers.push(rx),
            _ => panic!("Expected periodic completion receiver"),
        }
    }

    let cancelled_count = wheel.cancel_batch(&task_ids);
    assert_eq!(cancelled_count, 5, "Should cancel all 5 tasks");

    // Every task's receiver must have been told about the cancellation.
    for mut rx in receivers {
        match rx.try_recv() {
            Ok(reason) => assert_eq!(reason, TaskCompletion::Cancelled),
            Err(_) => panic!("Should receive cancellation notification"),
        }
    }

    assert!(wheel.is_empty(), "Wheel should be empty");
}
1964
#[test]
fn test_periodic_task_with_initial_delay() {
    use crate::task::CompletionReceiver;

    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    // First duration 100 ms, second 50 ms: the test treats them as the initial
    // delay and the repeat interval respectively.
    let periodic = TimerTask::new_periodic(
        Duration::from_millis(100),
        Duration::from_millis(50),
        Some(CallbackWrapper::new(|| async {})),
        None,
    );
    let (notified_task, completion_receiver) =
        TimerTaskWithCompletionNotifier::from_timer_task(periodic);
    let task_id = wheel.insert(notified_task);

    let mut rx = if let CompletionReceiver::Periodic(receiver) = completion_receiver {
        receiver
    } else {
        panic!("Expected periodic completion receiver")
    };

    // Ten ticks for the initial delay.
    (0..10).for_each(|_| {
        wheel.advance();
    });
    assert!(rx.try_recv().is_ok(), "Should receive first notification after initial delay");

    // Five more ticks for one interval.
    (0..5).for_each(|_| {
        wheel.advance();
    });
    assert!(rx.try_recv().is_ok(), "Should receive second notification after interval");

    wheel.cancel(task_id);
}
2008
#[test]
fn test_periodic_task_postpone() {
    use crate::task::CompletionReceiver;

    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    let periodic = TimerTask::new_periodic(
        Duration::from_millis(50),
        Duration::from_millis(50),
        Some(CallbackWrapper::new(|| async {})),
        None,
    );
    let (notified_task, completion_receiver) =
        TimerTaskWithCompletionNotifier::from_timer_task(periodic);
    let task_id = wheel.insert(notified_task);

    let mut rx = if let CompletionReceiver::Periodic(receiver) = completion_receiver {
        receiver
    } else {
        panic!("Expected periodic completion receiver")
    };

    // Push the first firing out from 50 ms to 100 ms.
    let postponed = wheel.postpone(task_id, Duration::from_millis(100), None);
    assert!(postponed, "Should postpone periodic task");

    // Ticks 1-5: the original deadline passes without a notification.
    (0..5).for_each(|_| {
        wheel.advance();
    });
    assert!(rx.try_recv().is_err(), "Should not receive notification before postponed time");

    // Ticks 6-10: the postponed deadline is reached.
    (0..5).for_each(|_| {
        wheel.advance();
    });
    assert!(rx.try_recv().is_ok(), "Should receive notification at postponed time");

    // Ticks 11-15: one further interval.
    (0..5).for_each(|_| {
        wheel.advance();
    });
    assert!(rx.try_recv().is_ok(), "Should receive second notification after interval");

    wheel.cancel(task_id);
}
2059
#[test]
fn test_periodic_task_postpone_cross_layer() {
    use crate::task::CompletionReceiver;

    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    let periodic = TimerTask::new_periodic(
        Duration::from_millis(100),
        Duration::from_millis(100),
        Some(CallbackWrapper::new(|| async {})),
        None,
    );
    let (notified_task, completion_receiver) =
        TimerTaskWithCompletionNotifier::from_timer_task(periodic);
    let task_id = wheel.insert(notified_task);

    let mut rx = if let CompletionReceiver::Periodic(receiver) = completion_receiver {
        receiver
    } else {
        panic!("Expected periodic completion receiver")
    };

    // A 100 ms task is expected at level 0.
    let level_after_insert = wheel.task_index.get(&task_id).unwrap().level;
    assert_eq!(level_after_insert, 0);

    // Postponing to 10 s is expected to move it to level 1 ...
    assert!(wheel.postpone(task_id, Duration::from_secs(10), None));
    let level_after_long_postpone = wheel.task_index.get(&task_id).unwrap().level;
    assert_eq!(level_after_long_postpone, 1);

    // ... and postponing to 200 ms is expected to bring it back to level 0.
    assert!(wheel.postpone(task_id, Duration::from_millis(200), None));
    let level_after_short_postpone = wheel.task_index.get(&task_id).unwrap().level;
    assert_eq!(level_after_short_postpone, 0);

    // Twenty ticks later the task must have fired.
    (0..20).for_each(|_| {
        wheel.advance();
    });
    assert!(rx.try_recv().is_ok(), "Should receive notification");

    wheel.cancel(task_id);
}
2113
#[test]
fn test_periodic_task_batch_insert() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    // Ten periodic tasks with staggered first durations (100, 110, ... 190 ms)
    // and a shared 50 ms second duration.
    let mut tasks: Vec<TimerTaskWithCompletionNotifier> = Vec::with_capacity(10);
    for i in 0..10 {
        let callback = CallbackWrapper::new(|| async {});
        let periodic = TimerTask::new_periodic(
            Duration::from_millis(100 + i * 10),
            Duration::from_millis(50),
            Some(callback),
            None,
        );
        // The receiver half is dropped immediately; this test does not read notifications.
        let (notified_task, _rx) = TimerTaskWithCompletionNotifier::from_timer_task(periodic);
        tasks.push(notified_task);
    }

    let task_ids = wheel.insert_batch(tasks);

    assert_eq!(task_ids.len(), 10, "Should insert all 10 periodic tasks");
    assert_eq!(wheel.task_index.len(), 10, "All tasks should be in index");

    // Run for 200 ticks; the expired batches themselves are not inspected.
    (0..200).for_each(|_| {
        let _expired = wheel.advance();
    });

    // The index must still hold all ten tasks after firing.
    assert_eq!(wheel.task_index.len(), 10, "All periodic tasks should still be in wheel");

    let cancelled_count = wheel.cancel_batch(&task_ids);
    assert_eq!(cancelled_count, 10, "Should cancel all periodic tasks");
    assert!(wheel.is_empty());
}
2157
#[test]
fn test_periodic_task_batch_postpone() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    // Five identical periodic tasks built with two 50 ms durations.
    let task_ids: Vec<_> = (0..5)
        .map(|_| {
            let callback = CallbackWrapper::new(|| async {});
            let periodic = TimerTask::new_periodic(
                Duration::from_millis(50),
                Duration::from_millis(50),
                Some(callback),
                None,
            );
            let (notified_task, _rx) = TimerTaskWithCompletionNotifier::from_timer_task(periodic);
            wheel.insert(notified_task)
        })
        .collect();

    // Postpone every task to 150 ms in one batch call.
    let updates: Vec<_> = task_ids
        .iter()
        .copied()
        .zip(std::iter::repeat(Duration::from_millis(150)))
        .collect();
    let postponed_count = wheel.postpone_batch(updates);
    assert_eq!(postponed_count, 5, "Should postpone all 5 periodic tasks");

    // The original 50 ms deadline (first five ticks) must pass silently.
    (0..5).for_each(|_| {
        assert!(wheel.advance().is_empty(), "Tasks should not trigger before postponed time");
    });

    // Over the next ten ticks each task must fire exactly once.
    let total_triggered: usize = (0..10).map(|_| wheel.advance().len()).sum();
    assert_eq!(total_triggered, 5, "All 5 tasks should trigger at postponed time");

    wheel.cancel_batch(&task_ids);
}
2207
#[test]
fn test_mixed_oneshot_and_periodic_tasks() {
    use crate::task::CompletionReceiver;

    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    // Five one-shot tasks with staggered delays (100, 110, ... 140 ms).
    let mut oneshot_ids = Vec::new();
    for i in 0..5 {
        let callback = CallbackWrapper::new(|| async {});
        let task = TimerTask::new_oneshot(Duration::from_millis(100 + i * 10), Some(callback));
        let (task_with_notifier, _rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
        let task_id = wheel.insert(task_with_notifier);
        oneshot_ids.push(task_id);
    }

    // Five periodic tasks built with two 100 ms durations.
    let mut periodic_ids = Vec::new();
    // The receivers are never read; they are only stored so they stay alive
    // until the end of the test, as in the original.
    let mut periodic_receivers = Vec::new();
    for _ in 0..5 {
        let callback = CallbackWrapper::new(|| async {});
        let task = TimerTask::new_periodic(
            Duration::from_millis(100),
            Duration::from_millis(100),
            Some(callback),
            None,
        );
        let (task_with_notifier, completion_receiver) =
            TimerTaskWithCompletionNotifier::from_timer_task(task);
        let task_id = wheel.insert(task_with_notifier);
        periodic_ids.push(task_id);

        // Fail loudly on an unexpected receiver kind instead of silently
        // dropping it, consistent with the other periodic tests.
        match completion_receiver {
            CompletionReceiver::Periodic(rx) => periodic_receivers.push(rx),
            _ => panic!("Expected periodic completion receiver"),
        }
    }

    assert_eq!(wheel.task_index.len(), 10, "Should have 10 tasks total");

    // Fifteen ticks: every one-shot task and the first firing of every
    // periodic task are expected in this window.
    let mut total_expired = 0;
    for _ in 0..15 {
        let expired = wheel.advance();
        total_expired += expired.len();
    }

    assert_eq!(total_expired, 10, "Should have triggered oneshot and periodic tasks");

    assert_eq!(wheel.task_index.len(), 5, "Only periodic tasks should remain");

    for id in &oneshot_ids {
        assert!(!wheel.task_index.contains_key(id), "Oneshot task should be removed");
    }

    for id in &periodic_ids {
        assert!(wheel.task_index.contains_key(id), "Periodic task should still be present");
    }

    wheel.cancel_batch(&periodic_ids);
    assert!(wheel.is_empty());
}
2281
#[test]
fn test_periodic_task_postpone_with_callback() {
    use crate::task::CompletionReceiver;

    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    // Task starts out with one callback ...
    let old_callback = CallbackWrapper::new(|| async {});
    let periodic = TimerTask::new_periodic(
        Duration::from_millis(50),
        Duration::from_millis(50),
        Some(old_callback),
        None,
    );
    let (notified_task, completion_receiver) =
        TimerTaskWithCompletionNotifier::from_timer_task(periodic);
    let task_id = wheel.insert(notified_task);

    let mut rx = if let CompletionReceiver::Periodic(receiver) = completion_receiver {
        receiver
    } else {
        panic!("Expected periodic completion receiver")
    };

    // ... and is postponed to 100 ms while being handed a replacement callback.
    let new_callback = CallbackWrapper::new(|| async {});
    let postponed = wheel.postpone(task_id, Duration::from_millis(100), Some(new_callback));
    assert!(postponed);

    // Ten ticks later a notification must be available. NOTE(review): only the
    // notification is checked; which callback ran is not observable here.
    (0..10).for_each(|_| {
        wheel.advance();
    });

    assert!(rx.try_recv().is_ok(), "Should receive notification with new callback");

    wheel.cancel(task_id);
}
2320}
2321