atomr_core/actor/scheduler/
hashed_wheel.rs1use std::collections::VecDeque;
13use std::sync::atomic::{AtomicBool, Ordering};
14use std::sync::Arc;
15use std::time::Duration;
16
17use futures_util::future::BoxFuture;
18use parking_lot::Mutex;
19
20use super::{Scheduler, SchedulerHandle};
21
/// Boxed, run-once closure kept inside a wheel entry until its slot comes due.
///
/// `Send` is required because entries are inserted by callers and invoked by
/// the spawned ticker task.
type Callback = Box<dyn FnOnce() + Send + 'static>;
23
24struct Slot {
25 items: VecDeque<Entry>,
26}
27
28struct Entry {
29 rounds: u64,
30 cancel: Arc<AtomicBool>,
31 cb: Callback,
32}
33
34pub struct HashedWheelTimerScheduler {
35 inner: Arc<Inner>,
36}
37
38struct Inner {
39 tick: Duration,
40 mask: usize,
41 slots: Mutex<Vec<Slot>>,
42 cursor: Mutex<usize>,
43 shutdown: AtomicBool,
44}
45
46impl HashedWheelTimerScheduler {
47 pub fn new(tick: Duration, ticks_per_wheel: usize) -> Self {
48 assert!(ticks_per_wheel.is_power_of_two(), "ticks_per_wheel must be power of two");
49 let slots = (0..ticks_per_wheel).map(|_| Slot { items: VecDeque::new() }).collect();
50 let inner = Arc::new(Inner {
51 tick,
52 mask: ticks_per_wheel - 1,
53 slots: Mutex::new(slots),
54 cursor: Mutex::new(0),
55 shutdown: AtomicBool::new(false),
56 });
57 let i2 = inner.clone();
58 tokio::spawn(async move {
59 let mut ticker = tokio::time::interval(tick);
60 loop {
61 ticker.tick().await;
62 if i2.shutdown.load(Ordering::Acquire) {
63 break;
64 }
65 i2.tick();
66 }
67 });
68 Self { inner }
69 }
70
71 pub fn shutdown(&self) {
72 self.inner.shutdown.store(true, Ordering::Release);
73 }
74
75 fn schedule(&self, delay: Duration, cancel: Arc<AtomicBool>, cb: Callback) {
76 let ticks = (delay.as_nanos() / self.inner.tick.as_nanos().max(1)) as u64;
77 let slots_len = (self.inner.mask + 1) as u64;
78 let rounds = ticks / slots_len;
79 let offset = (ticks % slots_len) as usize;
80 let cursor = self.inner.cursor.lock();
81 let idx = (*cursor + offset) & self.inner.mask;
82 drop(cursor);
83 let mut slots = self.inner.slots.lock();
84 slots[idx].items.push_back(Entry { rounds, cancel, cb });
85 }
86}
87
88impl Inner {
89 fn tick(&self) {
90 let mut cursor = self.cursor.lock();
91 let idx = *cursor & self.mask;
92 *cursor = cursor.wrapping_add(1);
93 drop(cursor);
94
95 let mut due: Vec<Callback> = Vec::new();
96 {
97 let mut slots = self.slots.lock();
98 let slot = &mut slots[idx];
99 let mut kept: VecDeque<Entry> = VecDeque::with_capacity(slot.items.len());
100 while let Some(mut e) = slot.items.pop_front() {
101 if e.cancel.load(Ordering::Acquire) {
102 continue;
103 }
104 if e.rounds == 0 {
105 due.push(e.cb);
106 } else {
107 e.rounds -= 1;
108 kept.push_back(e);
109 }
110 }
111 slot.items = kept;
112 }
113 for cb in due {
114 cb();
115 }
116 }
117}
118
119impl Scheduler for HashedWheelTimerScheduler {
120 fn schedule_once(&self, delay: Duration, task: BoxFuture<'static, ()>) -> SchedulerHandle {
121 let cancel = Arc::new(AtomicBool::new(false));
122 let c = cancel.clone();
123 let cb: Callback = Box::new(move || {
124 tokio::spawn(task);
125 });
126 self.schedule(delay, c, cb);
127 SchedulerHandle { cancel }
128 }
129
130 fn schedule_at_fixed_rate(
131 &self,
132 initial_delay: Duration,
133 interval: Duration,
134 task: Arc<dyn Fn() + Send + Sync>,
135 ) -> SchedulerHandle {
136 let cancel = Arc::new(AtomicBool::new(false));
137 let c = cancel.clone();
138 let t = task.clone();
141 tokio::spawn(async move {
142 tokio::time::sleep(initial_delay).await;
143 let mut tick = tokio::time::interval(interval);
144 tick.tick().await;
145 loop {
146 if c.load(Ordering::Acquire) {
147 break;
148 }
149 t();
150 tick.tick().await;
151 }
152 });
153 SchedulerHandle { cancel }
154 }
155}
156
157impl Drop for HashedWheelTimerScheduler {
158 fn drop(&mut self) {
159 self.shutdown();
160 }
161}
162
#[cfg(test)]
mod tests {
    use super::*;

    /// A 10 ms one-shot on a 2 ms, 64-slot wheel must complete within 200 ms.
    #[tokio::test]
    async fn fires_after_delay() {
        let scheduler = HashedWheelTimerScheduler::new(Duration::from_millis(2), 64);
        let (fired_tx, fired_rx) = tokio::sync::oneshot::channel();
        let task = Box::pin(async move {
            let _ = fired_tx.send(());
        });
        let _ = scheduler.schedule_once(Duration::from_millis(10), task);
        let outcome = tokio::time::timeout(Duration::from_millis(200), fired_rx).await;
        outcome.expect("timer fired").unwrap();
    }
}