1use token::Token;
2use util::Slab;
3use time::precise_time_ns;
4use std::{usize, iter};
5use std::cmp::max;
6
7use self::TimerErrorKind::TimerOverflow;
8
9const EMPTY: Token = Token(usize::MAX);
10const NS_PER_MS: u64 = 1_000_000;
11
12#[derive(Debug)]
20pub struct Timer<T> {
21 tick_ms: u64,
23 entries: Slab<Entry<T>>,
25 wheel: Vec<Token>,
28 start: u64,
30 tick: u64,
32 next: Token,
34 mask: u64,
36}
37
38#[derive(Copy, Clone)]
39pub struct Timeout {
40 token: Token,
42 tick: u64,
44}
45
46impl<T> Timer<T> {
47 pub fn new(tick_ms: u64, mut slots: usize, mut capacity: usize) -> Timer<T> {
48 slots = slots.next_power_of_two();
49 capacity = capacity.next_power_of_two();
50
51 Timer {
52 tick_ms: tick_ms,
53 entries: Slab::new(capacity),
54 wheel: iter::repeat(EMPTY).take(slots).collect(),
55 start: 0,
56 tick: 0,
57 next: EMPTY,
58 mask: (slots as u64) - 1
59 }
60 }
61
62 #[cfg(test)]
63 pub fn count(&self) -> usize {
64 self.entries.count()
65 }
66
67 pub fn next_tick_in_ms(&self) -> Option<u64> {
69 if self.entries.count() == 0 {
70 return None;
71 }
72
73 let now = self.now_ms();
74 let nxt = self.start + (self.tick + 1) * self.tick_ms;
75
76 if nxt <= now {
77 return Some(0);
78 }
79
80 Some(nxt - now)
81 }
82
83 pub fn setup(&mut self) {
91 let now = self.now_ms();
92 self.set_start_ms(now);
93 }
94
95 fn set_start_ms(&mut self, start: u64) {
96 assert!(!self.is_initialized(), "the timer has already started");
97 self.start = start;
98 }
99
100 pub fn timeout_ms(&mut self, token: T, delay: u64) -> TimerResult<Timeout> {
107 let at = self.now_ms() + max(0, delay);
108 self.timeout_at_ms(token, at)
109 }
110
111 pub fn timeout_at_ms(&mut self, token: T, mut at: u64) -> TimerResult<Timeout> {
112 at -= self.start;
114 let mut tick = (at + self.tick_ms - 1) / self.tick_ms;
116
117 if tick <= self.tick {
119 tick = self.tick + 1;
120 }
121
122 self.insert(token, tick)
123 }
124
125 pub fn clear(&mut self, timeout: Timeout) -> bool {
126 let links = match self.entries.get(timeout.token) {
127 Some(e) => e.links,
128 None => return false
129 };
130
131 if links.tick != timeout.tick {
133 return false;
134 }
135
136 self.unlink(&links, timeout.token);
137 self.entries.remove(timeout.token);
138 true
139 }
140
141 fn insert(&mut self, token: T, tick: u64) -> TimerResult<Timeout> {
142 let slot = (tick & self.mask) as usize;
144 let curr = self.wheel[slot];
145
146 let token = try!(
148 self.entries.insert(Entry::new(token, tick, curr))
149 .map_err(|_| TimerError::overflow()));
150
151 if curr != EMPTY {
152 self.entries[curr].links.prev = token;
155 }
156
157 self.wheel[slot] = token;
159
160 trace!("inserted timout; slot={}; token={:?}", slot, token);
161
162 Ok(Timeout {
164 token: token,
165 tick: tick
166 })
167 }
168
169 fn unlink(&mut self, links: &EntryLinks, token: Token) {
170 trace!("unlinking timeout; slot={}; token={:?}",
171 self.slot_for(links.tick), token);
172
173 if links.prev == EMPTY {
174 let slot = self.slot_for(links.tick);
175 self.wheel[slot] = links.next;
176 } else {
177 self.entries[links.prev].links.next = links.next;
178 }
179
180 if links.next != EMPTY {
181 self.entries[links.next].links.prev = links.prev;
182
183 if token == self.next {
184 self.next = links.next;
185 }
186 } else if token == self.next {
187 self.next = EMPTY;
188 }
189 }
190
191 pub fn now(&self) -> u64 {
198 self.ms_to_tick(self.now_ms())
199 }
200
201 pub fn tick_to(&mut self, now: u64) -> Option<T> {
202 trace!("tick_to; now={}; tick={}", now, self.tick);
203
204 while self.tick <= now {
205 let curr = self.next;
206
207 trace!("ticking; curr={:?}", curr);
208
209 if curr == EMPTY {
210 self.tick += 1;
211 self.next = self.wheel[self.slot_for(self.tick)];
212 } else {
213 let links = self.entries[curr].links;
214
215 if links.tick <= self.tick {
216 trace!("triggering; token={:?}", curr);
217
218 self.unlink(&links, curr);
220
221 return self.entries.remove(curr)
223 .map(|e| e.token);
224 } else {
225 self.next = links.next;
226 }
227 }
228 }
229
230 None
231 }
232
233 #[inline]
241 fn is_initialized(&self) -> bool {
242 self.tick > 0 || !self.entries.is_empty()
243 }
244
245 #[inline]
246 fn slot_for(&self, tick: u64) -> usize {
247 (self.mask & tick) as usize
248 }
249
250 #[inline]
252 fn ms_to_tick(&self, ms: u64) -> u64 {
253 (ms - self.start) / self.tick_ms
254 }
255
256 #[inline]
257 fn now_ms(&self) -> u64 {
258 precise_time_ns() / NS_PER_MS
259 }
260}
261
262struct Entry<T> {
265 token: T,
266 links: EntryLinks,
267}
268
269impl<T> Entry<T> {
270 fn new(token: T, tick: u64, next: Token) -> Entry<T> {
271 Entry {
272 token: token,
273 links: EntryLinks {
274 tick: tick,
275 prev: EMPTY,
276 next: next,
277 },
278 }
279 }
280}
281
282#[derive(Copy, Clone)]
283struct EntryLinks {
284 tick: u64,
285 prev: Token,
286 next: Token
287}
288
289pub type TimerResult<T> = Result<T, TimerError>;
290
291#[derive(Debug)]
292pub struct TimerError {
293 kind: TimerErrorKind,
294 desc: &'static str,
295}
296
297impl TimerError {
298 fn overflow() -> TimerError {
299 TimerError {
300 kind: TimerOverflow,
301 desc: "too many timer entries"
302 }
303 }
304}
305
306#[derive(Debug)]
307pub enum TimerErrorKind {
308 TimerOverflow,
309}
310
311#[cfg(test)]
312mod test {
313 use super::Timer;
314
315 #[test]
316 pub fn test_timeout_next_tick() {
317 let mut t = timer();
318 let mut tick;
319
320 t.timeout_at_ms("a", 100).unwrap();
321
322 tick = t.ms_to_tick(50);
323 assert_eq!(None, t.tick_to(tick));
324
325 tick = t.ms_to_tick(100);
326 assert_eq!(Some("a"), t.tick_to(tick));
327 assert_eq!(None, t.tick_to(tick));
328
329 tick = t.ms_to_tick(150);
330 assert_eq!(None, t.tick_to(tick));
331
332 tick = t.ms_to_tick(200);
333 assert_eq!(None, t.tick_to(tick));
334
335 assert_eq!(t.count(), 0);
336 }
337
338 #[test]
339 pub fn test_clearing_timeout() {
340 let mut t = timer();
341 let mut tick;
342
343 let to = t.timeout_at_ms("a", 100).unwrap();
344 assert!(t.clear(to));
345
346 tick = t.ms_to_tick(100);
347 assert_eq!(None, t.tick_to(tick));
348
349 tick = t.ms_to_tick(200);
350 assert_eq!(None, t.tick_to(tick));
351
352 assert_eq!(t.count(), 0);
353 }
354
355 #[test]
356 pub fn test_multiple_timeouts_same_tick() {
357 let mut t = timer();
358 let mut tick;
359
360 t.timeout_at_ms("a", 100).unwrap();
361 t.timeout_at_ms("b", 100).unwrap();
362
363 let mut rcv = vec![];
364
365 tick = t.ms_to_tick(100);
366 rcv.push(t.tick_to(tick).unwrap());
367 rcv.push(t.tick_to(tick).unwrap());
368
369 assert_eq!(None, t.tick_to(tick));
370
371 rcv.sort();
372 assert!(rcv == ["a", "b"], "actual={:?}", rcv);
373
374 tick = t.ms_to_tick(200);
375 assert_eq!(None, t.tick_to(tick));
376
377 assert_eq!(t.count(), 0);
378 }
379
380 #[test]
381 pub fn test_multiple_timeouts_diff_tick() {
382 let mut t = timer();
383 let mut tick;
384
385 t.timeout_at_ms("a", 110).unwrap();
386 t.timeout_at_ms("b", 220).unwrap();
387 t.timeout_at_ms("c", 230).unwrap();
388 t.timeout_at_ms("d", 440).unwrap();
389
390 tick = t.ms_to_tick(100);
391 assert_eq!(None, t.tick_to(tick));
392
393 tick = t.ms_to_tick(200);
394 assert_eq!(Some("a"), t.tick_to(tick));
395 assert_eq!(None, t.tick_to(tick));
396
397 tick = t.ms_to_tick(300);
398 assert_eq!(Some("c"), t.tick_to(tick));
399 assert_eq!(Some("b"), t.tick_to(tick));
400 assert_eq!(None, t.tick_to(tick));
401
402 tick = t.ms_to_tick(400);
403 assert_eq!(None, t.tick_to(tick));
404
405 tick = t.ms_to_tick(500);
406 assert_eq!(Some("d"), t.tick_to(tick));
407 assert_eq!(None, t.tick_to(tick));
408
409 tick = t.ms_to_tick(600);
410 assert_eq!(None, t.tick_to(tick));
411 }
412
413 #[test]
414 pub fn test_catching_up() {
415 let mut t = timer();
416
417 t.timeout_at_ms("a", 110).unwrap();
418 t.timeout_at_ms("b", 220).unwrap();
419 t.timeout_at_ms("c", 230).unwrap();
420 t.timeout_at_ms("d", 440).unwrap();
421
422 let tick = t.ms_to_tick(600);
423 assert_eq!(Some("a"), t.tick_to(tick));
424 assert_eq!(Some("c"), t.tick_to(tick));
425 assert_eq!(Some("b"), t.tick_to(tick));
426 assert_eq!(Some("d"), t.tick_to(tick));
427 assert_eq!(None, t.tick_to(tick));
428 }
429
430 #[test]
431 pub fn test_timeout_hash_collision() {
432 let mut t = timer();
433 let mut tick;
434
435 t.timeout_at_ms("a", 100).unwrap();
436 t.timeout_at_ms("b", 100 + TICK * SLOTS as u64).unwrap();
437
438 tick = t.ms_to_tick(100);
439 assert_eq!(Some("a"), t.tick_to(tick));
440 assert_eq!(1, t.count());
441
442 tick = t.ms_to_tick(200);
443 assert_eq!(None, t.tick_to(tick));
444 assert_eq!(1, t.count());
445
446 tick = t.ms_to_tick(100 + TICK * SLOTS as u64);
447 assert_eq!(Some("b"), t.tick_to(tick));
448 assert_eq!(0, t.count());
449 }
450
451 #[test]
452 pub fn test_clearing_timeout_between_triggers() {
453 let mut t = timer();
454 let mut tick;
455
456 let a = t.timeout_at_ms("a", 100).unwrap();
457 let _ = t.timeout_at_ms("b", 100).unwrap();
458 let _ = t.timeout_at_ms("c", 200).unwrap();
459
460 tick = t.ms_to_tick(100);
461 assert_eq!(Some("b"), t.tick_to(tick));
462 assert_eq!(2, t.count());
463
464 t.clear(a);
465 assert_eq!(1, t.count());
466
467 assert_eq!(None, t.tick_to(tick));
468
469 tick = t.ms_to_tick(200);
470 assert_eq!(Some("c"), t.tick_to(tick));
471 assert_eq!(0, t.count());
472 }
473
474 const TICK: u64 = 100;
475 const SLOTS: usize = 16;
476
477 fn timer() -> Timer<&'static str> {
478 Timer::new(TICK, SLOTS, 32)
479 }
480}