1use convert;
4use mio::{self, Evented, Ready, Poll, PollOpt, Registration, SetReadiness, Token};
5use lazycell::LazyCell;
6use std::{cmp, error, fmt, io, u64, usize, iter, thread};
7use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use self::TimerErrorKind::TimerOverflow;
12
13pub struct Timer<T> {
14 tick_ms: u64,
16 entries: Slab<Entry<T>>,
18 wheel: Vec<WheelEntry>,
21 start: Instant,
23 tick: Tick,
25 next: Token,
27 mask: u64,
29 inner: LazyCell<Inner>,
31}
32
/// Collects the settings used to construct a `Timer`.
pub struct Builder {
    /// Length of one tick.
    tick: Duration,
    /// Requested number of wheel slots.
    num_slots: usize,
    /// Requested number of timeouts the timer can hold.
    capacity: usize,
}
41
42#[derive(Clone, Debug)]
43pub struct Timeout {
44 token: Token,
46 tick: u64,
48}
49
50struct Inner {
51 registration: Registration,
52 set_readiness: SetReadiness,
53 wakeup_state: WakeupState,
54 wakeup_thread: thread::JoinHandle<()>,
55}
56
57#[derive(Copy, Clone, Debug)]
58struct WheelEntry {
59 next_tick: Tick,
60 head: Token,
61}
62
63struct Entry<T> {
66 state: T,
67 links: EntryLinks,
68}
69
70#[derive(Copy, Clone)]
71struct EntryLinks {
72 tick: Tick,
73 prev: Token,
74 next: Token
75}
76
/// A tick count: time measured in units of the timer's tick length.
type Tick = u64;

/// Largest tick value; stored in a wheel slot's `next_tick` when no entry
/// is recorded for that slot.
const TICK_MAX: Tick = u64::MAX;

