sozu_lib/
timer.rs

1//! Timer based on timing wheels
2//!
3//! code imported from mio-extras
4//! License: MIT or Apache 2.0
5use std::{
6    cmp,
7    fmt::Display,
8    iter,
9    time::{Duration, Instant},
10};
11
12use mio::Token;
13use slab::Slab;
14
15use crate::server::TIMER;
16
17// Conversion utilities
mod convert {
    use std::time::Duration;

    /// Convert a `Duration` to whole milliseconds, rounding up and saturating
    /// at `u64::MAX`.
    ///
    /// Any fraction of a millisecond counts as a full one, so only a zero
    /// duration maps to `0`. This keeps a sub-millisecond tick length from
    /// collapsing to a zero-millisecond tick.
    ///
    /// The saturating is fine because `u64::MAX` milliseconds are still many
    /// million years.
    pub fn millis(duration: Duration) -> u64 {
        const NANOS_PER_MILLI: u32 = 1_000_000;

        // `as_millis` truncates; add the dropped partial millisecond back.
        // The sum cannot overflow: the largest `Duration` holds far fewer than
        // `u128::MAX` milliseconds.
        let has_partial_milli = duration.subsec_nanos() % NANOS_PER_MILLI != 0;
        let rounded_up = duration.as_millis() + u128::from(has_partial_milli);

        // Clamping first makes the narrowing cast lossless.
        rounded_up.min(u128::from(u64::MAX)) as u64
    }
}
30
31/// A timer.
32///
33/// Typical usage goes like this:
34///
35/// * register the timer with a `mio::Poll`.
36/// * set a timeout, by calling `Timer::set_timeout`.  Here you provide some
37///   state to be associated with this timeout.
38/// * poll the `Poll`, to learn when a timeout has occurred.
39/// * retrieve state associated with the timeout by calling `Timer::poll`.
40///
41/// You can omit use of the `Poll` altogether, if you like, and just poll the
42/// `Timer` directly.
43pub struct Timer<T> {
44    // Size of each tick in milliseconds
45    tick_ms: u64,
46    // Slab of timeout entries
47    entries: Slab<Entry<T>>,
48    // Timeout wheel. Each tick, the timer will look at the next slot for
49    // timeouts that match the current tick.
50    wheel: Vec<WheelEntry>,
51    // Tick 0's time instant
52    start: Instant,
53    // The current tick
54    tick: Tick,
55    // The next entry to possibly timeout
56    next: Token,
57    // Masks the target tick to get the slot
58    mask: u64,
59}
60
/// Collects the parameters needed to create a `Timer`.
pub struct Builder {
    /// Approximate length of a single tick.
    tick: Duration,
    /// How many slots the timer wheel gets.
    num_slots: usize,
    /// How many in-flight timeouts the timer is sized for.
    capacity: usize,
}
70
71/// A timeout, as returned by `Timer::set_timeout`.
72///
73/// Use this as the argument to `Timer::cancel_timeout`, to cancel this timeout.
74#[derive(Clone, Debug)]
75pub struct Timeout {
76    // Reference into the timer entry slab
77    token: Token,
78    // Tick that it should match up with
79    tick: u64,
80}
81
82#[derive(Clone, Debug)]
83pub struct TimeoutContainer {
84    // mark it as an option, so we do not try to cancel a timeout multiple times
85    timeout: Option<Timeout>,
86    duration: Duration,
87    token: Option<Token>,
88}
89
90impl TimeoutContainer {
91    pub fn new(duration: Duration, token: Token) -> TimeoutContainer {
92        let timeout = TIMER.with(|timer| timer.borrow_mut().set_timeout(duration, token));
93        TimeoutContainer {
94            timeout: Some(timeout),
95            duration,
96            token: Some(token),
97        }
98    }
99
100    pub fn new_empty(duration: Duration) -> TimeoutContainer {
101        TimeoutContainer {
102            timeout: None,
103            duration,
104            token: None,
105        }
106    }
107
108    pub fn take(&mut self) -> TimeoutContainer {
109        TimeoutContainer {
110            timeout: self.timeout.take(),
111            duration: self.duration,
112            token: self.token.take(),
113        }
114    }
115
116    /// must be called when a timeout was triggered, to prevent errors when canceling
117    pub fn triggered(&mut self) {
118        let _ = self.timeout.take();
119    }
120
121    pub fn set(&mut self, token: Token) {
122        if let Some(timeout) = self.timeout.take() {
123            TIMER.with(|timer| timer.borrow_mut().cancel_timeout(&timeout));
124        }
125
126        let timeout = TIMER.with(|timer| timer.borrow_mut().set_timeout(self.duration, token));
127
128        self.timeout = Some(timeout);
129        self.token = Some(token);
130    }
131
132    /// warning: this does not reset the timer
133    pub fn set_duration(&mut self, duration: Duration) {
134        self.duration = duration;
135
136        if let Some(timeout) = self.timeout.take() {
137            TIMER.with(|timer| timer.borrow_mut().cancel_timeout(&timeout));
138        }
139
140        if let Some(token) = self.token {
141            self.timeout =
142                Some(TIMER.with(|timer| timer.borrow_mut().set_timeout(self.duration, token)));
143        }
144    }
145
146    pub fn duration(&self) -> Duration {
147        self.duration
148    }
149
150    pub fn cancel(&mut self) -> bool {
151        match self.timeout.take() {
152            None => {
153                //error!("cannot cancel non existing timeout");
154                //error!("self.duration was {:?}", self.duration);
155                false
156            }
157            Some(timeout) => {
158                TIMER.with(|timer| timer.borrow_mut().cancel_timeout(&timeout));
159                true
160            }
161        }
162    }
163
164    // Reset the timeout to its optional timeout, or to its defined duration
165    pub fn reset(&mut self) -> bool {
166        match self.timeout.take() {
167            None => {
168                if let Some(token) = self.token {
169                    self.timeout = Some(
170                        TIMER.with(|timer| timer.borrow_mut().set_timeout(self.duration, token)),
171                    );
172                } else {
173                    //error!("cannot reset non existing timeout");
174                    return false;
175                }
176            }
177            Some(timeout) => {
178                self.timeout =
179                    TIMER.with(|timer| timer.borrow_mut().reset_timeout(&timeout, self.duration));
180            }
181        };
182        self.timeout.is_some()
183    }
184}
185
186impl Display for TimeoutContainer {
187    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
188        write!(f, "{:?}", self.duration)
189    }
190}
191
192impl std::ops::Drop for TimeoutContainer {
193    fn drop(&mut self) {
194        if self.cancel() {
195            debug!(
196                "Cancel a dangling timeout that haven't be handled in session lifecycle, token ({:?}), duration {}",
197                self.token, self
198            );
199        }
200    }
201}
202
203#[derive(Copy, Clone, Debug)]
204struct WheelEntry {
205    next_tick: Tick,
206    head: Token,
207}
208
209// Doubly linked list of timer entries. Allows for efficient insertion /
210// removal of timeouts.
211struct Entry<T> {
212    state: T,
213    links: EntryLinks,
214}
215
216#[derive(Copy, Clone)]
217struct EntryLinks {
218    tick: Tick,
219    prev: Token,
220    next: Token,
221}
222
223type Tick = u64;
224const TICK_MAX: Tick = u64::MAX;
225const EMPTY: Token = Token(usize::MAX);
226
227impl Builder {
228    /// Set the tick duration.  Default is 100ms.
229    pub fn tick_duration(mut self, duration: Duration) -> Builder {
230        self.tick = duration;
231        self
232    }
233
234    /// Set the number of slots.  Default is 256.
235    pub fn num_slots(mut self, num_slots: usize) -> Builder {
236        self.num_slots = num_slots;
237        self
238    }
239
240    /// Set the capacity.  Default is 65536.
241    pub fn capacity(mut self, capacity: usize) -> Builder {
242        self.capacity = capacity;
243        self
244    }
245
246    /// Build a `Timer` with the parameters set on this `Builder`.
247    pub fn build<T>(self) -> Timer<T> {
248        Timer::new(
249            convert::millis(self.tick),
250            self.num_slots,
251            self.capacity,
252            Instant::now(),
253        )
254    }
255}
256
257impl Default for Builder {
258    fn default() -> Builder {
259        Builder {
260            tick: Duration::from_millis(100),
261            num_slots: 1 << 8,
262            capacity: 1 << 16,
263        }
264    }
265}
266
267impl<T> Timer<T> {
268    fn new(tick_ms: u64, num_slots: usize, capacity: usize, start: Instant) -> Timer<T> {
269        let num_slots = num_slots.next_power_of_two();
270        let capacity = capacity.next_power_of_two();
271        let mask = (num_slots as u64) - 1;
272        let wheel = iter::repeat(WheelEntry {
273            next_tick: TICK_MAX,
274            head: EMPTY,
275        })
276        .take(num_slots)
277        .collect();
278
279        Timer {
280            tick_ms,
281            entries: Slab::with_capacity(capacity),
282            wheel,
283            start,
284            tick: 0,
285            next: EMPTY,
286            mask,
287        }
288    }
289
290    /// Set a timeout.
291    ///
292    /// When the timeout occurs, the given state becomes available via `poll`.
293    pub fn set_timeout(&mut self, delay_from_now: Duration, state: T) -> Timeout {
294        let delay_from_start = self.start.elapsed() + delay_from_now;
295        self.set_timeout_at(delay_from_start, state)
296    }
297
298    fn set_timeout_at(&mut self, delay_from_start: Duration, state: T) -> Timeout {
299        let mut tick = duration_to_tick(delay_from_start, self.tick_ms);
300        trace!(
301            "setting timeout; delay={:?}; tick={:?}; current-tick={:?}",
302            delay_from_start, tick, self.tick
303        );
304
305        // Always target at least 1 tick in the future
306        if tick <= self.tick {
307            tick = self.tick + 1;
308        }
309
310        self.insert(tick, state)
311    }
312
313    fn insert(&mut self, tick: Tick, state: T) -> Timeout {
314        // Get the slot for the requested tick
315        let slot = (tick & self.mask) as usize;
316        let curr = self.wheel[slot];
317
318        // Insert the new entry
319        let entry = Entry::new(state, tick, curr.head);
320        let token = Token(self.entries.insert(entry));
321
322        if curr.head != EMPTY {
323            // If there was a previous entry, set its prev pointer to the new
324            // entry
325            self.entries[curr.head.into()].links.prev = token;
326        }
327
328        // Update the head slot
329        self.wheel[slot] = WheelEntry {
330            next_tick: cmp::min(tick, curr.next_tick),
331            head: token,
332        };
333
334        trace!("inserted timeout; slot={}; token={:?}", slot, token);
335
336        // Return the new timeout
337        Timeout { token, tick }
338    }
339
340    /// Resets a timeout.
341    ///
342    pub fn reset_timeout(
343        &mut self,
344        timeout: &Timeout,
345        delay_from_now: Duration,
346    ) -> Option<Timeout> {
347        self.cancel_timeout(timeout)
348            .map(|state| self.set_timeout(delay_from_now, state))
349    }
350
351    // TODO: return Result with context
352    /// Cancel a timeout.
353    ///
354    /// If the timeout has not yet occurred, the return value holds the
355    /// associated state.
356    pub fn cancel_timeout(&mut self, timeout: &Timeout) -> Option<T> {
357        let links = match self.entries.get(timeout.token.into()) {
358            Some(e) => e.links,
359            None => {
360                debug!("timeout token {:?} not found", timeout.token);
361                return None;
362            }
363        };
364
365        // Sanity check
366        if links.tick != timeout.tick {
367            return None;
368        }
369
370        self.unlink(&links, timeout.token);
371        Some(self.entries.remove(timeout.token.into()).state)
372    }
373
374    /// Poll for an expired timer.
375    ///
376    /// The return value holds the state associated with the first expired
377    /// timer, if any.
378    pub fn poll(&mut self) -> Option<T> {
379        let target_tick = current_tick(self.start, self.tick_ms);
380        self.poll_to(target_tick)
381    }
382
383    fn poll_to(&mut self, mut target_tick: Tick) -> Option<T> {
384        trace!(
385            "tick_to; target_tick={}; current_tick={}",
386            target_tick, self.tick
387        );
388
389        if target_tick < self.tick {
390            target_tick = self.tick;
391        }
392
393        while self.tick <= target_tick {
394            let curr = self.next;
395
396            //info!("ticking; curr={:?}", curr);
397
398            if curr == EMPTY {
399                self.tick += 1;
400
401                let slot = self.slot_for(self.tick);
402                self.next = self.wheel[slot].head;
403
404                // Handle the case when a slot has a single timeout which gets
405                // canceled before the timeout expires. In this case, the
406                // slot's head is EMPTY but there is a value for next_tick. Not
407                // resetting next_tick here causes the timer to get stuck in a
408                // loop.
409                if self.next == EMPTY {
410                    self.wheel[slot].next_tick = TICK_MAX;
411                }
412            } else {
413                let slot = self.slot_for(self.tick);
414
415                if curr == self.wheel[slot].head {
416                    self.wheel[slot].next_tick = TICK_MAX;
417                }
418
419                let links = self.entries[curr.into()].links;
420
421                if links.tick <= self.tick {
422                    trace!("triggering; token={:?}", curr);
423
424                    // Unlink will also advance self.next
425                    self.unlink(&links, curr);
426
427                    // Remove and return the token
428                    return Some(self.entries.remove(curr.into()).state);
429                } else {
430                    let next_tick = self.wheel[slot].next_tick;
431                    self.wheel[slot].next_tick = cmp::min(next_tick, links.tick);
432                    self.next = links.next;
433                }
434            }
435        }
436
437        None
438    }
439
440    fn unlink(&mut self, links: &EntryLinks, token: Token) {
441        trace!(
442            "unlinking timeout; slot={}; token={:?}",
443            self.slot_for(links.tick),
444            token
445        );
446
447        if links.prev == EMPTY {
448            let slot = self.slot_for(links.tick);
449            self.wheel[slot].head = links.next;
450        } else {
451            self.entries[links.prev.into()].links.next = links.next;
452        }
453
454        if links.next != EMPTY {
455            self.entries[links.next.into()].links.prev = links.prev;
456
457            if token == self.next {
458                self.next = links.next;
459            }
460        } else if token == self.next {
461            self.next = EMPTY;
462        }
463    }
464
465    // Next tick containing a timeout
466    fn next_tick(&self) -> Option<Tick> {
467        if self.next != EMPTY {
468            let slot = self.slot_for(self.entries[self.next.into()].links.tick);
469
470            if self.wheel[slot].next_tick == self.tick {
471                // There is data ready right now
472                return Some(self.tick);
473            }
474        }
475
476        self.wheel.iter().map(|e| e.next_tick).min()
477    }
478
479    pub fn next_poll_date(&self) -> Option<Instant> {
480        self.next_tick()
481            .map(|tick| self.start + Duration::from_millis(self.tick_ms.saturating_mul(tick)))
482    }
483
484    fn slot_for(&self, tick: Tick) -> usize {
485        (self.mask & tick) as usize
486    }
487}
488
489impl<T> Default for Timer<T> {
490    fn default() -> Timer<T> {
491        Builder::default().build()
492    }
493}
494
495fn duration_to_tick(elapsed: Duration, tick_ms: u64) -> Tick {
496    // Calculate tick rounding up to the closest one
497    let elapsed_ms = convert::millis(elapsed);
498    elapsed_ms.saturating_add(tick_ms / 2) / tick_ms
499}
500
501fn current_tick(start: Instant, tick_ms: u64) -> Tick {
502    duration_to_tick(start.elapsed(), tick_ms)
503}
504
505impl<T> Entry<T> {
506    fn new(state: T, tick: u64, next: Token) -> Entry<T> {
507        Entry {
508            state,
509            links: EntryLinks {
510                tick,
511                prev: EMPTY,
512                next,
513            },
514        }
515    }
516}
517
#[cfg(test)]
mod test {
    use std::time::{Duration, Instant};

