retty_io/
timer.rs

1//! Timer optimized for I/O related operations
2
3#![allow(deprecated, missing_debug_implementations)]
4
5use crate::{convert, Evented, Poll, PollOpt, Ready, Registration, SetReadiness, Token};
6use lazycell::LazyCell;
7use slab::Slab;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11use std::{cmp, fmt, io, iter, thread, u64, usize};
12
13/// A timer.
14///
15/// Typical usage goes like this:
16///
17/// * register the timer with a `Poll`.
18/// * set a timeout, by calling `Timer::set_timeout`.  Here you provide some
19///   state to be associated with this timeout.
20/// * poll the `Poll`, to learn when a timeout has occurred.
21/// * retrieve state associated with the timeout by calling `Timer::poll`.
22///
23/// You can omit use of the `Poll` altogether, if you like, and just poll the
24/// `Timer` directly.
25pub struct Timer<T> {
26    // Size of each tick in milliseconds
27    tick_ms: u64,
28    // Slab of timeout entries
29    entries: Slab<Entry<T>>,
30    // Timeout wheel. Each tick, the timer will look at the next slot for
31    // timeouts that match the current tick.
32    wheel: Vec<WheelEntry>,
33    // Tick 0's time instant
34    start: Instant,
35    // The current tick
36    tick: Tick,
37    // The next entry to possibly timeout
38    next: Token,
39    // Masks the target tick to get the slot
40    mask: u64,
41    // Set on registration with Poll
42    inner: LazyCell<Inner>,
43}
44
45/// Used to create a `Timer`.
46pub struct Builder {
47    // Approximate duration of each tick
48    tick: Duration,
49    // Number of slots in the timer wheel
50    num_slots: usize,
51    // Max number of timeouts that can be in flight at a given time.
52    capacity: usize,
53}
54
55/// A timeout, as returned by `Timer::set_timeout`.
56///
57/// Use this as the argument to `Timer::cancel_timeout`, to cancel this timeout.
58#[derive(Clone, Debug)]
59pub struct Timeout {
60    // Reference into the timer entry slab
61    token: Token,
62    // Tick that it should match up with
63    tick: u64,
64}
65
66struct Inner {
67    registration: Registration,
68    set_readiness: SetReadiness,
69    wakeup_state: WakeupState,
70    wakeup_thread: thread::JoinHandle<()>,
71}
72
73impl Drop for Inner {
74    fn drop(&mut self) {
75        // 1. Set wakeup state to TERMINATE_THREAD
76        self.wakeup_state.store(TERMINATE_THREAD, Ordering::Release);
77        // 2. Wake him up
78        self.wakeup_thread.thread().unpark();
79    }
80}
81
82#[derive(Copy, Clone, Debug)]
83struct WheelEntry {
84    next_tick: Tick,
85    head: Token,
86}
87
88// Doubly linked list of timer entries. Allows for efficient insertion /
89// removal of timeouts.
90struct Entry<T> {
91    state: T,
92    links: EntryLinks,
93}
94
95#[derive(Copy, Clone)]
96struct EntryLinks {
97    tick: Tick,
98    prev: Token,
99    next: Token,
100}
101
102type Tick = u64;
103
104const TICK_MAX: Tick = u64::MAX;
105
106// Manages communication with wakeup thread
107type WakeupState = Arc<AtomicUsize>;
108
109const TERMINATE_THREAD: usize = 0;
110const EMPTY: Token = Token(usize::MAX);
111
112impl Builder {
113    /// Set the tick duration.  Default is 100ms.
114    pub fn tick_duration(mut self, duration: Duration) -> Builder {
115        self.tick = duration;
116        self
117    }
118
119    /// Set the number of slots.  Default is 256.
120    pub fn num_slots(mut self, num_slots: usize) -> Builder {
121        self.num_slots = num_slots;
122        self
123    }
124
125    /// Set the capacity.  Default is 65536.
126    pub fn capacity(mut self, capacity: usize) -> Builder {
127        self.capacity = capacity;
128        self
129    }
130
131    /// Build a `Timer` with the parameters set on this `Builder`.
132    pub fn build<T>(self) -> Timer<T> {
133        Timer::new(
134            convert::millis(self.tick),
135            self.num_slots,
136            self.capacity,
137            Instant::now(),
138        )
139    }
140}
141
142impl Default for Builder {
143    fn default() -> Builder {
144        Builder {
145            tick: Duration::from_millis(100),
146            num_slots: 1 << 8,
147            capacity: 1 << 16,
148        }
149    }
150}
151
152impl<T> Timer<T> {
153    fn new(tick_ms: u64, num_slots: usize, capacity: usize, start: Instant) -> Timer<T> {
154        let num_slots = num_slots.next_power_of_two();
155        let capacity = capacity.next_power_of_two();
156        let mask = (num_slots as u64) - 1;
157        let wheel = iter::repeat(WheelEntry {
158            next_tick: TICK_MAX,
159            head: EMPTY,
160        })
161        .take(num_slots)
162        .collect();
163
164        Timer {
165            tick_ms,
166            entries: Slab::with_capacity(capacity),
167            wheel,
168            start,
169            tick: 0,
170            next: EMPTY,
171            mask,
172            inner: LazyCell::new(),
173        }
174    }
175
176    /// Set a timeout.
177    ///
178    /// When the timeout occurs, the given state becomes available via `poll`.
179    pub fn set_timeout(&mut self, delay_from_now: Duration, state: T) -> Timeout {
180        let delay_from_start = self.start.elapsed() + delay_from_now;
181        self.set_timeout_at(delay_from_start, state)
182    }
183
184    fn set_timeout_at(&mut self, delay_from_start: Duration, state: T) -> Timeout {
185        let mut tick = duration_to_tick(delay_from_start, self.tick_ms);
186        trace!(
187            "setting timeout; delay={:?}; tick={:?}; current-tick={:?}",
188            delay_from_start,
189            tick,
190            self.tick
191        );
192
193        // Always target at least 1 tick in the future
194        if tick <= self.tick {
195            tick = self.tick + 1;
196        }
197
198        self.insert(tick, state)
199    }
200
201    fn insert(&mut self, tick: Tick, state: T) -> Timeout {
202        // Get the slot for the requested tick
203        let slot = (tick & self.mask) as usize;
204        let curr = self.wheel[slot];
205
206        // Insert the new entry
207        let entry = Entry::new(state, tick, curr.head);
208        let token = Token(self.entries.insert(entry));
209
210        if curr.head != EMPTY {
211            // If there was a previous entry, set its prev pointer to the new
212            // entry
213            self.entries[curr.head.into()].links.prev = token;
214        }
215
216        // Update the head slot
217        self.wheel[slot] = WheelEntry {
218            next_tick: cmp::min(tick, curr.next_tick),
219            head: token,
220        };
221
222        self.schedule_readiness(tick);
223
224        trace!("inserted timout; slot={}; token={:?}", slot, token);
225
226        // Return the new timeout
227        Timeout { token, tick }
228    }
229
230    /// Cancel a timeout.
231    ///
232    /// If the timeout has not yet occurred, the return value holds the
233    /// associated state.
234    pub fn cancel_timeout(&mut self, timeout: &Timeout) -> Option<T> {
235        let links = match self.entries.get(timeout.token.into()) {
236            Some(e) => e.links,
237            None => return None,
238        };
239
240        // Sanity check
241        if links.tick != timeout.tick {
242            return None;
243        }
244
245        self.unlink(&links, timeout.token);
246        Some(self.entries.remove(timeout.token.into()).state)
247    }
248
249    /// Poll for an expired timer.
250    ///
251    /// The return value holds the state associated with the first expired
252    /// timer, if any.
253    pub fn poll(&mut self) -> Option<T> {
254        let target_tick = current_tick(self.start, self.tick_ms);
255        self.poll_to(target_tick)
256    }
257
258    fn poll_to(&mut self, mut target_tick: Tick) -> Option<T> {
259        trace!(
260            "tick_to; target_tick={}; current_tick={}",
261            target_tick,
262            self.tick
263        );
264
265        if target_tick < self.tick {
266            target_tick = self.tick;
267        }
268
269        while self.tick <= target_tick {
270            let curr = self.next;
271
272            trace!("ticking; curr={:?}", curr);
273
274            if curr == EMPTY {
275                self.tick += 1;
276
277                let slot = self.slot_for(self.tick);
278                self.next = self.wheel[slot].head;
279
280                // Handle the case when a slot has a single timeout which gets
281                // canceled before the timeout expires. In this case, the
282                // slot's head is EMPTY but there is a value for next_tick. Not
283                // resetting next_tick here causes the timer to get stuck in a
284                // loop.
285                if self.next == EMPTY {
286                    self.wheel[slot].next_tick = TICK_MAX;
287                }
288            } else {
289                let slot = self.slot_for(self.tick);
290
291                if curr == self.wheel[slot].head {
292                    self.wheel[slot].next_tick = TICK_MAX;
293                }
294
295                let links = self.entries[curr.into()].links;
296
297                if links.tick <= self.tick {
298                    trace!("triggering; token={:?}", curr);
299
300                    // Unlink will also advance self.next
301                    self.unlink(&links, curr);
302
303                    // Remove and return the token
304                    return Some(self.entries.remove(curr.into()).state);
305                } else {
306                    let next_tick = self.wheel[slot].next_tick;
307                    self.wheel[slot].next_tick = cmp::min(next_tick, links.tick);
308                    self.next = links.next;
309                }
310            }
311        }
312
313        // No more timeouts to poll
314        if let Some(inner) = self.inner.borrow() {
315            trace!("unsetting readiness");
316            let _ = inner.set_readiness.set_readiness(Ready::empty());
317
318            if let Some(tick) = self.next_tick() {
319                self.schedule_readiness(tick);
320            }
321        }
322
323        None
324    }
325
326    fn unlink(&mut self, links: &EntryLinks, token: Token) {
327        trace!(
328            "unlinking timeout; slot={}; token={:?}",
329            self.slot_for(links.tick),
330            token
331        );
332
333        if links.prev == EMPTY {
334            let slot = self.slot_for(links.tick);
335            self.wheel[slot].head = links.next;
336        } else {
337            self.entries[links.prev.into()].links.next = links.next;
338        }
339
340        if links.next != EMPTY {
341            self.entries[links.next.into()].links.prev = links.prev;
342
343            if token == self.next {
344                self.next = links.next;
345            }
346        } else if token == self.next {
347            self.next = EMPTY;
348        }
349    }
350
351    fn schedule_readiness(&self, tick: Tick) {
352        if let Some(inner) = self.inner.borrow() {
353            // Coordinate setting readiness w/ the wakeup thread
354            let mut curr = inner.wakeup_state.load(Ordering::Acquire);
355
356            loop {
357                if curr as Tick <= tick {
358                    // Nothing to do, wakeup is already scheduled
359                    return;
360                }
361
362                // Attempt to move the wakeup time forward
363                trace!("advancing the wakeup time; target={}; curr={}", tick, curr);
364                match inner.wakeup_state.compare_exchange_weak(
365                    curr,
366                    tick as usize,
367                    Ordering::Release,
368                    Ordering::Relaxed,
369                ) {
370                    Ok(_) => {
371                        // Signal to the wakeup thread that the wakeup time has been changed.
372                        trace!("unparking wakeup thread");
373                        inner.wakeup_thread.thread().unpark();
374                        return;
375                    }
376                    Err(actual) => curr = actual,
377                }
378            }
379        }
380    }
381
382    // Next tick containing a timeout
383    fn next_tick(&self) -> Option<Tick> {
384        if self.next != EMPTY {
385            let slot = self.slot_for(self.entries[self.next.into()].links.tick);
386
387            if self.wheel[slot].next_tick == self.tick {
388                // There is data ready right now
389                return Some(self.tick);
390            }
391        }
392
393        self.wheel.iter().map(|e| e.next_tick).min()
394    }
395
396    fn slot_for(&self, tick: Tick) -> usize {
397        (self.mask & tick) as usize
398    }
399}
400
401impl<T> Default for Timer<T> {
402    fn default() -> Timer<T> {
403        Builder::default().build()
404    }
405}
406
407impl<T> Evented for Timer<T> {
408    fn register(
409        &self,
410        poll: &Poll,
411        token: Token,
412        interest: Ready,
413        opts: PollOpt,
414    ) -> io::Result<()> {
415        if self.inner.borrow().is_some() {
416            return Err(io::Error::new(
417                io::ErrorKind::Other,
418                "timer already registered",
419            ));
420        }
421
422        let (registration, set_readiness) = Registration::new2();
423        poll.register(&registration, token, interest, opts)?;
424        let wakeup_state = Arc::new(AtomicUsize::new(usize::MAX));
425        let thread_handle = spawn_wakeup_thread(
426            Arc::clone(&wakeup_state),
427            set_readiness.clone(),
428            self.start,
429            self.tick_ms,
430        );
431
432        self.inner
433            .fill(Inner {
434                registration,
435                set_readiness,
436                wakeup_state,
437                wakeup_thread: thread_handle,
438            })
439            .expect("timer already registered");
440
441        if let Some(next_tick) = self.next_tick() {
442            self.schedule_readiness(next_tick);
443        }
444
445        Ok(())
446    }
447
448    fn reregister(
449        &self,
450        poll: &Poll,
451        token: Token,
452        interest: Ready,
453        opts: PollOpt,
454    ) -> io::Result<()> {
455        match self.inner.borrow() {
456            Some(inner) => poll.reregister(&inner.registration, token, interest, opts),
457            None => Err(io::Error::new(
458                io::ErrorKind::Other,
459                "receiver not registered",
460            )),
461        }
462    }
463
464    fn deregister(&self, poll: &Poll) -> io::Result<()> {
465        match self.inner.borrow() {
466            Some(inner) => poll.deregister(&inner.registration),
467            None => Err(io::Error::new(
468                io::ErrorKind::Other,
469                "receiver not registered",
470            )),
471        }
472    }
473}
474
475impl fmt::Debug for Inner {
476    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
477        fmt.debug_struct("Inner")
478            .field("registration", &self.registration)
479            .field("wakeup_state", &self.wakeup_state.load(Ordering::Relaxed))
480            .finish()
481    }
482}
483
484fn spawn_wakeup_thread(
485    state: WakeupState,
486    set_readiness: SetReadiness,
487    start: Instant,
488    tick_ms: u64,
489) -> thread::JoinHandle<()> {
490    thread::Builder::new()
491        .name(format!(
492            "mio_extras::timer : {}",
493            thread::current().name().unwrap_or("no name")
494        ))
495        .spawn(move || {
496            let mut sleep_until_tick = state.load(Ordering::Acquire) as Tick;
497
498            loop {
499                if sleep_until_tick == TERMINATE_THREAD as Tick {
500                    return;
501                }
502
503                let now_tick = current_tick(start, tick_ms);
504
505                trace!(
506                    "wakeup thread: sleep_until_tick={:?}; now_tick={:?}",
507                    sleep_until_tick,
508                    now_tick
509                );
510
511                if now_tick < sleep_until_tick {
512                    // Calling park_timeout with u64::MAX leads to undefined
513                    // behavior in pthread, causing the park to return immediately
514                    // and causing the thread to tightly spin. Instead of u64::MAX
515                    // on large values, simply use a blocking park.
516                    match tick_ms.checked_mul(sleep_until_tick - now_tick) {
517                        Some(sleep_duration) => {
518                            trace!(
519                            "sleeping; tick_ms={}; now_tick={}; sleep_until_tick={}; duration={:?}",
520                            tick_ms,
521                            now_tick,
522                            sleep_until_tick,
523                            sleep_duration
524                        );
525                            thread::park_timeout(Duration::from_millis(sleep_duration));
526                        }
527                        None => {
528                            trace!(
529                                "sleeping; tick_ms={}; now_tick={}; blocking sleep",
530                                tick_ms,
531                                now_tick
532                            );
533                            thread::park();
534                        }
535                    }
536                    sleep_until_tick = state.load(Ordering::Acquire) as Tick;
537                } else {
538                    match state.compare_exchange_weak(
539                        sleep_until_tick as usize,
540                        usize::MAX,
541                        Ordering::AcqRel,
542                        Ordering::Acquire,
543                    ) {
544                        Ok(_) => {
545                            trace!("setting readiness from wakeup thread");
546                            let _ = set_readiness.set_readiness(Ready::readable());
547                            sleep_until_tick = usize::MAX as Tick;
548                        }
549                        Err(actual) => sleep_until_tick = actual as Tick,
550                    }
551                }
552            }
553        })
554        .unwrap()
555}
556
557fn duration_to_tick(elapsed: Duration, tick_ms: u64) -> Tick {
558    // Calculate tick rounding up to the closest one
559    let elapsed_ms = convert::millis(elapsed);
560    elapsed_ms.saturating_add(tick_ms / 2) / tick_ms
561}
562
563fn current_tick(start: Instant, tick_ms: u64) -> Tick {
564    duration_to_tick(start.elapsed(), tick_ms)
565}
566
567impl<T> Entry<T> {
568    fn new(state: T, tick: u64, next: Token) -> Entry<T> {
569        Entry {
570            state,
571            links: EntryLinks {
572                tick,
573                prev: EMPTY,
574                next,
575            },
576        }
577    }
578}
579
580#[cfg(test)]
581mod test {
582    use super::*;
583    use std::time::{Duration, Instant};
584
585    #[test]
586    pub fn test_timeout_next_tick() {
587        let mut t = timer();
588        let mut tick;
589
590        t.set_timeout_at(Duration::from_millis(100), "a");
591
592        tick = ms_to_tick(&t, 50);
593        assert_eq!(None, t.poll_to(tick));
594
595        tick = ms_to_tick(&t, 100);
596        assert_eq!(Some("a"), t.poll_to(tick));
597        assert_eq!(None, t.poll_to(tick));
598
599        tick = ms_to_tick(&t, 150);
600        assert_eq!(None, t.poll_to(tick));
601
602        tick = ms_to_tick(&t, 200);
603        assert_eq!(None, t.poll_to(tick));
604
605        assert_eq!(count(&t), 0);
606    }
607
608    #[test]
609    pub fn test_clearing_timeout() {
610        let mut t = timer();
611        let mut tick;
612
613        let to = t.set_timeout_at(Duration::from_millis(100), "a");
614        assert_eq!("a", t.cancel_timeout(&to).unwrap());
615
616        tick = ms_to_tick(&t, 100);
617        assert_eq!(None, t.poll_to(tick));
618
619        tick = ms_to_tick(&t, 200);
620        assert_eq!(None, t.poll_to(tick));
621
622        assert_eq!(count(&t), 0);
623    }
624
625    #[test]
626    pub fn test_multiple_timeouts_same_tick() {
627        let mut t = timer();
628        let mut tick;
629
630        t.set_timeout_at(Duration::from_millis(100), "a");
631        t.set_timeout_at(Duration::from_millis(100), "b");
632
633        let mut rcv = vec![];
634
635        tick = ms_to_tick(&t, 100);
636        rcv.push(t.poll_to(tick).unwrap());
637        rcv.push(t.poll_to(tick).unwrap());
638
639        assert_eq!(None, t.poll_to(tick));
640
641        rcv.sort_unstable();
642        assert!(rcv == ["a", "b"], "actual={:?}", rcv);
643
644        tick = ms_to_tick(&t, 200);
645        assert_eq!(None, t.poll_to(tick));
646
647        assert_eq!(count(&t), 0);
648    }
649
650    #[test]
651    pub fn test_multiple_timeouts_diff_tick() {
652        let mut t = timer();
653        let mut tick;
654
655        t.set_timeout_at(Duration::from_millis(110), "a");
656        t.set_timeout_at(Duration::from_millis(220), "b");
657        t.set_timeout_at(Duration::from_millis(230), "c");
658        t.set_timeout_at(Duration::from_millis(440), "d");
659        t.set_timeout_at(Duration::from_millis(560), "e");
660
661        tick = ms_to_tick(&t, 100);
662        assert_eq!(Some("a"), t.poll_to(tick));
663        assert_eq!(None, t.poll_to(tick));
664
665        tick = ms_to_tick(&t, 200);
666        assert_eq!(Some("c"), t.poll_to(tick));
667        assert_eq!(Some("b"), t.poll_to(tick));
668        assert_eq!(None, t.poll_to(tick));
669
670        tick = ms_to_tick(&t, 300);
671        assert_eq!(None, t.poll_to(tick));
672
673        tick = ms_to_tick(&t, 400);
674        assert_eq!(Some("d"), t.poll_to(tick));
675        assert_eq!(None, t.poll_to(tick));
676
677        tick = ms_to_tick(&t, 500);
678        assert_eq!(None, t.poll_to(tick));
679
680        tick = ms_to_tick(&t, 600);
681        assert_eq!(Some("e"), t.poll_to(tick));
682        assert_eq!(None, t.poll_to(tick));
683    }
684
685    #[test]
686    pub fn test_catching_up() {
687        let mut t = timer();
688
689        t.set_timeout_at(Duration::from_millis(110), "a");
690        t.set_timeout_at(Duration::from_millis(220), "b");
691        t.set_timeout_at(Duration::from_millis(230), "c");
692        t.set_timeout_at(Duration::from_millis(440), "d");
693
694        let tick = ms_to_tick(&t, 600);
695        assert_eq!(Some("a"), t.poll_to(tick));
696        assert_eq!(Some("c"), t.poll_to(tick));
697        assert_eq!(Some("b"), t.poll_to(tick));
698        assert_eq!(Some("d"), t.poll_to(tick));
699        assert_eq!(None, t.poll_to(tick));
700    }
701
702    #[test]
703    pub fn test_timeout_hash_collision() {
704        let mut t = timer();
705        let mut tick;
706
707        t.set_timeout_at(Duration::from_millis(100), "a");
708        t.set_timeout_at(Duration::from_millis(100 + TICK * SLOTS as u64), "b");
709
710        tick = ms_to_tick(&t, 100);
711        assert_eq!(Some("a"), t.poll_to(tick));
712        assert_eq!(1, count(&t));
713
714        tick = ms_to_tick(&t, 200);
715        assert_eq!(None, t.poll_to(tick));
716        assert_eq!(1, count(&t));
717
718        tick = ms_to_tick(&t, 100 + TICK * SLOTS as u64);
719        assert_eq!(Some("b"), t.poll_to(tick));
720        assert_eq!(0, count(&t));
721    }
722
723    #[test]
724    pub fn test_clearing_timeout_between_triggers() {
725        let mut t = timer();
726        let mut tick;
727
728        let a = t.set_timeout_at(Duration::from_millis(100), "a");
729        let _ = t.set_timeout_at(Duration::from_millis(100), "b");
730        let _ = t.set_timeout_at(Duration::from_millis(200), "c");
731
732        tick = ms_to_tick(&t, 100);
733        assert_eq!(Some("b"), t.poll_to(tick));
734        assert_eq!(2, count(&t));
735
736        t.cancel_timeout(&a);
737        assert_eq!(1, count(&t));
738
739        assert_eq!(None, t.poll_to(tick));
740
741        tick = ms_to_tick(&t, 200);
742        assert_eq!(Some("c"), t.poll_to(tick));
743        assert_eq!(0, count(&t));
744    }
745
746    const TICK: u64 = 100;
747    const SLOTS: usize = 16;
748    const CAPACITY: usize = 32;
749
750    fn count<T>(timer: &Timer<T>) -> usize {
751        timer.entries.len()
752    }
753
754    fn timer() -> Timer<&'static str> {
755        Timer::new(TICK, SLOTS, CAPACITY, Instant::now())
756    }
757
758    fn ms_to_tick<T>(timer: &Timer<T>, ms: u64) -> u64 {
759        ms / timer.tick_ms
760    }
761}