reactors/
timewheel.rs

1//! Hashed [`timewheel`](https://blog.acolyer.org/2015/11/23/hashed-and-hierarchical-timing-wheels/)
2//! implementation for [`IoReactor`](crate::io::IoReactor)
3
4use std::collections::HashMap;
5use std::task::Poll;
// Time wheel algorithm implementation
7#[derive(Debug)]
8struct Slot<T> {
9    round: u64,
10    t: T,
11}
12
13/// Timewheel implementation for [`IoReactor`](crate::io::IoReactor)
14#[derive(Debug)]
15pub struct TimeWheel<T> {
16    hashed: HashMap<u64, Vec<Slot<T>>>,
17    steps: u64,
18    tick: u64,
19}
20
21impl<T> TimeWheel<T> {
22    // create new hashed time wheel instance
23    pub fn new(steps: u64) -> Self {
24        TimeWheel {
25            steps: steps,
26            hashed: HashMap::new(),
27            tick: 0,
28        }
29    }
30
31    pub fn add(&mut self, timeout: u64, value: T) {
32        log::trace!(
33            "add timeout({}) steps({}) tick({})",
34            timeout,
35            self.steps,
36            self.tick
37        );
38
39        let slot = (timeout + self.tick) % self.steps;
40        let round = timeout / self.steps;
41
42        log::trace!(
43            "add timeout({}) to slot({}) with round({}), current tick is {}",
44            timeout,
45            slot,
46            round,
47            self.tick
48        );
49
50        let slots = self.hashed.entry(slot).or_insert(Vec::new());
51
52        slots.push(Slot { t: value, round });
53    }
54
55    pub fn tick(&mut self) -> Poll<Vec<T>> {
56        let step = self.tick % self.steps;
57
58        self.tick += 1;
59
60        if let Some(slots) = self.hashed.remove(&step) {
61            let mut current: Vec<T> = vec![];
62            let mut reserved: Vec<Slot<T>> = vec![];
63
64            for slot in slots {
65                if slot.round == 0 {
66                    current.push(slot.t);
67                } else {
68                    reserved.push(Slot::<T> {
69                        t: slot.t,
70                        round: slot.round - 1,
71                    });
72                }
73            }
74
75            if !reserved.is_empty() {
76                self.hashed.insert(step, reserved);
77            }
78
79            if !current.is_empty() {
80                return Poll::Ready(current);
81            }
82        }
83
84        Poll::Pending
85    }
86}