    use super::*;

    // Parameters of the timer used by every test.
    const TICK: u64 = 100;
    const SLOTS: usize = 16;
    const CAPACITY: usize = 32;

    /// Fresh timer with the test parameters, starting now.
    fn timer() -> Timer<&'static str> {
        Timer::new(TICK, SLOTS, CAPACITY, Instant::now())
    }

    /// Number of timeouts still stored in the timer.
    fn count<T>(timer: &Timer<T>) -> usize {
        timer.entries.len()
    }

    /// Tick that contains the given millisecond offset.
    fn ms_to_tick<T>(timer: &Timer<T>, ms: u64) -> u64 {
        ms / timer.tick_ms
    }

    /// Polls the timer up to the tick that contains `ms`.
    fn poll_at(timer: &mut Timer<&'static str>, ms: u64) -> Option<&'static str> {
        let tick = ms_to_tick(timer, ms);
        timer.poll_to(tick)
    }

    /// Shorthand for scheduling at an absolute offset from the timer start.
    fn at(ms: u64) -> Duration {
        Duration::from_millis(ms)
    }

    #[test]
    pub fn test_timeout_next_tick() {
        let mut t = timer();

        t.set_timeout_at(at(100), "a");

        // Too early.
        assert_eq!(None, poll_at(&mut t, 50));

        // Fires exactly once.
        assert_eq!(Some("a"), poll_at(&mut t, 100));
        assert_eq!(None, poll_at(&mut t, 100));

        // Nothing left afterwards.
        assert_eq!(None, poll_at(&mut t, 150));
        assert_eq!(None, poll_at(&mut t, 200));

        assert_eq!(count(&t), 0);
    }

    #[test]
    pub fn test_clearing_timeout() {
        let mut t = timer();

        let handle = t.set_timeout_at(at(100), "a");
        assert_eq!("a", t.cancel_timeout(&handle).unwrap());

        // A cancelled timeout never fires.
        assert_eq!(None, poll_at(&mut t, 100));
        assert_eq!(None, poll_at(&mut t, 200));

        assert_eq!(count(&t), 0);
    }

    #[test]
    pub fn test_multiple_timeouts_same_tick() {
        let mut t = timer();

        t.set_timeout_at(at(100), "a");
        t.set_timeout_at(at(100), "b");

        // Both fire on the same tick, in an order the test does not rely on.
        let first = poll_at(&mut t, 100).unwrap();
        let second = poll_at(&mut t, 100).unwrap();
        let mut rcv = vec![first, second];

        assert_eq!(None, poll_at(&mut t, 100));

        rcv.sort_unstable();
        assert!(rcv == ["a", "b"], "actual={rcv:?}");

        assert_eq!(None, poll_at(&mut t, 200));

        assert_eq!(count(&t), 0);
    }

    #[test]
    pub fn test_multiple_timeouts_diff_tick() {
        let mut t = timer();

        t.set_timeout_at(at(110), "a");
        t.set_timeout_at(at(220), "b");
        t.set_timeout_at(at(230), "c");
        t.set_timeout_at(at(440), "d");
        t.set_timeout_at(at(560), "e");

        assert_eq!(Some("a"), poll_at(&mut t, 100));
        assert_eq!(None, poll_at(&mut t, 100));

        // "b" and "c" share a tick; the most recently inserted comes first.
        assert_eq!(Some("c"), poll_at(&mut t, 200));
        assert_eq!(Some("b"), poll_at(&mut t, 200));
        assert_eq!(None, poll_at(&mut t, 200));

        assert_eq!(None, poll_at(&mut t, 300));

        assert_eq!(Some("d"), poll_at(&mut t, 400));
        assert_eq!(None, poll_at(&mut t, 400));

        assert_eq!(None, poll_at(&mut t, 500));

        assert_eq!(Some("e"), poll_at(&mut t, 600));
        assert_eq!(None, poll_at(&mut t, 600));
    }

    #[test]
    pub fn test_catching_up() {
        let mut t = timer();

        t.set_timeout_at(at(110), "a");
        t.set_timeout_at(at(220), "b");
        t.set_timeout_at(at(230), "c");
        t.set_timeout_at(at(440), "d");

        // A single late target drains every overdue timeout in tick order.
        for expected in [Some("a"), Some("c"), Some("b"), Some("d"), None] {
            assert_eq!(expected, poll_at(&mut t, 600));
        }
    }

    #[test]
    pub fn test_timeout_hash_collision() {
        let mut t = timer();

        // One full turn of the wheel later: same slot as the first timeout.
        let one_turn_later = 100 + TICK * SLOTS as u64;

        t.set_timeout_at(at(100), "a");
        t.set_timeout_at(at(one_turn_later), "b");

        assert_eq!(Some("a"), poll_at(&mut t, 100));
        assert_eq!(1, count(&t));

        assert_eq!(None, poll_at(&mut t, 200));
        assert_eq!(1, count(&t));

        assert_eq!(Some("b"), poll_at(&mut t, one_turn_later));
        assert_eq!(0, count(&t));
    }

    #[test]
    pub fn test_clearing_timeout_between_triggers() {
        let mut t = timer();

        let a = t.set_timeout_at(at(100), "a");
        let _ = t.set_timeout_at(at(100), "b");
        let _ = t.set_timeout_at(at(200), "c");

        assert_eq!(Some("b"), poll_at(&mut t, 100));
        assert_eq!(2, count(&t));

        // Cancel "a" while the timer is in the middle of its slot.
        t.cancel_timeout(&a);
        assert_eq!(1, count(&t));

        assert_eq!(None, poll_at(&mut t, 100));

        assert_eq!(Some("c"), poll_at(&mut t, 200));
        assert_eq!(0, count(&t));
    }
}