tmp_mio/
timer.rs

1use token::Token;
2use util::Slab;
3use time::precise_time_ns;
4use std::{usize, iter};
5use std::cmp::max;
6
7use self::TimerErrorKind::TimerOverflow;
8
9const EMPTY: Token = Token(usize::MAX);
10const NS_PER_MS: u64 = 1_000_000;
11
12// Implements coarse-grained timeouts using an algorithm based on hashed timing
13// wheels by Varghese & Lauck.
14//
15// TODO:
16// * Handle the case when the timer falls more than an entire wheel behind. There
17//   is no point to loop multiple times around the wheel in one go.
18// * New type for tick, now() -> Tick
19#[derive(Debug)]
20pub struct Timer<T> {
21    // Size of each tick in milliseconds
22    tick_ms: u64,
23    // Slab of timeout entries
24    entries: Slab<Entry<T>>,
25    // Timeout wheel. Each tick, the timer will look at the next slot for
26    // timeouts that match the current tick.
27    wheel: Vec<Token>,
28    // Tick 0's time in milliseconds
29    start: u64,
30    // The current tick
31    tick: u64,
32    // The next entry to possibly timeout
33    next: Token,
34    // Masks the target tick to get the slot
35    mask: u64,
36}
37
38#[derive(Copy, Clone)]
39pub struct Timeout {
40    // Reference into the timer entry slab
41    token: Token,
42    // Tick that it should matchup with
43    tick: u64,
44}
45
46impl<T> Timer<T> {
47    pub fn new(tick_ms: u64, mut slots: usize, mut capacity: usize) -> Timer<T> {
48        slots = slots.next_power_of_two();
49        capacity = capacity.next_power_of_two();
50
51        Timer {
52            tick_ms: tick_ms,
53            entries: Slab::new(capacity),
54            wheel: iter::repeat(EMPTY).take(slots).collect(),
55            start: 0,
56            tick: 0,
57            next: EMPTY,
58            mask: (slots as u64) - 1
59        }
60    }
61
62    #[cfg(test)]
63    pub fn count(&self) -> usize {
64        self.entries.count()
65    }
66
67    // Number of ms remaining until the next tick
68    pub fn next_tick_in_ms(&self) -> Option<u64> {
69        if self.entries.count() == 0 {
70            return None;
71        }
72
73        let now = self.now_ms();
74        let nxt = self.start + (self.tick + 1) * self.tick_ms;
75
76        if nxt <= now {
77            return Some(0);
78        }
79
80        Some(nxt - now)
81    }
82
83    /*
84     *
85     * ===== Initialization =====
86     *
87     */
88
89    // Sets the starting time of the timer using the current system time
90    pub fn setup(&mut self) {
91        let now = self.now_ms();
92        self.set_start_ms(now);
93    }
94
95    fn set_start_ms(&mut self, start: u64) {
96        assert!(!self.is_initialized(), "the timer has already started");
97        self.start = start;
98    }
99
100    /*
101     *
102     * ===== Timeout create / cancel =====
103     *
104     */
105
106    pub fn timeout_ms(&mut self, token: T, delay: u64) -> TimerResult<Timeout> {
107        let at = self.now_ms() + max(0, delay);
108        self.timeout_at_ms(token, at)
109    }
110
111    pub fn timeout_at_ms(&mut self, token: T, mut at: u64) -> TimerResult<Timeout> {
112        // Make relative to start
113        at -= self.start;
114        // Calculate tick
115        let mut tick = (at + self.tick_ms - 1) / self.tick_ms;
116
117        // Always target at least 1 tick in the future
118        if tick <= self.tick {
119            tick = self.tick + 1;
120        }
121
122        self.insert(token, tick)
123    }
124
125    pub fn clear(&mut self, timeout: Timeout) -> bool {
126        let links = match self.entries.get(timeout.token) {
127            Some(e) => e.links,
128            None => return false
129        };
130
131        // Sanity check
132        if links.tick != timeout.tick {
133            return false;
134        }
135
136        self.unlink(&links, timeout.token);
137        self.entries.remove(timeout.token);
138        true
139    }
140
141    fn insert(&mut self, token: T, tick: u64) -> TimerResult<Timeout> {
142        // Get the slot for the requested tick
143        let slot = (tick & self.mask) as usize;
144        let curr = self.wheel[slot];
145
146        // Insert the new entry
147        let token = try!(
148            self.entries.insert(Entry::new(token, tick, curr))
149            .map_err(|_| TimerError::overflow()));
150
151        if curr != EMPTY {
152            // If there was a previous entry, set its prev pointer to the new
153            // entry
154            self.entries[curr].links.prev = token;
155        }
156
157        // Update the head slot
158        self.wheel[slot] = token;
159
160        trace!("inserted timout; slot={}; token={:?}", slot, token);
161
162        // Return the new timeout
163        Ok(Timeout {
164            token: token,
165            tick: tick
166        })
167    }
168
169    fn unlink(&mut self, links: &EntryLinks, token: Token) {
170       trace!("unlinking timeout; slot={}; token={:?}",
171               self.slot_for(links.tick), token);
172
173        if links.prev == EMPTY {
174            let slot = self.slot_for(links.tick);
175            self.wheel[slot] = links.next;
176        } else {
177            self.entries[links.prev].links.next = links.next;
178        }
179
180        if links.next != EMPTY {
181            self.entries[links.next].links.prev = links.prev;
182
183            if token == self.next {
184                self.next = links.next;
185            }
186        } else if token == self.next {
187            self.next = EMPTY;
188        }
189    }
190
191    /*
192     *
193     * ===== Advance time =====
194     *
195     */
196
197    pub fn now(&self) -> u64 {
198        self.ms_to_tick(self.now_ms())
199    }
200
201    pub fn tick_to(&mut self, now: u64) -> Option<T> {
202        trace!("tick_to; now={}; tick={}", now, self.tick);
203
204        while self.tick <= now {
205            let curr = self.next;
206
207            trace!("ticking; curr={:?}", curr);
208
209            if curr == EMPTY {
210                self.tick += 1;
211                self.next = self.wheel[self.slot_for(self.tick)];
212            } else {
213                let links = self.entries[curr].links;
214
215                if links.tick <= self.tick {
216                    trace!("triggering; token={:?}", curr);
217
218                    // Unlink will also advance self.next
219                    self.unlink(&links, curr);
220
221                    // Remove and return the token
222                    return self.entries.remove(curr)
223                        .map(|e| e.token);
224                } else {
225                    self.next = links.next;
226                }
227            }
228        }
229
230        None
231    }
232
233    /*
234     *
235     * ===== Misc =====
236     *
237     */
238
239    // Timers are initialized when either the current time has been advanced or a timeout has been set
240    #[inline]
241    fn is_initialized(&self) -> bool {
242        self.tick > 0 || !self.entries.is_empty()
243    }
244
245    #[inline]
246    fn slot_for(&self, tick: u64) -> usize {
247        (self.mask & tick) as usize
248    }
249
250    // Convert a ms duration into a number of ticks, rounds up
251    #[inline]
252    fn ms_to_tick(&self, ms: u64) -> u64 {
253        (ms - self.start) / self.tick_ms
254    }
255
256    #[inline]
257    fn now_ms(&self) -> u64 {
258        precise_time_ns() / NS_PER_MS
259    }
260}
261
262// Doubly linked list of timer entries. Allows for efficient insertion /
263// removal of timeouts.
264struct Entry<T> {
265    token: T,
266    links: EntryLinks,
267}
268
269impl<T> Entry<T> {
270    fn new(token: T, tick: u64, next: Token) -> Entry<T> {
271        Entry {
272            token: token,
273            links: EntryLinks {
274                tick: tick,
275                prev: EMPTY,
276                next: next,
277            },
278        }
279    }
280}
281
282#[derive(Copy, Clone)]
283struct EntryLinks {
284    tick: u64,
285    prev: Token,
286    next: Token
287}
288
// Result type returned by timer operations that can fail.
pub type TimerResult<T> = Result<T, TimerError>;