/// Atomic cell shared between the timer and its wakeup thread.
type WakeupState = Arc<AtomicUsize>;
83
84type Slab<T> = ::slab::Slab<T, mio::Token>;
85
86pub type Result<T> = ::std::result::Result<T, TimerError>;
87pub type TimerResult<T> = Result<T>;
89
90
91#[derive(Debug)]
92pub struct TimerError {
93 kind: TimerErrorKind,
94 desc: &'static str,
95}
96
/// Categories of `TimerError`.
#[derive(Debug)]
pub enum TimerErrorKind {
    /// The timer's entry storage rejected a new timeout.
    TimerOverflow,
}
101
102pub type OldTimerResult<T> = Result<T>;
104
105const TERMINATE_THREAD: usize = 0;
106const EMPTY: Token = Token(usize::MAX);
107
108impl Builder {
109 pub fn tick_duration(mut self, duration: Duration) -> Builder {
110 self.tick = duration;
111 self
112 }
113
114 pub fn num_slots(mut self, num_slots: usize) -> Builder {
115 self.num_slots = num_slots;
116 self
117 }
118
119 pub fn capacity(mut self, capacity: usize) -> Builder {
120 self.capacity = capacity;
121 self
122 }
123
124 pub fn build<T>(self) -> Timer<T> {
125 Timer::new(convert::millis(self.tick), self.num_slots, self.capacity, Instant::now())
126 }
127}
128
129impl Default for Builder {
130 fn default() -> Builder {
131 Builder {
132 tick: Duration::from_millis(100),
133 num_slots: 256,
134 capacity: 65_536,
135 }
136 }
137}
138
139impl<T> Timer<T> {
140 fn new(tick_ms: u64, num_slots: usize, capacity: usize, start: Instant) -> Timer<T> {
141 let num_slots = num_slots.next_power_of_two();
142 let capacity = capacity.next_power_of_two();
143 let mask = (num_slots as u64) - 1;
144 let wheel = iter::repeat(WheelEntry { next_tick: TICK_MAX, head: EMPTY })
145 .take(num_slots).collect();
146
147 Timer {
148 tick_ms: tick_ms,
149 entries: Slab::with_capacity(capacity),
150 wheel: wheel,
151 start: start,
152 tick: 0,
153 next: EMPTY,
154 mask: mask,
155 inner: LazyCell::new(),
156 }
157 }
158
159 pub fn set_timeout(&mut self, delay_from_now: Duration, state: T) -> Result<Timeout> {
160 let delay_from_start = self.start.elapsed() + delay_from_now;
161 self.set_timeout_at(delay_from_start, state)
162 }
163
164 fn set_timeout_at(&mut self, delay_from_start: Duration, state: T) -> Result<Timeout> {
165 let mut tick = duration_to_tick(delay_from_start, self.tick_ms);
166 trace!("setting timeout; delay={:?}; tick={:?}; current-tick={:?}", delay_from_start, tick, self.tick);
167
168 if tick <= self.tick {
170 tick = self.tick + 1;
171 }
172
173 self.insert(tick, state)
174 }
175
176 fn insert(&mut self, tick: Tick, state: T) -> Result<Timeout> {
177 let slot = (tick & self.mask) as usize;
179 let curr = self.wheel[slot];
180
181 let token = try!(
183 self.entries.insert(Entry::new(state, tick, curr.head))
184 .map_err(|_| TimerError::overflow()));
185
186 if curr.head != EMPTY {
187 self.entries[curr.head].links.prev = token;
190 }
191
192 self.wheel[slot] = WheelEntry {
194 next_tick: cmp::min(tick, curr.next_tick),
195 head: token,
196 };
197
198 self.schedule_readiness(tick);
199
200 trace!("inserted timout; slot={}; token={:?}", slot, token);
201
202 Ok(Timeout {
204 token: token,
205 tick: tick
206 })
207 }
208
209 pub fn cancel_timeout(&mut self, timeout: &Timeout) -> Option<T> {
210 let links = match self.entries.get(timeout.token) {
211 Some(e) => e.links,
212 None => return None
213 };
214
215 if links.tick != timeout.tick {
217 return None;
218 }
219
220 self.unlink(&links, timeout.token);
221 self.entries.remove(timeout.token).map(|e| e.state)
222 }
223
224 pub fn poll(&mut self) -> Option<T> {
225 let target_tick = current_tick(self.start, self.tick_ms);
226 self.poll_to(target_tick)
227 }
228
229 fn poll_to(&mut self, mut target_tick: Tick) -> Option<T> {
230 trace!("tick_to; target_tick={}; current_tick={}", target_tick, self.tick);
231
232 if target_tick < self.tick {
233 target_tick = self.tick;
234 }
235
236 while self.tick <= target_tick {
237 let curr = self.next;
238
239 trace!("ticking; curr={:?}", curr);
240
241 if curr == EMPTY {
242 self.tick += 1;
243
244 let slot = self.slot_for(self.tick);
245 self.next = self.wheel[slot].head;
246
247 if self.next == EMPTY {
253 self.wheel[slot].next_tick = TICK_MAX;
254 }
255 } else {
256 let slot = self.slot_for(self.tick);
257
258 if curr == self.wheel[slot].head {
259 self.wheel[slot].next_tick = TICK_MAX;
260 }
261
262 let links = self.entries[curr].links;
263
264 if links.tick <= self.tick {
265 trace!("triggering; token={:?}", curr);
266
267 self.unlink(&links, curr);
269
270 return self.entries.remove(curr)
272 .map(|e| e.state);
273 } else {
274 let next_tick = self.wheel[slot].next_tick;
275 self.wheel[slot].next_tick = cmp::min(next_tick, links.tick);
276 self.next = links.next;
277 }
278 }
279 }
280
281 if let Some(inner) = self.inner.borrow() {
283 trace!("unsetting readiness");
284 let _ = inner.set_readiness.set_readiness(Ready::none());
285
286 if let Some(tick) = self.next_tick() {
287 self.schedule_readiness(tick);
288 }
289 }
290
291 None
292 }
293
294 fn unlink(&mut self, links: &EntryLinks, token: Token) {
295 trace!("unlinking timeout; slot={}; token={:?}",
296 self.slot_for(links.tick), token);
297
298 if links.prev == EMPTY {
299 let slot = self.slot_for(links.tick);
300 self.wheel[slot].head = links.next;
301 } else {
302 self.entries[links.prev].links.next = links.next;
303 }
304
305 if links.next != EMPTY {
306 self.entries[links.next].links.prev = links.prev;
307
308 if token == self.next {
309 self.next = links.next;
310 }
311 } else if token == self.next {
312 self.next = EMPTY;
313 }
314 }
315
316 fn schedule_readiness(&self, tick: Tick) {
317 if let Some(inner) = self.inner.borrow() {
318 let mut curr = inner.wakeup_state.load(Ordering::Acquire);
320
321 loop {
322 if curr as Tick <= tick {
323 return;
325 }
326
327 trace!("advancing the wakeup time; target={}; curr={}", tick, curr);
329 let actual = inner.wakeup_state.compare_and_swap(curr, tick as usize, Ordering::Release);
330
331 if actual == curr {
332 trace!("unparking wakeup thread");
335 inner.wakeup_thread.thread().unpark();
336 return;
337 }
338
339 curr = actual;
340 }
341 }
342 }
343
344 fn next_tick(&self) -> Option<Tick> {
346 if self.next != EMPTY {
347 let slot = self.slot_for(self.entries[self.next].links.tick);
348
349 if self.wheel[slot].next_tick == self.tick {
350 return Some(self.tick);
352 }
353 }
354
355 self.wheel.iter().map(|e| e.next_tick).min()
356 }
357
358 fn slot_for(&self, tick: Tick) -> usize {
359 (self.mask & tick) as usize
360 }
361}
362
363impl<T> Default for Timer<T> {
364 fn default() -> Timer<T> {
365 Builder::default().build()
366 }
367}
368
369impl<T> Evented for Timer<T> {
370 fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
371 if self.inner.borrow().is_some() {
372 return Err(io::Error::new(io::ErrorKind::Other, "timer already registered"));
373 }
374
375 let (registration, set_readiness) = Registration::new(poll, token, interest, opts);
376 let wakeup_state = Arc::new(AtomicUsize::new(usize::MAX));
377 let thread_handle = spawn_wakeup_thread(
378 wakeup_state.clone(),
379 set_readiness.clone(),
380 self.start, self.tick_ms);
381
382 self.inner.fill(Inner {
383 registration: registration,
384 set_readiness: set_readiness,
385 wakeup_state: wakeup_state,
386 wakeup_thread: thread_handle,
387 }).ok().expect("timer already registered");
388
389 if let Some(next_tick) = self.next_tick() {
390 self.schedule_readiness(next_tick);
391 }
392
393 Ok(())
394 }
395
396 fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
397 match self.inner.borrow() {
398 Some(inner) => inner.registration.update(poll, token, interest, opts),
399 None => Err(io::Error::new(io::ErrorKind::Other, "receiver not registered")),
400 }
401 }
402
403 fn deregister(&self, poll: &Poll) -> io::Result<()> {
404 match self.inner.borrow() {
405 Some(inner) => inner.registration.deregister(poll),
406 None => Err(io::Error::new(io::ErrorKind::Other, "receiver not registered")),
407 }
408 }
409}
410
411impl fmt::Debug for Inner {
412 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
413 fmt.debug_struct("Inner")
414 .field("registration", &self.registration)
415 .field("wakeup_state", &self.wakeup_state.load(Ordering::Relaxed))
416 .finish()
417 }
418}
419
420fn spawn_wakeup_thread(state: WakeupState, set_readiness: SetReadiness, start: Instant, tick_ms: u64) -> thread::JoinHandle<()> {
421 thread::spawn(move || {
422 let mut sleep_until_tick = state.load(Ordering::Acquire) as Tick;
423
424 loop {
425 if sleep_until_tick == TERMINATE_THREAD as Tick {
426 return;
427 }
428
429 let now_tick = current_tick(start, tick_ms);
430
431 trace!("wakeup thread: sleep_until_tick={:?}; now_tick={:?}", sleep_until_tick, now_tick);
432
433 if now_tick < sleep_until_tick {
434 match tick_ms.checked_mul(sleep_until_tick - now_tick) {
439 Some(sleep_duration) => {
440 trace!("sleeping; tick_ms={}; now_tick={}; sleep_until_tick={}; duration={:?}",
441 tick_ms, now_tick, sleep_until_tick, sleep_duration);
442 thread::park_timeout(Duration::from_millis(sleep_duration));
443 }
444 None => {
445 trace!("sleeping; tick_ms={}; now_tick={}; blocking sleep",
446 tick_ms, now_tick);
447 thread::park();
448 }
449 }
450 sleep_until_tick = state.load(Ordering::Acquire) as Tick;
451 } else {
452 let actual = state.compare_and_swap(sleep_until_tick as usize, usize::MAX, Ordering::AcqRel) as Tick;
453
454 if actual == sleep_until_tick {
455 trace!("setting readiness from wakeup thread");
456 let _ = set_readiness.set_readiness(Ready::readable());
457 sleep_until_tick = usize::MAX as Tick;
458 } else {
459 sleep_until_tick = actual as Tick;
460 }
461 }
462 }
463 })
464}
465
466fn duration_to_tick(elapsed: Duration, tick_ms: u64) -> Tick {
467 let elapsed_ms = convert::millis(elapsed);
469 elapsed_ms.saturating_add(tick_ms / 2) / tick_ms
470}
471
472fn current_tick(start: Instant, tick_ms: u64) -> Tick {
473 duration_to_tick(start.elapsed(), tick_ms)
474}
475
476impl<T> Entry<T> {
477 fn new(state: T, tick: u64, next: Token) -> Entry<T> {
478 Entry {
479 state: state,
480 links: EntryLinks {
481 tick: tick,
482 prev: EMPTY,
483 next: next,
484 },
485 }
486 }
487}
488
489impl fmt::Display for TimerError {
490 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
491 write!(fmt, "{}: {}", self.kind, self.desc)
492 }
493}
494
495impl TimerError {
496 fn overflow() -> TimerError {
497 TimerError {
498 kind: TimerOverflow,
499 desc: "too many timer entries"
500 }
501 }
502}
503
504impl error::Error for TimerError {
505 fn description(&self) -> &str {
506 self.desc
507 }
508}
509
510impl fmt::Display for TimerErrorKind {
511 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
512 match *self {
513 TimerOverflow => write!(fmt, "TimerOverflow"),
514 }
515 }
516}
517
#[cfg(test)]
mod test {
    use super::*;
    use std::time::{Duration, Instant};

