timing_wheel/
lib.rs

//! An improved hashed timing wheel algorithm built on a binary-heap priority queue.
//!
//! Unlike the classic [`Hashed and Hierarchical Timing Wheels`], this library uses a
//! binary heap as a priority queue for timers.
//! During [`spin`], it only needs to compare the timer ticks at the head of the
//! heap to quickly detect expired timers.
7//!
8//! # Examples
9//!
10//! ```
11//! use std::time::{ Duration, Instant };
12//! use timing_wheel::TimeWheel;
13//! use std::thread::sleep;
14//!
15//! let mut time_wheel = TimeWheel::new(Duration::from_millis(1));
16//!
17//! time_wheel.deadline(Instant::now() + Duration::from_millis(1), ());
18//!
19//! sleep(Duration::from_millis(1));
20//!
21//! let mut wakers = vec![];
22//!
23//! time_wheel.spin(&mut wakers);
24//!
25//! assert_eq!(wakers.into_iter().map(|v| v.1).collect::<Vec<_>>(), vec![()]);
26//! ```
27//!
28//! [`spin`]: TimeWheel::spin
29//! [`Hashed and Hierarchical Timing Wheels`]: https://dl.acm.org/doi/pdf/10.1145/41457.37504
30
31use std::{
32    cmp::Reverse,
33    collections::{BinaryHeap, HashMap},
34    time::{Duration, Instant},
35};
36
/// Heap entry holding the tick index of a slot with at least one pending timer.
///
/// [`BinaryHeap`] is a max-heap, so the tick is wrapped in [`Reverse`] to make
/// the earliest slot surface first.
type Slot = Reverse<u64>;

/// A binary-heap based timing wheel implementation.
///
/// Time is quantised into ticks of `tick_interval` microseconds, counted from
/// the instant the wheel was created. Timers that fall into the same tick share
/// one slot; the heap orders the occupied slots so that [`spin`] only has to
/// inspect the head of the heap.
///
/// [`spin`]: TimeWheel::spin
pub struct TimeWheel<T> {
    /// The instant the wheel was created; tick `0` starts here.
    start: Instant,
    /// Tick interval in microseconds. Always at least `1`.
    tick_interval: u64,
    /// The tick the wheel was last spun to (`0` before the first spin).
    /// Every slot still queued is strictly later than this tick.
    ticks: u64,
    /// Occupied slots, earliest tick first.
    priority_queue: BinaryHeap<Slot>,
    /// Timer values grouped by slot tick, in registration order.
    /// A bucket stored here is never empty.
    timers: HashMap<u64, Vec<T>>,
    /// Number of registered timers that have not expired yet.
    counter: usize,
}

impl<T> TimeWheel<T> {
    /// Create a time-wheel with minimum time interval resolution `tick_interval`.
    ///
    /// The resolution is measured in whole microseconds. An interval shorter
    /// than one microsecond (including [`Duration::ZERO`]) is rounded up to one
    /// microsecond, because a zero-length tick cannot be used to divide time.
    pub fn new(tick_interval: Duration) -> Self {
        // Saturate instead of silently wrapping on absurdly large intervals.
        let micros = tick_interval.as_micros().min(u128::from(u64::MAX)) as u64;

        Self {
            start: Instant::now(),
            // Guard against division by zero in `tick_of`.
            tick_interval: micros.max(1),
            ticks: 0,
            priority_queue: Default::default(),
            timers: Default::default(),
            counter: 0,
        }
    }

    /// Returns the number of alive timers.
    pub fn len(&self) -> usize {
        self.counter
    }

    /// Returns `true` when no timer is waiting to expire.
    pub fn is_empty(&self) -> bool {
        self.counter == 0
    }

    /// Converts `instant` into a tick index relative to the wheel's start.
    ///
    /// Instants earlier than the start map to tick `0`; results that do not
    /// fit into a `u64` saturate at `u64::MAX`.
    fn tick_of(&self, instant: Instant) -> u64 {
        let micros = instant.saturating_duration_since(self.start).as_micros();

        (micros / u128::from(self.tick_interval)).min(u128::from(u64::MAX)) as u64
    }

    /// Create a new timer using provided `deadline`.
    ///
    /// On success returns the tick of the slot the timer was placed in.
    ///
    /// Returns `None` if the deadline is already reached, i.e. its tick is not
    /// later than the tick the wheel was last spun to (tick `0` if [`spin`] has
    /// not been called yet). In that case `value` is dropped and no timer is
    /// registered.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::time::{ Duration, Instant };
    /// use timing_wheel::TimeWheel;
    /// use std::thread::sleep;
    ///
    /// let mut time_wheel = TimeWheel::new(Duration::from_millis(1));
    ///
    /// time_wheel.deadline(Instant::now() + Duration::from_millis(1), ());
    ///
    /// sleep(Duration::from_millis(1));
    ///
    /// let mut wakers = vec![];
    ///
    /// time_wheel.spin(&mut wakers);
    ///
    /// assert_eq!(wakers.into_iter().map(|v| v.1).collect::<Vec<_>>(), vec![()]);
    /// ```
    ///
    /// [`spin`]: TimeWheel::spin
    pub fn deadline(&mut self, deadline: Instant, value: T) -> Option<u64> {
        let ticks = self.tick_of(deadline);

        if ticks <= self.ticks {
            return None;
        }

        let bucket = self.timers.entry(ticks).or_default();

        // Buckets are never left empty in the map, so an empty bucket was just
        // created and its slot still has to be queued.
        if bucket.is_empty() {
            self.priority_queue.push(Reverse(ticks));
        }

        bucket.push(value);
        self.counter += 1;

        Some(ticks)
    }