// Error returned when a timer operation cannot be completed. It implements
// `Display` and `std::error::Error` so it can be printed and passed along
// like any other error value.
#[derive(Debug)]
pub struct TimerError {
    // Category of the failure
    kind: TimerErrorKind,
    // Static human-readable description
    desc: &'static str,
}

impl TimerError {
    // Error used when the entry slab rejects a new timeout entry.
    fn overflow() -> TimerError {
        TimerError {
            kind: TimerOverflow,
            desc: "too many timer entries"
        }
    }
}

// Renders as "<kind>: <description>".
impl ::std::fmt::Display for TimerError {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        write!(f, "{}: {}", self.kind, self.desc)
    }
}

impl ::std::error::Error for TimerError {
    // Implemented explicitly so the impl also builds on toolchains where
    // `description` has no default body.
    fn description(&self) -> &str {
        self.desc
    }
}

// Categories of timer failure.
#[derive(Debug)]
pub enum TimerErrorKind {
    // The timer cannot hold any more timeout entries
    TimerOverflow,
}

// Renders the variant name.
impl ::std::fmt::Display for TimerErrorKind {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        match *self {
            TimerErrorKind::TimerOverflow => write!(f, "TimerOverflow"),
        }
    }
}
310
#[cfg(test)]
mod test {
    use super::Timer;

    // Tick length in ms and wheel size shared by every test
    const TICK: u64 = 100;
    const SLOTS: usize = 16;

