wheel_timer2/
wheel.rs

1use std::any::Any;
2use std::time::Duration;
3
4use crossbeam_channel::Receiver;
5
6use crate::behave::Behave;
7use crate::callback::BoxedCallback;
8use crate::handle::AddHandle;
9use crate::task::Task;
10
11#[derive(Clone, Debug, Default)]
12pub struct MoveTo(pub Vec<(usize, usize)>);
13
14#[derive(Debug)]
15pub struct MultiWheel {
16    capacity: usize,
17    wheels: Vec<Wheel>,
18    granularity: Duration,
19
20    #[allow(clippy::type_complexity)]
21    add_handle: (
22        Receiver<(BoxedCallback, Duration, Box<dyn Any + Send>)>,
23        AddHandle,
24    ),
25}
26
27#[derive(Default, Debug)]
28pub struct Wheel {
29    slots: Vec<Slot>,
30    index: usize,
31}
32
33#[derive(Default, Debug)]
34pub struct Slot {
35    tasks: Vec<Task>,
36}
37
38impl MultiWheel {
39    /// Create a roulette wheel consisting of multiple time wheels
40    ///
41    /// `n`: `n` layers of wheels
42    /// `capacity`: how many slots per wheel
43    /// `granularity`: the granularity of the time wheel, which must be consistent with the interval at which [MultiWheel::tick] is called.
44    pub fn new(n: usize, capacity: usize, granularity: Duration) -> Self {
45        let mut wheels = Vec::with_capacity(n);
46        wheels.resize_with(n, || Wheel::new(capacity));
47
48        let (tx, rx) = crossbeam_channel::unbounded();
49        Self {
50            capacity,
51            wheels,
52            granularity,
53            add_handle: (rx, AddHandle(tx)),
54        }
55    }
56
57    #[inline]
58    pub fn add_handle(&self) -> AddHandle {
59        self.add_handle.1.clone()
60    }
61
62    fn add_task(&mut self, cb: BoxedCallback, dur: Duration, ctx: Box<dyn Any + Send>) {
63        let tick = (dur.as_nanos() as f64 / self.granularity.as_nanos() as f64).round() as usize;
64        let task = Task {
65            cb,
66            round: 0,
67            move_to: Default::default(),
68            ctx,
69            tick,
70        };
71        self.add(task)
72    }
73
74    fn add(&mut self, mut task: Task) {
75        let mut layer = 0;
76        let mut ticks = task.tick;
77        let slot_pos = loop {
78            let current_index = self.wheels.get(layer).expect("empty wheel").index;
79
80            if task.tick == 0 {
81                // if the duration of this task is greater than the granularity of the time wheel,
82                // it is scheduled to run at the next moment.
83                // worst case runs after a `granular`, fastest possible `immediately`
84
85                // nearest position
86                break current_index + 1;
87            } else {
88                let current = (current_index + ticks) % self.capacity;
89                let next = (current_index + ticks) / self.capacity;
90
91                if next == 0 {
92                    break current;
93                } else if self.wheels.get(layer + 1).is_some() {
94                    task.move_to.0.push((layer, current));
95                    layer += 1;
96                    ticks = next;
97                } else {
98                    task.round = next - 1;
99                    task.move_to.0.push((layer, current));
100                    break 0;
101                }
102            }
103        };
104
105        self.wheels
106            .get_mut(layer)
107            .unwrap()
108            .slots
109            .get_mut(slot_pos)
110            .unwrap()
111            .tasks
112            .push(task);
113    }
114
115    fn roll(&mut self, wheel_index: usize) -> Option<()> {
116        if self.wheels.get_mut(wheel_index)?.roll() {
117            self.roll(wheel_index + 1);
118        }
119
120        let mut task_i = 0;
121        loop {
122            let wheel = self.wheels.get_mut(wheel_index).unwrap();
123            let slot = &mut wheel.slots.get_mut(wheel.index).unwrap().tasks;
124            if slot.is_empty() {
125                break;
126            }
127
128            let mut task = if let Some(task) = slot.get_mut(task_i) {
129                task
130            } else {
131                break;
132            };
133
134            if task.round > 0 {
135                task.round -= 1;
136                task_i += 1;
137            } else if let Some((layer, slot_index)) = task.move_to.0.pop() {
138                let task = slot.swap_remove(task_i);
139                let last_wheel = self.wheels.get_mut(layer).unwrap();
140                last_wheel
141                    .slots
142                    .get_mut(slot_index)
143                    .unwrap()
144                    .tasks
145                    .push(task);
146            } else {
147                let mut task = slot.swap_remove(task_i);
148                match task.cb.call(&mut task.ctx) {
149                    Behave::Cancel => {}
150                    Behave::Change(dur) => {
151                        self.add_task(task.cb, dur, task.ctx);
152                    }
153                    Behave::Repeat => {
154                        self.add(Task {
155                            cb: task.cb,
156                            round: 0,
157                            move_to: Default::default(),
158                            ctx: task.ctx,
159                            tick: task.tick,
160                        });
161                    }
162                }
163            }
164        }
165
166        Some(())
167    }
168
169    /// move wheel to next tick
170    #[inline]
171    pub fn tick(&mut self) {
172        while let Ok((cb, dur, ctx)) = self.add_handle.0.try_recv() {
173            self.add_task(cb, dur, ctx)
174        }
175        self.roll(0);
176    }
177}
178
179impl Wheel {
180    pub fn new(capacity: usize) -> Wheel {
181        let mut slots = Vec::with_capacity(capacity);
182        slots.resize_with(capacity, Default::default);
183        Wheel {
184            slots,
185            ..Default::default()
186        }
187    }
188
189    fn roll(&mut self) -> bool {
190        if self.index == self.slots.len() - 1 {
191            self.index = 0;
192            true
193        } else {
194            self.index += 1;
195            false
196        }
197    }
198}