    // Parameters of the timer used by every test.
    const TICK: u64 = 100;
    const SLOTS: usize = 16;
    const CAPACITY: usize = 32;

    /// Number of timeouts still stored in the timer.
    fn count<T>(timer: &Timer<T>) -> usize {
        timer.entries.len()
    }

    /// Fresh timer with the test parameters, starting now.
    fn timer() -> Timer<&'static str> {
        Timer::new(TICK, SLOTS, CAPACITY, Instant::now())
    }

    /// Converts milliseconds since start into a tick (truncating).
    fn ms_to_tick<T>(timer: &Timer<T>, ms: u64) -> u64 {
        ms / timer.tick_ms
    }

    /// Polls the timer as if `ms` milliseconds had passed since its start.
    fn poll_at(timer: &mut Timer<&'static str>, ms: u64) -> Option<&'static str> {
        let tick = ms_to_tick(&*timer, ms);
        timer.poll_to(tick)
    }

    /// Schedules `state` at `ms` milliseconds after the timer's start.
    fn set_at(timer: &mut Timer<&'static str>, ms: u64, state: &'static str) -> Timeout {
        timer.set_timeout_at(Duration::from_millis(ms), state).unwrap()
    }

    // A single timeout fires exactly once, on its own tick.
    #[test]
    pub fn test_timeout_next_tick() {
        let mut t = timer();

        set_at(&mut t, 100, "a");

        assert_eq!(None, poll_at(&mut t, 50));

        assert_eq!(Some("a"), poll_at(&mut t, 100));
        assert_eq!(None, poll_at(&mut t, 100));

        assert_eq!(None, poll_at(&mut t, 150));

        assert_eq!(None, poll_at(&mut t, 200));

        assert_eq!(count(&t), 0);
    }

    // A cancelled timeout returns its state and never fires.
    #[test]
    pub fn test_clearing_timeout() {
        let mut t = timer();

        let to = set_at(&mut t, 100, "a");
        assert_eq!("a", t.cancel_timeout(&to).unwrap());

        assert_eq!(None, poll_at(&mut t, 100));

        assert_eq!(None, poll_at(&mut t, 200));

        assert_eq!(count(&t), 0);
    }

    // Two timeouts on the same tick both fire on that tick.
    #[test]
    pub fn test_multiple_timeouts_same_tick() {
        let mut t = timer();

        set_at(&mut t, 100, "a");
        set_at(&mut t, 100, "b");

        let mut rcv = vec![];

        rcv.push(poll_at(&mut t, 100).unwrap());
        rcv.push(poll_at(&mut t, 100).unwrap());

        assert_eq!(None, poll_at(&mut t, 100));

        rcv.sort();
        assert!(rcv == ["a", "b"], "actual={:?}", rcv);

        assert_eq!(None, poll_at(&mut t, 200));

        assert_eq!(count(&t), 0);
    }

    // Timeouts on different ticks fire as the wheel reaches each tick.
    #[test]
    pub fn test_multiple_timeouts_diff_tick() {
        let mut t = timer();

        set_at(&mut t, 110, "a");
        set_at(&mut t, 220, "b");
        set_at(&mut t, 230, "c");
        set_at(&mut t, 440, "d");
        set_at(&mut t, 560, "e");

        assert_eq!(Some("a"), poll_at(&mut t, 100));
        assert_eq!(None, poll_at(&mut t, 100));

        assert_eq!(Some("c"), poll_at(&mut t, 200));
        assert_eq!(Some("b"), poll_at(&mut t, 200));
        assert_eq!(None, poll_at(&mut t, 200));

        assert_eq!(None, poll_at(&mut t, 300));

        assert_eq!(Some("d"), poll_at(&mut t, 400));
        assert_eq!(None, poll_at(&mut t, 400));

        assert_eq!(None, poll_at(&mut t, 500));

        assert_eq!(Some("e"), poll_at(&mut t, 600));
        assert_eq!(None, poll_at(&mut t, 600));
    }

    // Polling far ahead drains every overdue timeout in tick order.
    #[test]
    pub fn test_catching_up() {
        let mut t = timer();

        set_at(&mut t, 110, "a");
        set_at(&mut t, 220, "b");
        set_at(&mut t, 230, "c");
        set_at(&mut t, 440, "d");

        assert_eq!(Some("a"), poll_at(&mut t, 600));
        assert_eq!(Some("c"), poll_at(&mut t, 600));
        assert_eq!(Some("b"), poll_at(&mut t, 600));
        assert_eq!(Some("d"), poll_at(&mut t, 600));
        assert_eq!(None, poll_at(&mut t, 600));
    }

    // Timeouts that share a slot but are one wheel revolution apart fire
    // on their own ticks.
    #[test]
    pub fn test_timeout_hash_collision() {
        let mut t = timer();
        let far = 100 + TICK * SLOTS as u64;

        set_at(&mut t, 100, "a");
        set_at(&mut t, far, "b");

        assert_eq!(Some("a"), poll_at(&mut t, 100));
        assert_eq!(1, count(&t));

        assert_eq!(None, poll_at(&mut t, 200));
        assert_eq!(1, count(&t));

        assert_eq!(Some("b"), poll_at(&mut t, far));
        assert_eq!(0, count(&t));
    }

    // Cancelling an entry between two polls of the same tick works.
    #[test]
    pub fn test_clearing_timeout_between_triggers() {
        let mut t = timer();

        let a = set_at(&mut t, 100, "a");
        let _ = set_at(&mut t, 100, "b");
        let _ = set_at(&mut t, 200, "c");

        assert_eq!(Some("b"), poll_at(&mut t, 100));
        assert_eq!(2, count(&t));

        t.cancel_timeout(&a);
        assert_eq!(1, count(&t));

        assert_eq!(None, poll_at(&mut t, 100));

        assert_eq!(Some("c"), poll_at(&mut t, 200));
        assert_eq!(0, count(&t));
    }

}