mio_more/
timer.rs

1//! Timer optimized for I/O related operations
2
3use convert;
4use mio::{self, Evented, Ready, Poll, PollOpt, Registration, SetReadiness, Token};
5use lazycell::LazyCell;
6use std::{cmp, error, fmt, io, u64, usize, iter, thread};
7use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use self::TimerErrorKind::TimerOverflow;
12
13pub struct Timer<T> {
14    // Size of each tick in milliseconds
15    tick_ms: u64,
16    // Slab of timeout entries
17    entries: Slab<Entry<T>>,
18    // Timeout wheel. Each tick, the timer will look at the next slot for
19    // timeouts that match the current tick.
20    wheel: Vec<WheelEntry>,
21    // Tick 0's time instant
22    start: Instant,
23    // The current tick
24    tick: Tick,
25    // The next entry to possibly timeout
26    next: Token,
27    // Masks the target tick to get the slot
28    mask: u64,
29    // Set on registration with Poll
30    inner: LazyCell<Inner>,
31}
32
/// Configuration used to construct a `Timer`.
pub struct Builder {
    /// Approximate length of one tick.
    tick: Duration,
    /// How many slots the timer wheel should have.
    num_slots: usize,
    /// Upper bound on the number of timeouts pending at the same time.
    capacity: usize,
}
41
42#[derive(Clone, Debug)]
43pub struct Timeout {
44    // Reference into the timer entry slab
45    token: Token,
46    // Tick that it should match up with
47    tick: u64,
48}
49
50struct Inner {
51    registration: Registration,
52    set_readiness: SetReadiness,
53    wakeup_state: WakeupState,
54    wakeup_thread: thread::JoinHandle<()>,
55}
56
57#[derive(Copy, Clone, Debug)]
58struct WheelEntry {
59    next_tick: Tick,
60    head: Token,
61}
62
63// Doubly linked list of timer entries. Allows for efficient insertion /
64// removal of timeouts.
65struct Entry<T> {
66    state: T,
67    links: EntryLinks,
68}
69
70#[derive(Copy, Clone)]
71struct EntryLinks {
72    tick: Tick,
73    prev: Token,
74    next: Token
75}
76
77type Tick = u64;
78
79const TICK_MAX: Tick = u64::MAX;
80
81// Manages communication with wakeup thread
82type WakeupState = Arc<AtomicUsize>;
83
84type Slab<T> = ::slab::Slab<T, mio::Token>;
85
86pub type Result<T> = ::std::result::Result<T, TimerError>;
87// TODO: remove
88pub type TimerResult<T> = Result<T>;
89
90
91#[derive(Debug)]
92pub struct TimerError {
93    kind: TimerErrorKind,
94    desc: &'static str,
95}
96
/// Categories of `TimerError`.
#[derive(Debug)]
pub enum TimerErrorKind {
    /// The timer could not store another timeout entry.
    TimerOverflow,
}
101
102// TODO: Remove
103pub type OldTimerResult<T> = Result<T>;
104
105const TERMINATE_THREAD: usize = 0;
106const EMPTY: Token = Token(usize::MAX);
107
108impl Builder {
109    pub fn tick_duration(mut self, duration: Duration) -> Builder {
110        self.tick = duration;
111        self
112    }
113
114    pub fn num_slots(mut self, num_slots: usize) -> Builder {
115        self.num_slots = num_slots;
116        self
117    }
118
119    pub fn capacity(mut self, capacity: usize) -> Builder {
120        self.capacity = capacity;
121        self
122    }
123
124    pub fn build<T>(self) -> Timer<T> {
125        Timer::new(convert::millis(self.tick), self.num_slots, self.capacity, Instant::now())
126    }
127}
128
129impl Default for Builder {
130    fn default() -> Builder {
131        Builder {
132            tick: Duration::from_millis(100),
133            num_slots: 256,
134            capacity: 65_536,
135        }
136    }
137}
138
139impl<T> Timer<T> {
140    fn new(tick_ms: u64, num_slots: usize, capacity: usize, start: Instant) -> Timer<T> {
141        let num_slots = num_slots.next_power_of_two();
142        let capacity = capacity.next_power_of_two();
143        let mask = (num_slots as u64) - 1;
144        let wheel = iter::repeat(WheelEntry { next_tick: TICK_MAX, head: EMPTY })
145            .take(num_slots).collect();
146
147        Timer {
148            tick_ms: tick_ms,
149            entries: Slab::with_capacity(capacity),
150            wheel: wheel,
151            start: start,
152            tick: 0,
153            next: EMPTY,
154            mask: mask,
155            inner: LazyCell::new(),
156        }
157    }
158
159    pub fn set_timeout(&mut self, delay_from_now: Duration, state: T) -> Result<Timeout> {
160        let delay_from_start = self.start.elapsed() + delay_from_now;
161        self.set_timeout_at(delay_from_start, state)
162    }
163
164    fn set_timeout_at(&mut self, delay_from_start: Duration, state: T) -> Result<Timeout> {
165        let mut tick = duration_to_tick(delay_from_start, self.tick_ms);
166        trace!("setting timeout; delay={:?}; tick={:?}; current-tick={:?}", delay_from_start, tick, self.tick);
167
168        // Always target at least 1 tick in the future
169        if tick <= self.tick {
170            tick = self.tick + 1;
171        }
172
173        self.insert(tick, state)
174    }
175
176    fn insert(&mut self, tick: Tick, state: T) -> Result<Timeout> {
177        // Get the slot for the requested tick
178        let slot = (tick & self.mask) as usize;
179        let curr = self.wheel[slot];
180
181        // Insert the new entry
182        let token = try!(
183            self.entries.insert(Entry::new(state, tick, curr.head))
184            .map_err(|_| TimerError::overflow()));
185
186        if curr.head != EMPTY {
187            // If there was a previous entry, set its prev pointer to the new
188            // entry
189            self.entries[curr.head].links.prev = token;
190        }
191
192        // Update the head slot
193        self.wheel[slot] = WheelEntry {
194            next_tick: cmp::min(tick, curr.next_tick),
195            head: token,
196        };
197
198        self.schedule_readiness(tick);
199
200        trace!("inserted timout; slot={}; token={:?}", slot, token);
201
202        // Return the new timeout
203        Ok(Timeout {
204            token: token,
205            tick: tick
206        })
207    }
208
209    pub fn cancel_timeout(&mut self, timeout: &Timeout) -> Option<T> {
210        let links = match self.entries.get(timeout.token) {
211            Some(e) => e.links,
212            None => return None
213        };
214
215        // Sanity check
216        if links.tick != timeout.tick {
217            return None;
218        }
219
220        self.unlink(&links, timeout.token);
221        self.entries.remove(timeout.token).map(|e| e.state)
222    }
223
224    pub fn poll(&mut self) -> Option<T> {
225        let target_tick = current_tick(self.start, self.tick_ms);
226        self.poll_to(target_tick)
227    }
228
229    fn poll_to(&mut self, mut target_tick: Tick) -> Option<T> {
230        trace!("tick_to; target_tick={}; current_tick={}", target_tick, self.tick);
231
232        if target_tick < self.tick {
233            target_tick = self.tick;
234        }
235
236        while self.tick <= target_tick {
237            let curr = self.next;
238
239            trace!("ticking; curr={:?}", curr);
240
241            if curr == EMPTY {
242                self.tick += 1;
243
244                let slot = self.slot_for(self.tick);
245                self.next = self.wheel[slot].head;
246
247                // Handle the case when a slot has a single timeout which gets
248                // canceled before the timeout expires. In this case, the
249                // slot's head is EMPTY but there is a value for next_tick. Not
250                // resetting next_tick here causes the timer to get stuck in a
251                // loop.
252                if self.next == EMPTY {
253                    self.wheel[slot].next_tick = TICK_MAX;
254                }
255            } else {
256                let slot = self.slot_for(self.tick);
257
258                if curr == self.wheel[slot].head {
259                    self.wheel[slot].next_tick = TICK_MAX;
260                }
261
262                let links = self.entries[curr].links;
263
264                if links.tick <= self.tick {
265                    trace!("triggering; token={:?}", curr);
266
267                    // Unlink will also advance self.next
268                    self.unlink(&links, curr);
269
270                    // Remove and return the token
271                    return self.entries.remove(curr)
272                        .map(|e| e.state);
273                } else {
274                    let next_tick = self.wheel[slot].next_tick;
275                    self.wheel[slot].next_tick = cmp::min(next_tick, links.tick);
276                    self.next = links.next;
277                }
278            }
279        }
280
281        // No more timeouts to poll
282        if let Some(inner) = self.inner.borrow() {
283            trace!("unsetting readiness");
284            let _ = inner.set_readiness.set_readiness(Ready::none());
285
286            if let Some(tick) = self.next_tick() {
287                self.schedule_readiness(tick);
288            }
289        }
290
291        None
292    }
293
294    fn unlink(&mut self, links: &EntryLinks, token: Token) {
295       trace!("unlinking timeout; slot={}; token={:?}",
296               self.slot_for(links.tick), token);
297
298        if links.prev == EMPTY {
299            let slot = self.slot_for(links.tick);
300            self.wheel[slot].head = links.next;
301        } else {
302            self.entries[links.prev].links.next = links.next;
303        }
304
305        if links.next != EMPTY {
306            self.entries[links.next].links.prev = links.prev;
307
308            if token == self.next {
309                self.next = links.next;
310            }
311        } else if token == self.next {
312            self.next = EMPTY;
313        }
314    }
315
316    fn schedule_readiness(&self, tick: Tick) {
317        if let Some(inner) = self.inner.borrow() {
318            // Coordinate setting readiness w/ the wakeup thread
319            let mut curr = inner.wakeup_state.load(Ordering::Acquire);
320
321            loop {
322                if curr as Tick <= tick {
323                    // Nothing to do, wakeup is already scheduled
324                    return;
325                }
326
327                // Attempt to move the wakeup time forward
328                trace!("advancing the wakeup time; target={}; curr={}", tick, curr);
329                let actual = inner.wakeup_state.compare_and_swap(curr, tick as usize, Ordering::Release);
330
331                if actual == curr {
332                    // Signal to the wakeup thread that the wakeup time has
333                    // been changed.
334                    trace!("unparking wakeup thread");
335                    inner.wakeup_thread.thread().unpark();
336                    return;
337                }
338
339                curr = actual;
340            }
341        }
342    }
343
344    // Next tick containing a timeout
345    fn next_tick(&self) -> Option<Tick> {
346        if self.next != EMPTY {
347            let slot = self.slot_for(self.entries[self.next].links.tick);
348
349            if self.wheel[slot].next_tick == self.tick {
350                // There is data ready right now
351                return Some(self.tick);
352            }
353        }
354
355        self.wheel.iter().map(|e| e.next_tick).min()
356    }
357
358    fn slot_for(&self, tick: Tick) -> usize {
359        (self.mask & tick) as usize
360    }
361}
362
363impl<T> Default for Timer<T> {
364    fn default() -> Timer<T> {
365        Builder::default().build()
366    }
367}
368
369impl<T> Evented for Timer<T> {
370    fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
371        if self.inner.borrow().is_some() {
372            return Err(io::Error::new(io::ErrorKind::Other, "timer already registered"));
373        }
374
375        let (registration, set_readiness) = Registration::new(poll, token, interest, opts);
376        let wakeup_state = Arc::new(AtomicUsize::new(usize::MAX));
377        let thread_handle = spawn_wakeup_thread(
378            wakeup_state.clone(),
379            set_readiness.clone(),
380            self.start, self.tick_ms);
381
382        self.inner.fill(Inner {
383            registration: registration,
384            set_readiness: set_readiness,
385            wakeup_state: wakeup_state,
386            wakeup_thread: thread_handle,
387        }).ok().expect("timer already registered");
388
389        if let Some(next_tick) = self.next_tick() {
390            self.schedule_readiness(next_tick);
391        }
392
393        Ok(())
394    }
395
396    fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
397        match self.inner.borrow() {
398            Some(inner) => inner.registration.update(poll, token, interest, opts),
399            None => Err(io::Error::new(io::ErrorKind::Other, "receiver not registered")),
400        }
401    }
402
403    fn deregister(&self, poll: &Poll) -> io::Result<()> {
404        match self.inner.borrow() {
405            Some(inner) => inner.registration.deregister(poll),
406            None => Err(io::Error::new(io::ErrorKind::Other, "receiver not registered")),
407        }
408    }
409}
410
411impl fmt::Debug for Inner {
412    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
413        fmt.debug_struct("Inner")
414            .field("registration", &self.registration)
415            .field("wakeup_state", &self.wakeup_state.load(Ordering::Relaxed))
416            .finish()
417    }
418}
419
420fn spawn_wakeup_thread(state: WakeupState, set_readiness: SetReadiness, start: Instant, tick_ms: u64) -> thread::JoinHandle<()> {
421    thread::spawn(move || {
422        let mut sleep_until_tick = state.load(Ordering::Acquire) as Tick;
423
424        loop {
425            if sleep_until_tick == TERMINATE_THREAD as Tick {
426                return;
427            }
428
429            let now_tick = current_tick(start, tick_ms);
430
431            trace!("wakeup thread: sleep_until_tick={:?}; now_tick={:?}", sleep_until_tick, now_tick);
432
433            if now_tick < sleep_until_tick {
434                // Calling park_timeout with u64::MAX leads to undefined
435                // behavior in pthread, causing the park to return immediately
436                // and causing the thread to tightly spin. Instead of u64::MAX
437                // on large values, simply use a blocking park.
438                match tick_ms.checked_mul(sleep_until_tick - now_tick) {
439                    Some(sleep_duration) => {
440                        trace!("sleeping; tick_ms={}; now_tick={}; sleep_until_tick={}; duration={:?}",
441                               tick_ms, now_tick, sleep_until_tick, sleep_duration);
442                        thread::park_timeout(Duration::from_millis(sleep_duration));
443                    }
444                    None => {
445                        trace!("sleeping; tick_ms={}; now_tick={}; blocking sleep",
446                               tick_ms, now_tick);
447                        thread::park();
448                    }
449                }
450                sleep_until_tick = state.load(Ordering::Acquire) as Tick;
451            } else {
452                let actual = state.compare_and_swap(sleep_until_tick as usize, usize::MAX, Ordering::AcqRel) as Tick;
453
454                if actual == sleep_until_tick {
455                    trace!("setting readiness from wakeup thread");
456                    let _ = set_readiness.set_readiness(Ready::readable());
457                    sleep_until_tick = usize::MAX as Tick;
458                } else {
459                    sleep_until_tick = actual as Tick;
460                }
461            }
462        }
463    })
464}
465
466fn duration_to_tick(elapsed: Duration, tick_ms: u64) -> Tick {
467    // Calculate tick rounding up to the closest one
468    let elapsed_ms = convert::millis(elapsed);
469    elapsed_ms.saturating_add(tick_ms / 2) / tick_ms
470}
471
472fn current_tick(start: Instant, tick_ms: u64) -> Tick {
473    duration_to_tick(start.elapsed(), tick_ms)
474}
475
476impl<T> Entry<T> {
477    fn new(state: T, tick: u64, next: Token) -> Entry<T> {
478        Entry {
479            state: state,
480            links: EntryLinks {
481                tick: tick,
482                prev: EMPTY,
483                next: next,
484            },
485        }
486    }
487}
488
489impl fmt::Display for TimerError {
490    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
491        write!(fmt, "{}: {}", self.kind, self.desc)
492    }
493}
494
495impl TimerError {
496    fn overflow() -> TimerError {
497        TimerError {
498            kind: TimerOverflow,
499            desc: "too many timer entries"
500        }
501    }
502}
503
504impl error::Error for TimerError {
505    fn description(&self) -> &str {
506        self.desc
507    }
508}
509
510impl fmt::Display for TimerErrorKind {
511    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
512        match *self {
513            TimerOverflow => write!(fmt, "TimerOverflow"),
514        }
515    }
516}
517
#[cfg(test)]
mod test {
    use super::*;
    use std::time::{Duration, Instant};