    // Fresh timer; its start time stays at 0, so absolute times in the tests
    // are also offsets from tick 0
    fn timer() -> Timer<&'static str> {
        Timer::new(TICK, SLOTS, 32)
    }

    // Advances `t` to the tick containing `ms` and returns whatever fires
    fn poll_at(t: &mut Timer<&'static str>, ms: u64) -> Option<&'static str> {
        let tick = t.ms_to_tick(ms);
        t.tick_to(tick)
    }

    #[test]
    pub fn test_timeout_next_tick() {
        let mut t = timer();

        t.timeout_at_ms("a", 100).unwrap();

        assert_eq!(None, poll_at(&mut t, 50));

        assert_eq!(Some("a"), poll_at(&mut t, 100));
        assert_eq!(None, poll_at(&mut t, 100));

        assert_eq!(None, poll_at(&mut t, 150));

        assert_eq!(None, poll_at(&mut t, 200));

        assert_eq!(t.count(), 0);
    }

    #[test]
    pub fn test_clearing_timeout() {
        let mut t = timer();

        let to = t.timeout_at_ms("a", 100).unwrap();
        assert!(t.clear(to));

        assert_eq!(None, poll_at(&mut t, 100));

        assert_eq!(None, poll_at(&mut t, 200));

        assert_eq!(t.count(), 0);
    }

    #[test]
    pub fn test_multiple_timeouts_same_tick() {
        let mut t = timer();

        t.timeout_at_ms("a", 100).unwrap();
        t.timeout_at_ms("b", 100).unwrap();

        // Both fire on the same tick; the order is not asserted
        let mut rcv = vec![];
        rcv.push(poll_at(&mut t, 100).unwrap());
        rcv.push(poll_at(&mut t, 100).unwrap());

        assert_eq!(None, poll_at(&mut t, 100));

        rcv.sort();
        assert!(rcv == ["a", "b"], "actual={:?}", rcv);

        assert_eq!(None, poll_at(&mut t, 200));

        assert_eq!(t.count(), 0);
    }

    #[test]
    pub fn test_multiple_timeouts_diff_tick() {
        let mut t = timer();

        t.timeout_at_ms("a", 110).unwrap();
        t.timeout_at_ms("b", 220).unwrap();
        t.timeout_at_ms("c", 230).unwrap();
        t.timeout_at_ms("d", 440).unwrap();

        assert_eq!(None, poll_at(&mut t, 100));

        assert_eq!(Some("a"), poll_at(&mut t, 200));
        assert_eq!(None, poll_at(&mut t, 200));

        assert_eq!(Some("c"), poll_at(&mut t, 300));
        assert_eq!(Some("b"), poll_at(&mut t, 300));
        assert_eq!(None, poll_at(&mut t, 300));

        assert_eq!(None, poll_at(&mut t, 400));

        assert_eq!(Some("d"), poll_at(&mut t, 500));
        assert_eq!(None, poll_at(&mut t, 500));

        assert_eq!(None, poll_at(&mut t, 600));
    }

    #[test]
    pub fn test_catching_up() {
        let mut t = timer();

        t.timeout_at_ms("a", 110).unwrap();
        t.timeout_at_ms("b", 220).unwrap();
        t.timeout_at_ms("c", 230).unwrap();
        t.timeout_at_ms("d", 440).unwrap();

        // A single late poll drains everything that became due
        assert_eq!(Some("a"), poll_at(&mut t, 600));
        assert_eq!(Some("c"), poll_at(&mut t, 600));
        assert_eq!(Some("b"), poll_at(&mut t, 600));
        assert_eq!(Some("d"), poll_at(&mut t, 600));
        assert_eq!(None, poll_at(&mut t, 600));
    }

    #[test]
    pub fn test_timeout_hash_collision() {
        let mut t = timer();

        // Exactly one full wheel revolution after "a": same slot, later tick
        let lapped = 100 + TICK * (SLOTS as u64);

        t.timeout_at_ms("a", 100).unwrap();
        t.timeout_at_ms("b", lapped).unwrap();

        assert_eq!(Some("a"), poll_at(&mut t, 100));
        assert_eq!(1, t.count());

        assert_eq!(None, poll_at(&mut t, 200));
        assert_eq!(1, t.count());

        assert_eq!(Some("b"), poll_at(&mut t, lapped));
        assert_eq!(0, t.count());
    }

    #[test]
    pub fn test_clearing_timeout_between_triggers() {
        let mut t = timer();

        let a = t.timeout_at_ms("a", 100).unwrap();
        let _ = t.timeout_at_ms("b", 100).unwrap();
        let _ = t.timeout_at_ms("c", 200).unwrap();

        assert_eq!(Some("b"), poll_at(&mut t, 100));
        assert_eq!(2, t.count());

        // Cancel "a" while it is the next entry due on the current tick
        t.clear(a);
        assert_eq!(1, t.count());

        assert_eq!(None, poll_at(&mut t, 100));

        assert_eq!(Some("c"), poll_at(&mut t, 200));
        assert_eq!(0, t.count());
    }
}