timing_wheel/
lib.rs

1//! An improved hash time wheel algorithm based on binary heap priority queue.
2//!
3//! Unlike the classic [`Hashed and Hierarchical Timing Wheels`], this library uses a
4//! binary heap as a priority queue for timers.
5//! During [`spin`], it only needs to compare the timer ticks at the head of the
6//! heap to quickly detect expired timers.
7//!
8//! # Examples
9//!
10//! ```
11//! use std::time::{ Duration, Instant };
12//! use timing_wheel::TimeWheel;
13//! use std::thread::sleep;
14//!
15//! let mut time_wheel = TimeWheel::new(Duration::from_millis(1));
16//!
17//! time_wheel.deadline(Instant::now() + Duration::from_millis(1), ());
18//!
19//! sleep(Duration::from_millis(1));
20//!
21//! let mut wakers = vec![];
22//!
23//! time_wheel.spin(&mut wakers);
24//!
25//! assert_eq!(wakers, vec![()]);
26//! ```
27//!
28//! [`spin`]: TimeWheel::spin
29//! [`Hashed and Hierarchical Timing Wheels`]: https://dl.acm.org/doi/pdf/10.1145/41457.37504
30
31use std::{
32    cmp::Reverse,
33    collections::{BinaryHeap, HashMap},
34    time::{Duration, Instant},
35};
36
37type Slot = Reverse<u64>;
38
39/// A binary-heap based timing wheel implementation.
40pub struct TimeWheel<T> {
41    /// The timewheel start timestamp.
42    start: Instant,
43    /// timewheel tick interval in secs.
44    tick_interval: u64,
45    /// ticks
46    ticks: u64,
47    /// slot queue.
48    priority_queue: BinaryHeap<Slot>,
49    /// timers.
50    timers: HashMap<u64, Vec<T>>,
51    /// alive timer counter.
52    counter: usize,
53}
54
55impl<T> TimeWheel<T> {
56    /// Create a time-wheel with minimum time interval resolution `tick_interval`.
57    pub fn new(tick_interval: Duration) -> Self {
58        Self {
59            tick_interval: tick_interval.as_micros() as u64,
60            ticks: 0,
61            start: Instant::now(),
62            priority_queue: Default::default(),
63            timers: Default::default(),
64            counter: 0,
65        }
66    }
67
68    /// Returns the number of alive timers.
69    pub fn len(&self) -> usize {
70        self.counter
71    }
72
73    /// Create a new timer using provided `deadline`.
74    ///
75    /// Return `None` if the deadline is already reach.
76    ///
77    /// # Examples
78    ///
79    /// ```
80    /// use std::time::{ Duration, Instant };
81    /// use timing_wheel::TimeWheel;
82    /// use std::thread::sleep;
83    ///
84    /// let mut time_wheel = TimeWheel::new(Duration::from_millis(1));
85    ///
86    /// time_wheel.deadline(Instant::now() + Duration::from_millis(1), ());
87    ///
88    /// sleep(Duration::from_millis(1));
89    ///
90    /// let mut wakers = vec![];
91    ///
92    /// time_wheel.spin(&mut wakers);
93    ///
94    /// assert_eq!(wakers, vec![()]);
95    /// ```
96    pub fn deadline(&mut self, deadline: Instant, value: T) -> Option<u64> {
97        let ticks = (deadline - self.start).as_micros() as u64 / self.tick_interval;
98
99        if !(ticks > self.ticks) {
100            return None;
101        }
102
103        if let Some(timers) = self.timers.get_mut(&ticks) {
104            timers.push(value);
105        } else {
106            self.timers.insert(ticks, vec![value]);
107            self.priority_queue.push(Reverse(ticks));
108        }
109
110        self.counter += 1;
111
112        Some(ticks)
113    }
114
115    /// Create a new `deadline` with a value equal to `Instant::now() + duration`.
116    ///
117    /// # Examples
118    ///
119    /// ```
120    /// use std::time::Duration;
121    /// use timing_wheel::TimeWheel;
122    /// use std::thread::sleep;
123    ///
124    /// let mut time_wheel = TimeWheel::new(Duration::from_millis(1));
125    ///
126    /// time_wheel.after(Duration::from_millis(1), ());
127    ///
128    /// sleep(Duration::from_millis(1));
129    ///
130    /// let mut wakers = vec![];
131    ///
132    /// time_wheel.spin(&mut wakers);
133    ///
134    /// assert_eq!(wakers, vec![()]);
135    /// ```
136    pub fn after(&mut self, duration: Duration, value: T) -> Option<u64> {
137        self.deadline(Instant::now() + duration, value)
138    }
139
140    //// Spin the wheel according to the current time
141    pub fn spin(&mut self, wakers: &mut Vec<T>) {
142        let to_slot = (Instant::now() - self.start).as_micros() as u64 / self.tick_interval;
143
144        while let Some(slot) = self.priority_queue.peek() {
145            if slot.0 > to_slot {
146                break;
147            }
148
149            let slot = self.priority_queue.pop().unwrap().0;
150
151            if let Some(mut timers) = self.timers.remove(&slot) {
152                self.counter -= timers.len();
153                wakers.append(&mut timers);
154            }
155        }
156    }
157}
158
159#[cfg(test)]
160mod tests {
161    use std::{thread::sleep, time::Duration};
162
163    use super::*;
164
165    #[test]
166    fn test_len() {
167        let mut time_wheel = TimeWheel::new(Duration::from_millis(1));
168
169        time_wheel
170            .deadline(time_wheel.start + Duration::from_millis(1), ())
171            .expect("deadline is valid.");
172
173        assert_eq!(time_wheel.len(), 1);
174
175        sleep(Duration::from_millis(1));
176
177        let mut wakers = vec![];
178
179        time_wheel.spin(&mut wakers);
180
181        assert_eq!(wakers, vec![()]);
182    }
183
184    #[test]
185    fn test_order() {
186        let mut time_wheel = TimeWheel::new(Duration::from_millis(1));
187
188        let deadline = time_wheel.start + Duration::from_millis(1);
189
190        time_wheel
191            .deadline(deadline, 1)
192            .expect("deadline is valid.");
193
194        time_wheel
195            .deadline(deadline, 2)
196            .expect("deadline is valid.");
197
198        sleep(Duration::from_millis(1));
199
200        let mut wakers = vec![];
201
202        time_wheel.spin(&mut wakers);
203
204        assert_eq!(wakers, vec![1, 2]);
205    }
206
207    #[test]
208    fn test_order2() {
209        let mut time_wheel = TimeWheel::new(Duration::from_millis(1));
210
211        time_wheel
212            .deadline(time_wheel.start + Duration::from_millis(1), 1)
213            .expect("deadline is valid.");
214
215        time_wheel
216            .deadline(time_wheel.start + Duration::from_millis(2), 2)
217            .expect("deadline is valid.");
218
219        assert_eq!(time_wheel.len(), 2);
220
221        sleep(Duration::from_millis(1));
222
223        let mut wakers = vec![];
224
225        time_wheel.spin(&mut wakers);
226
227        assert_eq!(wakers, vec![1]);
228
229        assert_eq!(time_wheel.len(), 1);
230
231        sleep(Duration::from_millis(1));
232
233        time_wheel.spin(&mut wakers);
234
235        assert_eq!(wakers, vec![1, 2]);
236
237        assert_eq!(time_wheel.len(), 0);
238    }
239
240    #[test]
241    fn test_after() {
242        let mut time_wheel = TimeWheel::new(Duration::from_millis(1));
243
244        time_wheel.after(Duration::from_millis(1), ());
245
246        sleep(Duration::from_millis(1));
247
248        let mut wakers = vec![];
249
250        time_wheel.spin(&mut wakers);
251
252        assert_eq!(wakers, vec![()]);
253    }
254}