    /// Create a new `deadline` with a value equal to `Instant::now() + duration`.
    ///
    /// See [`deadline`] for the meaning of the return value.
    ///
    /// # Panics
    ///
    /// Panics if `Instant::now() + duration` cannot be represented.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::time::Duration;
    /// use timing_wheel::TimeWheel;
    /// use std::thread::sleep;
    ///
    /// let mut time_wheel = TimeWheel::new(Duration::from_millis(1));
    ///
    /// time_wheel.after(Duration::from_millis(1), ());
    ///
    /// sleep(Duration::from_millis(1));
    ///
    /// let mut wakers = vec![];
    ///
    /// time_wheel.spin(&mut wakers);
    ///
    /// assert_eq!(wakers.into_iter().map(|v| v.1).collect::<Vec<_>>(), vec![()]);
    /// ```
    ///
    /// [`deadline`]: TimeWheel::deadline
    pub fn after(&mut self, duration: Duration, value: T) -> Option<u64> {
        self.deadline(Instant::now() + duration, value)
    }

    /// Spin the wheel according to the current time and detect (return) the expired timers.
    ///
    /// Every timer whose slot tick is not later than the current tick is removed
    /// from the wheel and appended to `wakers` as `(slot_tick, value)`. Slots are
    /// drained earliest first, and values within a slot keep their registration
    /// order. Existing content of `wakers` is left untouched.
    pub fn spin(&mut self, wakers: &mut Vec<(u64, T)>) {
        let now_tick = self.tick_of(Instant::now());

        while let Some(&Reverse(slot)) = self.priority_queue.peek() {
            if slot > now_tick {
                break;
            }

            self.priority_queue.pop();

            if let Some(values) = self.timers.remove(&slot) {
                self.counter -= values.len();
                wakers.extend(values.into_iter().map(|value| (slot, value)));
            }
        }

        // Remember how far the wheel has advanced so that `deadline` can reject
        // timers that have already expired.
        self.ticks = self.ticks.max(now_tick);
    }
}
161
#[cfg(test)]
mod tests {
    use std::{thread::sleep, time::Duration};

    use super::*;

    /// Tick resolution shared by every wheel built in these tests.
    const TICK: Duration = Duration::from_millis(1);

    /// Projects the fired `(slot, value)` pairs onto their values, keeping order.
    fn values_of<T: Clone>(wakers: &[(u64, T)]) -> Vec<T> {
        wakers.iter().map(|(_, value)| value.clone()).collect()
    }

    /// A registered timer is counted by `len` and fires once its tick has passed.
    #[test]
    fn test_len() {
        let mut time_wheel = TimeWheel::new(TICK);
        let origin = time_wheel.start;

        time_wheel
            .deadline(origin + TICK, ())
            .expect("deadline is valid.");

        assert_eq!(time_wheel.len(), 1);

        sleep(TICK);

        let mut wakers = Vec::new();
        time_wheel.spin(&mut wakers);

        assert_eq!(values_of(&wakers), vec![()]);
    }

    /// Timers sharing one slot fire in the order they were registered.
    #[test]
    fn test_order() {
        let mut time_wheel = TimeWheel::new(TICK);
        let deadline = time_wheel.start + TICK;

        for value in 1..=2 {
            time_wheel
                .deadline(deadline, value)
                .expect("deadline is valid.");
        }

        sleep(TICK);

        let mut wakers = Vec::new();
        time_wheel.spin(&mut wakers);

        assert_eq!(values_of(&wakers), vec![1, 2]);
    }

    /// Timers in different slots fire one slot at a time as the clock advances.
    #[test]
    fn test_order2() {
        let mut time_wheel = TimeWheel::new(TICK);
        let origin = time_wheel.start;

        time_wheel
            .deadline(origin + Duration::from_millis(1), 1)
            .expect("deadline is valid.");
        time_wheel
            .deadline(origin + Duration::from_millis(2), 2)
            .expect("deadline is valid.");

        assert_eq!(time_wheel.len(), 2);

        let mut wakers = Vec::new();

        // After one tick only the first timer is due.
        sleep(TICK);
        time_wheel.spin(&mut wakers);

        assert_eq!(values_of(&wakers), vec![1]);
        assert_eq!(time_wheel.len(), 1);

        // After another tick the second timer is appended behind the first.
        sleep(TICK);
        time_wheel.spin(&mut wakers);

        assert_eq!(values_of(&wakers), vec![1, 2]);
        assert_eq!(time_wheel.len(), 0);
    }

    /// `after` schedules relative to the current time.
    #[test]
    fn test_after() {
        let mut time_wheel = TimeWheel::new(TICK);

        time_wheel.after(TICK, ());

        sleep(TICK);

        let mut wakers = Vec::new();
        time_wheel.spin(&mut wakers);

        assert_eq!(values_of(&wakers), vec![()]);
    }
}