timing_wheel/
lib.rs

//! A binary-heap based timing wheel algorithm implementation.
2
3use std::{
4    cmp::Reverse,
5    collections::{BinaryHeap, HashMap},
6    time::{Duration, Instant},
7};
8
/// Heap entry identifying a non-empty slot.
///
/// `BinaryHeap` is a max-heap; wrapping the slot index in `Reverse` makes the
/// smallest (earliest) slot surface first.
type Slot = Reverse<u64>;

/// A priority queue (binary-heap) based timing wheel implementation.
///
/// Time since `start` is divided into ticks of `tick_interval`. Each timer is
/// stored in the slot (tick index) its deadline falls into, and [`spin`]
/// drains every slot whose tick has been reached.
///
/// [`spin`]: TimeWheel::spin
#[derive(Debug)]
pub struct TimeWheel<T> {
    /// The instant the wheel was created; all slots are measured from here.
    start: Instant,
    /// Tick interval in microseconds. Always at least 1 (see [`TimeWheel::new`]).
    tick_interval: u64,
    /// The tick index reached by the most recent call to [`TimeWheel::spin`].
    ticks: u64,
    /// Min-heap of the slots that currently hold at least one timer.
    priority_full_slot: BinaryHeap<Slot>,
    /// Timer values grouped by slot, in registration order within a slot.
    timers: HashMap<u64, Vec<T>>,
    /// Number of timers registered and not yet returned by `spin`.
    counter: usize,
}

impl<T> TimeWheel<T> {
    /// Creates a timing wheel whose ticks are `tick_interval` long.
    ///
    /// The interval is stored with microsecond resolution. Intervals shorter
    /// than one microsecond (including zero) are rounded up to one microsecond
    /// so that slot computation never divides by zero; intervals too large for
    /// a `u64` microsecond count saturate at `u64::MAX`.
    pub fn new(tick_interval: Duration) -> Self {
        let micros = u64::try_from(tick_interval.as_micros()).unwrap_or(u64::MAX);

        Self {
            tick_interval: micros.max(1),
            ticks: 0,
            start: Instant::now(),
            priority_full_slot: BinaryHeap::new(),
            timers: HashMap::new(),
            counter: 0,
        }
    }

    /// Returns the number of alive timers, i.e. timers that were registered
    /// and have not yet been returned by [`spin`](Self::spin).
    pub fn len(&self) -> usize {
        self.counter
    }

    /// Returns `true` when the wheel holds no alive timers.
    pub fn is_empty(&self) -> bool {
        self.counter == 0
    }

    /// Registers a new timer that expires at `deadline`, carrying `value`.
    ///
    /// Returns `Some(slot)` with the tick index the timer was placed in.
    ///
    /// Returns `None` (and drops `value`) if the deadline has already been
    /// reached, i.e. its slot is not strictly later than the tick reached by
    /// the last [`spin`](Self::spin). A deadline earlier than the wheel's
    /// start is treated as slot 0 and therefore also yields `None`.
    pub fn deadline(&mut self, deadline: Instant, value: T) -> Option<u64> {
        use std::collections::hash_map::Entry;

        // Saturating: a deadline before `start` maps to a zero offset instead
        // of depending on the panic behaviour of `Instant` subtraction.
        let slot = self.slot_of(deadline.saturating_duration_since(self.start));

        if slot <= self.ticks {
            return None;
        }

        match self.timers.entry(slot) {
            Entry::Occupied(mut bucket) => bucket.get_mut().push(value),
            Entry::Vacant(bucket) => {
                bucket.insert(vec![value]);
                // Only the first timer of a slot adds the slot to the heap, so
                // each non-empty slot appears in the heap exactly once.
                self.priority_full_slot.push(Reverse(slot));
            }
        }

        self.counter += 1;

        Some(slot)
    }

    /// Spins the wheel according to the current time.
    ///
    /// Removes and returns the values of every timer whose slot is at or
    /// before the current tick. Values are ordered by slot, and by
    /// registration order within a slot. The current tick is remembered so
    /// that later calls to [`deadline`](Self::deadline) reject deadlines that
    /// have already passed.
    pub fn spin(&mut self) -> Vec<T> {
        let to_slot = self.slot_of(self.start.elapsed());

        let mut expired = Vec::new();

        while let Some(&Reverse(slot)) = self.priority_full_slot.peek() {
            if slot > to_slot {
                break;
            }

            self.priority_full_slot.pop();

            if let Some(timers) = self.timers.remove(&slot) {
                self.counter -= timers.len();
                expired.extend(timers);
            }
        }

        self.ticks = to_slot;

        expired
    }

    /// Converts a time offset from `start` into a slot (tick index),
    /// saturating the microsecond count at `u64::MAX`.
    fn slot_of(&self, since_start: Duration) -> u64 {
        let micros = u64::try_from(since_start.as_micros()).unwrap_or(u64::MAX);

        // `tick_interval` is guaranteed non-zero by `new`.
        micros / self.tick_interval
    }
}
89
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// Tick length used by the tests. It is deliberately coarse so that the
    /// real time a test takes to run cannot push the wheel into the next tick.
    const TICK: Duration = Duration::from_secs(1);

    /// Simulates `by` of elapsed time without sleeping, by moving the wheel's
    /// start instant backwards. Slots already registered keep their tick
    /// index, so they appear `by` closer to expiry.
    fn advance<T>(time_wheel: &mut TimeWheel<T>, by: Duration) {
        time_wheel.start -= by;
    }

    /// A registered timer is counted by `len` until `spin` returns it.
    #[test]
    fn test_len() {
        let mut time_wheel = TimeWheel::new(TICK);

        time_wheel
            .deadline(time_wheel.start + TICK, ())
            .expect("deadline is valid.");

        assert_eq!(time_wheel.len(), 1);

        advance(&mut time_wheel, TICK);

        assert_eq!(time_wheel.spin(), vec![()]);
        assert_eq!(time_wheel.len(), 0);
    }

    /// Timers sharing a slot are returned in registration order.
    #[test]
    fn test_order() {
        let mut time_wheel = TimeWheel::new(TICK);

        let deadline = time_wheel.start + TICK;

        time_wheel
            .deadline(deadline, 1)
            .expect("deadline is valid.");

        time_wheel
            .deadline(deadline, 2)
            .expect("deadline is valid.");

        advance(&mut time_wheel, TICK);

        assert_eq!(time_wheel.spin(), vec![1, 2]);
    }

    /// Timers in different slots are returned only once their own slot is
    /// reached.
    #[test]
    fn test_order2() {
        let mut time_wheel = TimeWheel::new(TICK);

        time_wheel
            .deadline(time_wheel.start + TICK, 1)
            .expect("deadline is valid.");

        time_wheel
            .deadline(time_wheel.start + TICK * 2, 2)
            .expect("deadline is valid.");

        advance(&mut time_wheel, TICK);

        assert_eq!(time_wheel.spin(), vec![1]);

        advance(&mut time_wheel, TICK);

        assert_eq!(time_wheel.spin(), vec![2]);
    }
}