1use std::any::Any;
2use std::time::Duration;
3
4use crossbeam_channel::Receiver;
5
6use crate::behave::Behave;
7use crate::callback::BoxedCallback;
8use crate::handle::AddHandle;
9use crate::task::Task;
10
11#[derive(Clone, Debug, Default)]
12pub struct MoveTo(pub Vec<(usize, usize)>);
13
14#[derive(Debug)]
15pub struct MultiWheel {
16 capacity: usize,
17 wheels: Vec<Wheel>,
18 granularity: Duration,
19
20 #[allow(clippy::type_complexity)]
21 add_handle: (
22 Receiver<(BoxedCallback, Duration, Box<dyn Any + Send>)>,
23 AddHandle,
24 ),
25}
26
27#[derive(Default, Debug)]
28pub struct Wheel {
29 slots: Vec<Slot>,
30 index: usize,
31}
32
33#[derive(Default, Debug)]
34pub struct Slot {
35 tasks: Vec<Task>,
36}
37
38impl MultiWheel {
39 pub fn new(n: usize, capacity: usize, granularity: Duration) -> Self {
45 let mut wheels = Vec::with_capacity(n);
46 wheels.resize_with(n, || Wheel::new(capacity));
47
48 let (tx, rx) = crossbeam_channel::unbounded();
49 Self {
50 capacity,
51 wheels,
52 granularity,
53 add_handle: (rx, AddHandle(tx)),
54 }
55 }
56
57 #[inline]
58 pub fn add_handle(&self) -> AddHandle {
59 self.add_handle.1.clone()
60 }
61
62 fn add_task(&mut self, cb: BoxedCallback, dur: Duration, ctx: Box<dyn Any + Send>) {
63 let tick = (dur.as_nanos() as f64 / self.granularity.as_nanos() as f64).round() as usize;
64 let task = Task {
65 cb,
66 round: 0,
67 move_to: Default::default(),
68 ctx,
69 tick,
70 };
71 self.add(task)
72 }
73
74 fn add(&mut self, mut task: Task) {
75 let mut layer = 0;
76 let mut ticks = task.tick;
77 let slot_pos = loop {
78 let current_index = self.wheels.get(layer).expect("empty wheel").index;
79
80 if task.tick == 0 {
81 break current_index + 1;
87 } else {
88 let current = (current_index + ticks) % self.capacity;
89 let next = (current_index + ticks) / self.capacity;
90
91 if next == 0 {
92 break current;
93 } else if self.wheels.get(layer + 1).is_some() {
94 task.move_to.0.push((layer, current));
95 layer += 1;
96 ticks = next;
97 } else {
98 task.round = next - 1;
99 task.move_to.0.push((layer, current));
100 break 0;
101 }
102 }
103 };
104
105 self.wheels
106 .get_mut(layer)
107 .unwrap()
108 .slots
109 .get_mut(slot_pos)
110 .unwrap()
111 .tasks
112 .push(task);
113 }
114
115 fn roll(&mut self, wheel_index: usize) -> Option<()> {
116 if self.wheels.get_mut(wheel_index)?.roll() {
117 self.roll(wheel_index + 1);
118 }
119
120 let mut task_i = 0;
121 loop {
122 let wheel = self.wheels.get_mut(wheel_index).unwrap();
123 let slot = &mut wheel.slots.get_mut(wheel.index).unwrap().tasks;
124 if slot.is_empty() {
125 break;
126 }
127
128 let mut task = if let Some(task) = slot.get_mut(task_i) {
129 task
130 } else {
131 break;
132 };
133
134 if task.round > 0 {
135 task.round -= 1;
136 task_i += 1;
137 } else if let Some((layer, slot_index)) = task.move_to.0.pop() {
138 let task = slot.swap_remove(task_i);
139 let last_wheel = self.wheels.get_mut(layer).unwrap();
140 last_wheel
141 .slots
142 .get_mut(slot_index)
143 .unwrap()
144 .tasks
145 .push(task);
146 } else {
147 let mut task = slot.swap_remove(task_i);
148 match task.cb.call(&mut task.ctx) {
149 Behave::Cancel => {}
150 Behave::Change(dur) => {
151 self.add_task(task.cb, dur, task.ctx);
152 }
153 Behave::Repeat => {
154 self.add(Task {
155 cb: task.cb,
156 round: 0,
157 move_to: Default::default(),
158 ctx: task.ctx,
159 tick: task.tick,
160 });
161 }
162 }
163 }
164 }
165
166 Some(())
167 }
168
169 #[inline]
171 pub fn tick(&mut self) {
172 while let Ok((cb, dur, ctx)) = self.add_handle.0.try_recv() {
173 self.add_task(cb, dur, ctx)
174 }
175 self.roll(0);
176 }
177}
178
179impl Wheel {
180 pub fn new(capacity: usize) -> Wheel {
181 let mut slots = Vec::with_capacity(capacity);
182 slots.resize_with(capacity, Default::default);
183 Wheel {
184 slots,
185 ..Default::default()
186 }
187 }
188
189 fn roll(&mut self) -> bool {
190 if self.index == self.slots.len() - 1 {
191 self.index = 0;
192 true
193 } else {
194 self.index += 1;
195 false
196 }
197 }
198}