    // Parameters of the timer used by every test.
    const TICK: u64 = 100;
    const SLOTS: usize = 16;
    const CAPACITY: usize = 32;

    // Fresh timer whose tick 0 is the moment of the call.
    fn timer() -> Timer<&'static str> {
        Timer::new(TICK, SLOTS, CAPACITY, Instant::now())
    }

    // Number of timeouts still stored in the timer.
    fn count<T>(timer: &Timer<T>) -> usize {
        timer.entries.len()
    }

    // Converts milliseconds into a tick index, truncating.
    fn ms_to_tick<T>(timer: &Timer<T>, ms: u64) -> u64 {
        ms / timer.tick_ms
    }

    // A single timeout fires exactly once, when its tick is reached.
    #[test]
    pub fn test_timeout_next_tick() {
        let mut wheel = timer();

        wheel.set_timeout_at(Duration::from_millis(100), "a").unwrap();

        let tick = ms_to_tick(&wheel, 50);
        assert_eq!(None, wheel.poll_to(tick));

        let tick = ms_to_tick(&wheel, 100);
        assert_eq!(Some("a"), wheel.poll_to(tick));
        assert_eq!(None, wheel.poll_to(tick));

        let tick = ms_to_tick(&wheel, 150);
        assert_eq!(None, wheel.poll_to(tick));

        let tick = ms_to_tick(&wheel, 200);
        assert_eq!(None, wheel.poll_to(tick));

        assert_eq!(count(&wheel), 0);
    }

    // A cancelled timeout hands back its state and never fires.
    #[test]
    pub fn test_clearing_timeout() {
        let mut wheel = timer();

        let handle = wheel.set_timeout_at(Duration::from_millis(100), "a").unwrap();
        assert_eq!("a", wheel.cancel_timeout(&handle).unwrap());

        let tick = ms_to_tick(&wheel, 100);
        assert_eq!(None, wheel.poll_to(tick));

        let tick = ms_to_tick(&wheel, 200);
        assert_eq!(None, wheel.poll_to(tick));

        assert_eq!(count(&wheel), 0);
    }

    // Two timeouts on the same tick both fire on that tick.
    #[test]
    pub fn test_multiple_timeouts_same_tick() {
        let mut wheel = timer();

        wheel.set_timeout_at(Duration::from_millis(100), "a").unwrap();
        wheel.set_timeout_at(Duration::from_millis(100), "b").unwrap();

        let tick = ms_to_tick(&wheel, 100);
        let first = wheel.poll_to(tick).unwrap();
        let second = wheel.poll_to(tick).unwrap();
        let mut rcv = vec![first, second];

        assert_eq!(None, wheel.poll_to(tick));

        rcv.sort();
        assert!(rcv == ["a", "b"], "actual={:?}", rcv);

        let tick = ms_to_tick(&wheel, 200);
        assert_eq!(None, wheel.poll_to(tick));

        assert_eq!(count(&wheel), 0);
    }

    // Timeouts spread over several ticks fire on their own tick only.
    #[test]
    pub fn test_multiple_timeouts_diff_tick() {
        let mut wheel = timer();

        wheel.set_timeout_at(Duration::from_millis(110), "a").unwrap();
        wheel.set_timeout_at(Duration::from_millis(220), "b").unwrap();
        wheel.set_timeout_at(Duration::from_millis(230), "c").unwrap();
        wheel.set_timeout_at(Duration::from_millis(440), "d").unwrap();
        wheel.set_timeout_at(Duration::from_millis(560), "e").unwrap();

        let tick = ms_to_tick(&wheel, 100);
        assert_eq!(Some("a"), wheel.poll_to(tick));
        assert_eq!(None, wheel.poll_to(tick));

        let tick = ms_to_tick(&wheel, 200);
        assert_eq!(Some("c"), wheel.poll_to(tick));
        assert_eq!(Some("b"), wheel.poll_to(tick));
        assert_eq!(None, wheel.poll_to(tick));

        let tick = ms_to_tick(&wheel, 300);
        assert_eq!(None, wheel.poll_to(tick));

        let tick = ms_to_tick(&wheel, 400);
        assert_eq!(Some("d"), wheel.poll_to(tick));
        assert_eq!(None, wheel.poll_to(tick));

        let tick = ms_to_tick(&wheel, 500);
        assert_eq!(None, wheel.poll_to(tick));

        let tick = ms_to_tick(&wheel, 600);
        assert_eq!(Some("e"), wheel.poll_to(tick));
        assert_eq!(None, wheel.poll_to(tick));
    }

    // Polling far past several deadlines yields them all, in tick order.
    #[test]
    pub fn test_catching_up() {
        let mut wheel = timer();

        wheel.set_timeout_at(Duration::from_millis(110), "a").unwrap();
        wheel.set_timeout_at(Duration::from_millis(220), "b").unwrap();
        wheel.set_timeout_at(Duration::from_millis(230), "c").unwrap();
        wheel.set_timeout_at(Duration::from_millis(440), "d").unwrap();

        let tick = ms_to_tick(&wheel, 600);
        assert_eq!(Some("a"), wheel.poll_to(tick));
        assert_eq!(Some("c"), wheel.poll_to(tick));
        assert_eq!(Some("b"), wheel.poll_to(tick));
        assert_eq!(Some("d"), wheel.poll_to(tick));
        assert_eq!(None, wheel.poll_to(tick));
    }

    // Two timeouts one full wheel revolution apart share a slot but must
    // still fire on their own ticks.
    #[test]
    pub fn test_timeout_hash_collision() {
        let mut wheel = timer();
        let one_revolution_later = 100 + TICK * SLOTS as u64;

        wheel.set_timeout_at(Duration::from_millis(100), "a").unwrap();
        wheel.set_timeout_at(Duration::from_millis(one_revolution_later), "b").unwrap();

        let tick = ms_to_tick(&wheel, 100);
        assert_eq!(Some("a"), wheel.poll_to(tick));
        assert_eq!(1, count(&wheel));

        let tick = ms_to_tick(&wheel, 200);
        assert_eq!(None, wheel.poll_to(tick));
        assert_eq!(1, count(&wheel));

        let tick = ms_to_tick(&wheel, one_revolution_later);
        assert_eq!(Some("b"), wheel.poll_to(tick));
        assert_eq!(0, count(&wheel));
    }

    // Cancelling an expired-but-unpolled timeout between two polls removes
    // it without disturbing the remaining ones.
    #[test]
    pub fn test_clearing_timeout_between_triggers() {
        let mut wheel = timer();

        let a = wheel.set_timeout_at(Duration::from_millis(100), "a").unwrap();
        wheel.set_timeout_at(Duration::from_millis(100), "b").unwrap();
        wheel.set_timeout_at(Duration::from_millis(200), "c").unwrap();

        let tick = ms_to_tick(&wheel, 100);
        assert_eq!(Some("b"), wheel.poll_to(tick));
        assert_eq!(2, count(&wheel));

        wheel.cancel_timeout(&a);
        assert_eq!(1, count(&wheel));

        assert_eq!(None, wheel.poll_to(tick));

        let tick = ms_to_tick(&wheel, 200);
        assert_eq!(Some("c"), wheel.poll_to(tick));
        assert_eq!(0, count(&wheel));
    }

}