timing_wheel/
lib.rs

1//! An improved hash time wheel algorithm based on binary heap priority queue.
2//!
3//! Unlike the classic [`Hashed and Hierarchical Timing Wheels`], this library uses a
4//! binary heap as a priority queue for timers.
5//! During [`spin`], it only needs to compare the timer ticks at the head of the
6//! heap to quickly detect expired timers.
7//!
8//! # Examples
9//!
10//! ```
11//! use std::time::{ Duration, Instant };
12//! use timing_wheel::TimeWheel;
13//! use std::thread::sleep;
14//!
15//! let mut time_wheel = TimeWheel::new(Duration::from_millis(1));
16//!
17//! time_wheel.deadline(Instant::now() + Duration::from_millis(1), ());
18//!
19//! sleep(Duration::from_millis(2));
20//!
21//! let mut wakers = vec![];
22//!
23//! time_wheel.spin(&mut wakers);
24//!
25//! assert_eq!(wakers.into_iter().map(|v| v.1).collect::<Vec<_>>(), vec![()]);
26//! ```
27//!
28//! [`spin`]: TimeWheel::spin
29//! [`Hashed and Hierarchical Timing Wheels`]: https://dl.acm.org/doi/pdf/10.1145/41457.37504
30
31use std::{
32    cmp::Reverse,
33    collections::{BinaryHeap, HashMap},
34    time::{Duration, Instant},
35};
36
37type Slot = Reverse<u64>;
38
39/// A binary-heap based timing wheel implementation.
40pub struct TimeWheel<T> {
41    /// The timewheel start timestamp.
42    start: Instant,
43    /// timewheel tick interval in secs.
44    tick_interval: u64,
45    /// ticks
46    ticks: u64,
47    /// slot queue.
48    priority_queue: BinaryHeap<Slot>,
49    /// timers.
50    timers: HashMap<u64, Vec<T>>,
51    /// alive timer counter.
52    counter: usize,
53}
54
55impl<T> TimeWheel<T> {
56    /// Create a time-wheel with minimum time interval resolution `tick_interval`.
57    pub fn new(tick_interval: Duration) -> Self {
58        Self {
59            tick_interval: tick_interval.as_micros() as u64,
60            ticks: 0,
61            start: Instant::now(),
62            priority_queue: Default::default(),
63            timers: Default::default(),
64            counter: 0,
65        }
66    }
67
68    /// Returns the number of alive timers.
69    pub fn len(&self) -> usize {
70        self.counter
71    }
72
73    /// Create a new timer using provided `deadline`.
74    ///
75    /// Return `None` if the deadline is already reach.
76    ///
77    /// # Examples
78    ///
79    /// ```
80    /// use std::time::{ Duration, Instant };
81    /// use timing_wheel::TimeWheel;
82    /// use std::thread::sleep;
83    ///
84    /// let mut time_wheel = TimeWheel::new(Duration::from_millis(1));
85    ///
86    /// time_wheel.deadline(Instant::now() + Duration::from_millis(1), ());
87    ///
88    /// sleep(Duration::from_millis(2));
89    ///
90    /// let mut wakers = vec![];
91    ///
92    /// time_wheel.spin(&mut wakers);
93    ///
94    /// assert_eq!(wakers.into_iter().map(|v| v.1).collect::<Vec<_>>(), vec![()]);
95    /// ```
96    pub fn deadline(&mut self, deadline: Instant, value: T) -> Option<u64> {
97        let interval = (deadline - self.start).as_micros() as u64;
98
99        let mut ticks = interval / self.tick_interval;
100
101        if interval % self.tick_interval != 0 {
102            ticks += 1;
103        }
104
105        if !(ticks > self.ticks) {
106            return None;
107        }
108
109        if let Some(timers) = self.timers.get_mut(&ticks) {
110            timers.push(value);
111        } else {
112            self.timers.insert(ticks, vec![value]);
113            self.priority_queue.push(Reverse(ticks));
114        }
115
116        self.counter += 1;
117
118        Some(ticks)
119    }
120
121    /// Create a new `deadline` with a value equal to `Instant::now() + duration`.
122    ///
123    /// # Examples
124    ///
125    /// ```
126    /// use std::time::Duration;
127    /// use timing_wheel::TimeWheel;
128    /// use std::thread::sleep;
129    ///
130    /// let mut time_wheel = TimeWheel::new(Duration::from_millis(1));
131    ///
132    /// time_wheel.after(Duration::from_millis(1), ());
133    ///
134    /// sleep(Duration::from_millis(2));
135    ///
136    /// let mut wakers = vec![];
137    ///
138    /// time_wheel.spin(&mut wakers);
139    ///
140    /// assert_eq!(wakers.into_iter().map(|v| v.1).collect::<Vec<_>>(), vec![()]);
141    /// ```
142    pub fn after(&mut self, duration: Duration, value: T) -> Option<u64> {
143        self.deadline(Instant::now() + duration, value)
144    }
145
146    /// Spin the wheel according to the current time and detect(returns) the expiry timers.
147    pub fn spin(&mut self, wakers: &mut Vec<(u64, T)>) {
148        self.ticks = (Instant::now() - self.start).as_micros() as u64 / self.tick_interval;
149
150        while let Some(slot) = self.priority_queue.peek() {
151            if slot.0 > self.ticks {
152                break;
153            }
154
155            let slot = self.priority_queue.pop().unwrap().0;
156
157            if let Some(timers) = self.timers.remove(&slot) {
158                self.counter -= timers.len();
159
160                let mut timers = timers.into_iter().map(|v| (slot, v)).collect();
161
162                wakers.append(&mut timers);
163            }
164        }
165    }
166}
167
168#[cfg(test)]
169mod tests {
170    use std::{thread::sleep, time::Duration};
171
172    use super::*;
173
174    #[test]
175    fn test_len() {
176        let mut time_wheel = TimeWheel::new(Duration::from_millis(1));
177
178        time_wheel
179            .deadline(time_wheel.start + Duration::from_millis(1), ())
180            .expect("deadline is valid.");
181
182        assert_eq!(time_wheel.len(), 1);
183
184        sleep(Duration::from_millis(1));
185
186        let mut wakers = vec![];
187
188        time_wheel.spin(&mut wakers);
189
190        assert_eq!(
191            wakers.into_iter().map(|v| v.1).collect::<Vec<_>>(),
192            vec![()]
193        );
194    }
195
196    #[test]
197    fn test_order() {
198        let mut time_wheel = TimeWheel::new(Duration::from_millis(1));
199
200        let deadline = time_wheel.start + Duration::from_millis(1);
201
202        time_wheel
203            .deadline(deadline, 1)
204            .expect("deadline is valid.");
205
206        time_wheel
207            .deadline(deadline, 2)
208            .expect("deadline is valid.");
209
210        sleep(Duration::from_millis(1));
211
212        let mut wakers = vec![];
213
214        time_wheel.spin(&mut wakers);
215
216        assert_eq!(
217            wakers.into_iter().map(|v| v.1).collect::<Vec<_>>(),
218            vec![1, 2]
219        );
220    }
221
222    #[test]
223    fn test_order2() {
224        let mut time_wheel = TimeWheel::new(Duration::from_millis(1));
225
226        time_wheel
227            .deadline(time_wheel.start + Duration::from_millis(500), 1)
228            .expect("deadline is valid.");
229
230        time_wheel
231            .deadline(time_wheel.start + Duration::from_millis(1000), 2)
232            .expect("deadline is valid.");
233
234        assert_eq!(time_wheel.len(), 2);
235
236        sleep(Duration::from_millis(500));
237
238        let mut wakers = vec![];
239
240        time_wheel.spin(&mut wakers);
241
242        assert_eq!(
243            wakers.iter().cloned().map(|v| v.1).collect::<Vec<_>>(),
244            vec![1]
245        );
246
247        assert_eq!(time_wheel.len(), 1);
248
249        sleep(Duration::from_millis(1000));
250
251        time_wheel.spin(&mut wakers);
252
253        assert_eq!(
254            wakers.into_iter().map(|v| v.1).collect::<Vec<_>>(),
255            vec![1, 2]
256        );
257
258        assert_eq!(time_wheel.len(), 0);
259    }
260
261    #[test]
262    fn test_after() {
263        let mut time_wheel = TimeWheel::new(Duration::from_millis(1));
264
265        time_wheel.after(Duration::from_millis(1), ());
266
267        sleep(Duration::from_millis(2));
268
269        let mut wakers = vec![];
270
271        time_wheel.spin(&mut wakers);
272
273        assert_eq!(
274            wakers.into_iter().map(|v| v.1).collect::<Vec<_>>(),
275            vec![()]
276        );
277    }
278}