Skip to main content

tokio/runtime/time_alt/wheel/
mod.rs

1mod level;
2pub(crate) use self::level::Expiration;
3use self::level::Level;
4
5use super::cancellation_queue::Sender;
6use super::{EntryHandle, EntryList, WakeQueue};
7
8/// Hashed timing wheel implementation.
9///
10/// See [`Driver`] documentation for some implementation notes.
11///
12/// [`Driver`]: crate::runtime::time::Driver
13#[derive(Debug)]
14pub(crate) struct Wheel {
15    /// The number of milliseconds elapsed since the wheel started.
16    elapsed: u64,
17
18    /// Timer wheel.
19    ///
20    /// Levels:
21    ///
22    /// * 1 ms slots / 64 ms range
23    /// * 64 ms slots / ~ 4 sec range
24    /// * ~ 4 sec slots / ~ 4 min range
25    /// * ~ 4 min slots / ~ 4 hr range
26    /// * ~ 4 hr slots / ~ 12 day range
27    /// * ~ 12 day slots / ~ 2 yr range
28    levels: Box<[Level; NUM_LEVELS]>,
29}
30
31/// Number of levels. Each level has 64 slots. By using 6 levels with 64 slots
32/// each, the timer is able to track time up to 2 years into the future with a
33/// precision of 1 millisecond.
34const NUM_LEVELS: usize = 6;
35
36/// The maximum duration of a `Sleep`.
37pub(super) const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1;
38
39impl Wheel {
40    /// Creates a new timing wheel.
41    pub(crate) fn new() -> Wheel {
42        let mut levels = Vec::with_capacity(NUM_LEVELS);
43        for i in 0..NUM_LEVELS {
44            levels.push(Level::new(i));
45        }
46        Wheel {
47            elapsed: 0,
48            levels: levels.into_boxed_slice().try_into().unwrap(),
49        }
50    }
51
52    /// Returns the number of milliseconds that have elapsed since the timing
53    /// wheel's creation.
54    pub(crate) fn elapsed(&self) -> u64 {
55        self.elapsed
56    }
57
58    /// Inserts an entry into the timing wheel.
59    ///
60    /// # Arguments
61    ///
62    /// * `hdl`: The entry handle to insert into the wheel.
63    ///
64    /// # Safety
65    ///
66    /// The caller must ensure:
67    ///
68    /// * The entry is not already registered in ANY wheel.
69    pub(crate) unsafe fn insert(&mut self, hdl: EntryHandle, cancel_tx: Sender) {
70        let deadline = hdl.deadline();
71
72        assert!(deadline > self.elapsed);
73
74        hdl.register_cancel_tx(cancel_tx);
75
76        // Get the level at which the entry should be stored
77        let level = self.level_for(deadline);
78        unsafe {
79            self.levels[level].add_entry(hdl);
80        }
81
82        debug_assert!({
83            self.levels[level]
84                .next_expiration(self.elapsed)
85                .map(|e| e.deadline >= self.elapsed)
86                .unwrap_or(true)
87        });
88    }
89
90    /// Removes `item` from the timing wheel.
91    ///
92    /// # Safety
93    ///
94    /// The caller must ensure:
95    ///
96    /// * The entry is already registered in THIS wheel.
97    pub(crate) unsafe fn remove(&mut self, hdl: EntryHandle) {
98        let deadline = hdl.deadline();
99        debug_assert!(
100            self.elapsed <= deadline,
101            "elapsed={}; deadline={}",
102            self.elapsed,
103            deadline
104        );
105
106        let level = self.level_for(deadline);
107        unsafe { self.levels[level].remove_entry(hdl.clone()) };
108    }
109
110    /// Advances the timer up to the instant represented by `now`.
111    pub(crate) fn take_expired(&mut self, now: u64, wake_queue: &mut WakeQueue) {
112        while let Some(expiration) = self
113            .next_expiration()
114            .filter(|expiration| expiration.deadline <= now)
115        {
116            self.process_expiration(&expiration, wake_queue);
117
118            self.set_elapsed(expiration.deadline);
119        }
120        self.set_elapsed(now);
121    }
122
123    /// Returns the instant at which the next timeout expires.
124    fn next_expiration(&self) -> Option<Expiration> {
125        // Check all levels
126        self.levels
127            .iter()
128            .enumerate()
129            .find_map(|(level_num, level)| {
130                let expiration = level.next_expiration(self.elapsed)?;
131                // There cannot be any expirations at a higher level that happen
132                // before this one.
133                debug_assert!(self.no_expirations_before(level_num + 1, expiration.deadline));
134
135                Some(expiration)
136            })
137    }
138
139    /// Returns the tick at which this timer wheel next needs to perform some
140    /// processing, or None if there are no timers registered.
141    pub(crate) fn next_expiration_time(&self) -> Option<u64> {
142        self.next_expiration().map(|ex| ex.deadline)
143    }
144
145    /// Used for debug assertions
146    fn no_expirations_before(&self, start_level: usize, before: u64) -> bool {
147        self.levels[start_level..]
148            .iter()
149            .flat_map(|level| level.next_expiration(self.elapsed))
150            .all(|e2| before <= e2.deadline)
151    }
152
153    /// iteratively find entries that are between the wheel's current
154    /// time and the expiration time.  for each in that population either
155    /// queue it for notification (in the case of the last level) or tier
156    /// it down to the next level (in all other cases).
157    pub(crate) fn process_expiration(
158        &mut self,
159        expiration: &Expiration,
160        wake_queue: &mut WakeQueue,
161    ) {
162        // Note that we need to take _all_ of the entries off the list before
163        // processing any of them. This is important because it's possible that
164        // those entries might need to be reinserted into the same slot.
165        //
166        // This happens only on the highest level, when an entry is inserted
167        // more than MAX_DURATION into the future. When this happens, we wrap
168        // around, and process some entries a multiple of MAX_DURATION before
169        // they actually need to be dropped down a level. We then reinsert them
170        // back into the same position; we must make sure we don't then process
171        // those entries again or we'll end up in an infinite loop.
172        let mut entries = self.take_entries(expiration);
173
174        while let Some(hdl) = entries.pop_back() {
175            if expiration.level == 0 {
176                debug_assert_eq!(hdl.deadline(), expiration.deadline);
177            }
178
179            let deadline = hdl.deadline();
180
181            if deadline > expiration.deadline {
182                let level = level_for(expiration.deadline, deadline);
183                unsafe {
184                    self.levels[level].add_entry(hdl);
185                }
186            } else {
187                unsafe {
188                    wake_queue.push_front(hdl);
189                }
190            }
191        }
192    }
193
194    fn set_elapsed(&mut self, when: u64) {
195        assert!(
196            self.elapsed <= when,
197            "elapsed={:?}; when={:?}",
198            self.elapsed,
199            when
200        );
201
202        if when > self.elapsed {
203            self.elapsed = when;
204        }
205    }
206
207    /// Obtains the list of entries that need processing for the given expiration.
208    fn take_entries(&mut self, expiration: &Expiration) -> EntryList {
209        self.levels[expiration.level].take_slot(expiration.slot)
210    }
211
212    fn level_for(&self, when: u64) -> usize {
213        level_for(self.elapsed, when)
214    }
215}
216
217fn level_for(elapsed: u64, when: u64) -> usize {
218    const SLOT_MASK: u64 = (1 << 6) - 1;
219
220    // Mask in the trailing bits ignored by the level calculation in order to cap
221    // the possible leading zeros
222    let mut masked = elapsed ^ when | SLOT_MASK;
223
224    if masked >= MAX_DURATION {
225        // Fudge the timer into the top level
226        masked = MAX_DURATION - 1;
227    }
228
229    let leading_zeros = masked.leading_zeros() as usize;
230    let significant = 63 - leading_zeros;
231
232    significant / NUM_LEVELS
233}
234
235#[cfg(all(test, not(loom)))]
236mod test {
237    use super::*;
238
239    #[test]
240    fn test_level_for() {
241        for pos in 0..64 {
242            assert_eq!(0, level_for(0, pos), "level_for({pos}) -- binary = {pos:b}");
243        }
244
245        for level in 1..5 {
246            for pos in level..64 {
247                let a = pos * 64_usize.pow(level as u32);
248                assert_eq!(
249                    level,
250                    level_for(0, a as u64),
251                    "level_for({a}) -- binary = {a:b}"
252                );
253
254                if pos > level {
255                    let a = a - 1;
256                    assert_eq!(
257                        level,
258                        level_for(0, a as u64),
259                        "level_for({a}) -- binary = {a:b}"
260                    );
261                }
262
263                if pos < 64 {
264                    let a = a + 1;
265                    assert_eq!(
266                        level,
267                        level_for(0, a as u64),
268                        "level_for({a}) -- binary = {a:b}"
269                    );
270                }
271            }
272        }
273    }
274}