Skip to main content

atomr_core/actor/scheduler/
hashed_wheel.rs

1//! Hashed wheel timer — port of akka.net's `HashedWheelTimerScheduler.cs`.
2//!
3//! A hashed wheel is a cyclic array of "buckets"; scheduling a timeout
4//! places the deadline in the bucket that corresponds to
5//! `(now + delay) / tick_duration`. A background task ticks through the
6//! buckets and fires anything whose "rounds remaining" has reached zero.
7//!
8//! This port exposes the same `Scheduler` trait as the simple
9//! [`super::TokioScheduler`]; users can pick via
10//! `akka.scheduler.implementation`.
11
12use std::collections::VecDeque;
13use std::sync::atomic::{AtomicBool, Ordering};
14use std::sync::Arc;
15use std::time::Duration;
16
17use futures_util::future::BoxFuture;
18use parking_lot::Mutex;
19
20use super::{Scheduler, SchedulerHandle};
21
22type Callback = Box<dyn FnOnce() + Send>;
23
24struct Slot {
25    items: VecDeque<Entry>,
26}
27
28struct Entry {
29    rounds: u64,
30    cancel: Arc<AtomicBool>,
31    cb: Callback,
32}
33
34pub struct HashedWheelTimerScheduler {
35    inner: Arc<Inner>,
36}
37
38struct Inner {
39    tick: Duration,
40    mask: usize,
41    slots: Mutex<Vec<Slot>>,
42    cursor: Mutex<usize>,
43    shutdown: AtomicBool,
44}
45
46impl HashedWheelTimerScheduler {
47    pub fn new(tick: Duration, ticks_per_wheel: usize) -> Self {
48        assert!(ticks_per_wheel.is_power_of_two(), "ticks_per_wheel must be power of two");
49        let slots = (0..ticks_per_wheel).map(|_| Slot { items: VecDeque::new() }).collect();
50        let inner = Arc::new(Inner {
51            tick,
52            mask: ticks_per_wheel - 1,
53            slots: Mutex::new(slots),
54            cursor: Mutex::new(0),
55            shutdown: AtomicBool::new(false),
56        });
57        let i2 = inner.clone();
58        tokio::spawn(async move {
59            let mut ticker = tokio::time::interval(tick);
60            loop {
61                ticker.tick().await;
62                if i2.shutdown.load(Ordering::Acquire) {
63                    break;
64                }
65                i2.tick();
66            }
67        });
68        Self { inner }
69    }
70
71    pub fn shutdown(&self) {
72        self.inner.shutdown.store(true, Ordering::Release);
73    }
74
75    fn schedule(&self, delay: Duration, cancel: Arc<AtomicBool>, cb: Callback) {
76        let ticks = (delay.as_nanos() / self.inner.tick.as_nanos().max(1)) as u64;
77        let slots_len = (self.inner.mask + 1) as u64;
78        let rounds = ticks / slots_len;
79        let offset = (ticks % slots_len) as usize;
80        let cursor = self.inner.cursor.lock();
81        let idx = (*cursor + offset) & self.inner.mask;
82        drop(cursor);
83        let mut slots = self.inner.slots.lock();
84        slots[idx].items.push_back(Entry { rounds, cancel, cb });
85    }
86}
87
88impl Inner {
89    fn tick(&self) {
90        let mut cursor = self.cursor.lock();
91        let idx = *cursor & self.mask;
92        *cursor = cursor.wrapping_add(1);
93        drop(cursor);
94
95        let mut due: Vec<Callback> = Vec::new();
96        {
97            let mut slots = self.slots.lock();
98            let slot = &mut slots[idx];
99            let mut kept: VecDeque<Entry> = VecDeque::with_capacity(slot.items.len());
100            while let Some(mut e) = slot.items.pop_front() {
101                if e.cancel.load(Ordering::Acquire) {
102                    continue;
103                }
104                if e.rounds == 0 {
105                    due.push(e.cb);
106                } else {
107                    e.rounds -= 1;
108                    kept.push_back(e);
109                }
110            }
111            slot.items = kept;
112        }
113        for cb in due {
114            cb();
115        }
116    }
117}
118
119impl Scheduler for HashedWheelTimerScheduler {
120    fn schedule_once(&self, delay: Duration, task: BoxFuture<'static, ()>) -> SchedulerHandle {
121        let cancel = Arc::new(AtomicBool::new(false));
122        let c = cancel.clone();
123        let cb: Callback = Box::new(move || {
124            tokio::spawn(task);
125        });
126        self.schedule(delay, c, cb);
127        SchedulerHandle { cancel }
128    }
129
130    fn schedule_at_fixed_rate(
131        &self,
132        initial_delay: Duration,
133        interval: Duration,
134        task: Arc<dyn Fn() + Send + Sync>,
135    ) -> SchedulerHandle {
136        let cancel = Arc::new(AtomicBool::new(false));
137        let c = cancel.clone();
138        // Delegate recurring timers to Tokio for simplicity: a full wheel
139        // implementation would self-reschedule inside the fired callback.
140        let t = task.clone();
141        tokio::spawn(async move {
142            tokio::time::sleep(initial_delay).await;
143            let mut tick = tokio::time::interval(interval);
144            tick.tick().await;
145            loop {
146                if c.load(Ordering::Acquire) {
147                    break;
148                }
149                t();
150                tick.tick().await;
151            }
152        });
153        SchedulerHandle { cancel }
154    }
155}
156
157impl Drop for HashedWheelTimerScheduler {
158    fn drop(&mut self) {
159        self.shutdown();
160    }
161}
162
163#[cfg(test)]
164mod tests {
165    use super::*;
166
167    #[tokio::test]
168    async fn fires_after_delay() {
169        let s = HashedWheelTimerScheduler::new(Duration::from_millis(2), 64);
170        let (tx, rx) = tokio::sync::oneshot::channel();
171        let _ = s.schedule_once(
172            Duration::from_millis(10),
173            Box::pin(async move {
174                let _ = tx.send(());
175            }),
176        );
177        tokio::time::timeout(Duration::from_millis(200), rx).await.expect("timer fired").unwrap();
178    }
179}