1#![allow(deprecated, missing_debug_implementations)]
4
5use crate::{convert, Evented, Poll, PollOpt, Ready, Registration, SetReadiness, Token};
6use lazycell::LazyCell;
7use slab::Slab;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11use std::{cmp, fmt, io, iter, thread, u64, usize};
12
13pub struct Timer<T> {
26 tick_ms: u64,
28 entries: Slab<Entry<T>>,
30 wheel: Vec<WheelEntry>,
33 start: Instant,
35 tick: Tick,
37 next: Token,
39 mask: u64,
41 inner: LazyCell<Inner>,
43}
44
45pub struct Builder {
47 tick: Duration,
49 num_slots: usize,
51 capacity: usize,
53}
54
55#[derive(Clone, Debug)]
59pub struct Timeout {
60 token: Token,
62 tick: u64,
64}
65
66struct Inner {
67 registration: Registration,
68 set_readiness: SetReadiness,
69 wakeup_state: WakeupState,
70 wakeup_thread: thread::JoinHandle<()>,
71}
72
73impl Drop for Inner {
74 fn drop(&mut self) {
75 self.wakeup_state.store(TERMINATE_THREAD, Ordering::Release);
77 self.wakeup_thread.thread().unpark();
79 }
80}
81
82#[derive(Copy, Clone, Debug)]
83struct WheelEntry {
84 next_tick: Tick,
85 head: Token,
86}
87
88struct Entry<T> {
91 state: T,
92 links: EntryLinks,
93}
94
95#[derive(Copy, Clone)]
96struct EntryLinks {
97 tick: Tick,
98 prev: Token,
99 next: Token,
100}
101
102type Tick = u64;
103
104const TICK_MAX: Tick = u64::MAX;
105
106type WakeupState = Arc<AtomicUsize>;
108
109const TERMINATE_THREAD: usize = 0;
110const EMPTY: Token = Token(usize::MAX);
111
112impl Builder {
113 pub fn tick_duration(mut self, duration: Duration) -> Builder {
115 self.tick = duration;
116 self
117 }
118
119 pub fn num_slots(mut self, num_slots: usize) -> Builder {
121 self.num_slots = num_slots;
122 self
123 }
124
125 pub fn capacity(mut self, capacity: usize) -> Builder {
127 self.capacity = capacity;
128 self
129 }
130
131 pub fn build<T>(self) -> Timer<T> {
133 Timer::new(
134 convert::millis(self.tick),
135 self.num_slots,
136 self.capacity,
137 Instant::now(),
138 )
139 }
140}
141
142impl Default for Builder {
143 fn default() -> Builder {
144 Builder {
145 tick: Duration::from_millis(100),
146 num_slots: 1 << 8,
147 capacity: 1 << 16,
148 }
149 }
150}
151
152impl<T> Timer<T> {
153 fn new(tick_ms: u64, num_slots: usize, capacity: usize, start: Instant) -> Timer<T> {
154 let num_slots = num_slots.next_power_of_two();
155 let capacity = capacity.next_power_of_two();
156 let mask = (num_slots as u64) - 1;
157 let wheel = iter::repeat(WheelEntry {
158 next_tick: TICK_MAX,
159 head: EMPTY,
160 })
161 .take(num_slots)
162 .collect();
163
164 Timer {
165 tick_ms,
166 entries: Slab::with_capacity(capacity),
167 wheel,
168 start,
169 tick: 0,
170 next: EMPTY,
171 mask,
172 inner: LazyCell::new(),
173 }
174 }
175
176 pub fn set_timeout(&mut self, delay_from_now: Duration, state: T) -> Timeout {
180 let delay_from_start = self.start.elapsed() + delay_from_now;
181 self.set_timeout_at(delay_from_start, state)
182 }
183
184 fn set_timeout_at(&mut self, delay_from_start: Duration, state: T) -> Timeout {
185 let mut tick = duration_to_tick(delay_from_start, self.tick_ms);
186 trace!(
187 "setting timeout; delay={:?}; tick={:?}; current-tick={:?}",
188 delay_from_start,
189 tick,
190 self.tick
191 );
192
193 if tick <= self.tick {
195 tick = self.tick + 1;
196 }
197
198 self.insert(tick, state)
199 }
200
201 fn insert(&mut self, tick: Tick, state: T) -> Timeout {
202 let slot = (tick & self.mask) as usize;
204 let curr = self.wheel[slot];
205
206 let entry = Entry::new(state, tick, curr.head);
208 let token = Token(self.entries.insert(entry));
209
210 if curr.head != EMPTY {
211 self.entries[curr.head.into()].links.prev = token;
214 }
215
216 self.wheel[slot] = WheelEntry {
218 next_tick: cmp::min(tick, curr.next_tick),
219 head: token,
220 };
221
222 self.schedule_readiness(tick);
223
224 trace!("inserted timout; slot={}; token={:?}", slot, token);
225
226 Timeout { token, tick }
228 }
229
230 pub fn cancel_timeout(&mut self, timeout: &Timeout) -> Option<T> {
235 let links = match self.entries.get(timeout.token.into()) {
236 Some(e) => e.links,
237 None => return None,
238 };
239
240 if links.tick != timeout.tick {
242 return None;
243 }
244
245 self.unlink(&links, timeout.token);
246 Some(self.entries.remove(timeout.token.into()).state)
247 }
248
249 pub fn poll(&mut self) -> Option<T> {
254 let target_tick = current_tick(self.start, self.tick_ms);
255 self.poll_to(target_tick)
256 }
257
258 fn poll_to(&mut self, mut target_tick: Tick) -> Option<T> {
259 trace!(
260 "tick_to; target_tick={}; current_tick={}",
261 target_tick,
262 self.tick
263 );
264
265 if target_tick < self.tick {
266 target_tick = self.tick;
267 }
268
269 while self.tick <= target_tick {
270 let curr = self.next;
271
272 trace!("ticking; curr={:?}", curr);
273
274 if curr == EMPTY {
275 self.tick += 1;
276
277 let slot = self.slot_for(self.tick);
278 self.next = self.wheel[slot].head;
279
280 if self.next == EMPTY {
286 self.wheel[slot].next_tick = TICK_MAX;
287 }
288 } else {
289 let slot = self.slot_for(self.tick);
290
291 if curr == self.wheel[slot].head {
292 self.wheel[slot].next_tick = TICK_MAX;
293 }
294
295 let links = self.entries[curr.into()].links;
296
297 if links.tick <= self.tick {
298 trace!("triggering; token={:?}", curr);
299
300 self.unlink(&links, curr);
302
303 return Some(self.entries.remove(curr.into()).state);
305 } else {
306 let next_tick = self.wheel[slot].next_tick;
307 self.wheel[slot].next_tick = cmp::min(next_tick, links.tick);
308 self.next = links.next;
309 }
310 }
311 }
312
313 if let Some(inner) = self.inner.borrow() {
315 trace!("unsetting readiness");
316 let _ = inner.set_readiness.set_readiness(Ready::empty());
317
318 if let Some(tick) = self.next_tick() {
319 self.schedule_readiness(tick);
320 }
321 }
322
323 None
324 }
325
326 fn unlink(&mut self, links: &EntryLinks, token: Token) {
327 trace!(
328 "unlinking timeout; slot={}; token={:?}",
329 self.slot_for(links.tick),
330 token
331 );
332
333 if links.prev == EMPTY {
334 let slot = self.slot_for(links.tick);
335 self.wheel[slot].head = links.next;
336 } else {
337 self.entries[links.prev.into()].links.next = links.next;
338 }
339
340 if links.next != EMPTY {
341 self.entries[links.next.into()].links.prev = links.prev;
342
343 if token == self.next {
344 self.next = links.next;
345 }
346 } else if token == self.next {
347 self.next = EMPTY;
348 }
349 }
350
351 fn schedule_readiness(&self, tick: Tick) {
352 if let Some(inner) = self.inner.borrow() {
353 let mut curr = inner.wakeup_state.load(Ordering::Acquire);
355
356 loop {
357 if curr as Tick <= tick {
358 return;
360 }
361
362 trace!("advancing the wakeup time; target={}; curr={}", tick, curr);
364 match inner.wakeup_state.compare_exchange_weak(
365 curr,
366 tick as usize,
367 Ordering::Release,
368 Ordering::Relaxed,
369 ) {
370 Ok(_) => {
371 trace!("unparking wakeup thread");
373 inner.wakeup_thread.thread().unpark();
374 return;
375 }
376 Err(actual) => curr = actual,
377 }
378 }
379 }
380 }
381
382 fn next_tick(&self) -> Option<Tick> {
384 if self.next != EMPTY {
385 let slot = self.slot_for(self.entries[self.next.into()].links.tick);
386
387 if self.wheel[slot].next_tick == self.tick {
388 return Some(self.tick);
390 }
391 }
392
393 self.wheel.iter().map(|e| e.next_tick).min()
394 }
395
396 fn slot_for(&self, tick: Tick) -> usize {
397 (self.mask & tick) as usize
398 }
399}
400
401impl<T> Default for Timer<T> {
402 fn default() -> Timer<T> {
403 Builder::default().build()
404 }
405}
406
407impl<T> Evented for Timer<T> {
408 fn register(
409 &self,
410 poll: &Poll,
411 token: Token,
412 interest: Ready,
413 opts: PollOpt,
414 ) -> io::Result<()> {
415 if self.inner.borrow().is_some() {
416 return Err(io::Error::new(
417 io::ErrorKind::Other,
418 "timer already registered",
419 ));
420 }
421
422 let (registration, set_readiness) = Registration::new2();
423 poll.register(®istration, token, interest, opts)?;
424 let wakeup_state = Arc::new(AtomicUsize::new(usize::MAX));
425 let thread_handle = spawn_wakeup_thread(
426 Arc::clone(&wakeup_state),
427 set_readiness.clone(),
428 self.start,
429 self.tick_ms,
430 );
431
432 self.inner
433 .fill(Inner {
434 registration,
435 set_readiness,
436 wakeup_state,
437 wakeup_thread: thread_handle,
438 })
439 .expect("timer already registered");
440
441 if let Some(next_tick) = self.next_tick() {
442 self.schedule_readiness(next_tick);
443 }
444
445 Ok(())
446 }
447
448 fn reregister(
449 &self,
450 poll: &Poll,
451 token: Token,
452 interest: Ready,
453 opts: PollOpt,
454 ) -> io::Result<()> {
455 match self.inner.borrow() {
456 Some(inner) => poll.reregister(&inner.registration, token, interest, opts),
457 None => Err(io::Error::new(
458 io::ErrorKind::Other,
459 "receiver not registered",
460 )),
461 }
462 }
463
464 fn deregister(&self, poll: &Poll) -> io::Result<()> {
465 match self.inner.borrow() {
466 Some(inner) => poll.deregister(&inner.registration),
467 None => Err(io::Error::new(
468 io::ErrorKind::Other,
469 "receiver not registered",
470 )),
471 }
472 }
473}
474
475impl fmt::Debug for Inner {
476 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
477 fmt.debug_struct("Inner")
478 .field("registration", &self.registration)
479 .field("wakeup_state", &self.wakeup_state.load(Ordering::Relaxed))
480 .finish()
481 }
482}
483
484fn spawn_wakeup_thread(
485 state: WakeupState,
486 set_readiness: SetReadiness,
487 start: Instant,
488 tick_ms: u64,
489) -> thread::JoinHandle<()> {
490 thread::Builder::new()
491 .name(format!(
492 "mio_extras::timer : {}",
493 thread::current().name().unwrap_or("no name")
494 ))
495 .spawn(move || {
496 let mut sleep_until_tick = state.load(Ordering::Acquire) as Tick;
497
498 loop {
499 if sleep_until_tick == TERMINATE_THREAD as Tick {
500 return;
501 }
502
503 let now_tick = current_tick(start, tick_ms);
504
505 trace!(
506 "wakeup thread: sleep_until_tick={:?}; now_tick={:?}",
507 sleep_until_tick,
508 now_tick
509 );
510
511 if now_tick < sleep_until_tick {
512 match tick_ms.checked_mul(sleep_until_tick - now_tick) {
517 Some(sleep_duration) => {
518 trace!(
519 "sleeping; tick_ms={}; now_tick={}; sleep_until_tick={}; duration={:?}",
520 tick_ms,
521 now_tick,
522 sleep_until_tick,
523 sleep_duration
524 );
525 thread::park_timeout(Duration::from_millis(sleep_duration));
526 }
527 None => {
528 trace!(
529 "sleeping; tick_ms={}; now_tick={}; blocking sleep",
530 tick_ms,
531 now_tick
532 );
533 thread::park();
534 }
535 }
536 sleep_until_tick = state.load(Ordering::Acquire) as Tick;
537 } else {
538 match state.compare_exchange_weak(
539 sleep_until_tick as usize,
540 usize::MAX,
541 Ordering::AcqRel,
542 Ordering::Acquire,
543 ) {
544 Ok(_) => {
545 trace!("setting readiness from wakeup thread");
546 let _ = set_readiness.set_readiness(Ready::readable());
547 sleep_until_tick = usize::MAX as Tick;
548 }
549 Err(actual) => sleep_until_tick = actual as Tick,
550 }
551 }
552 }
553 })
554 .unwrap()
555}
556
557fn duration_to_tick(elapsed: Duration, tick_ms: u64) -> Tick {
558 let elapsed_ms = convert::millis(elapsed);
560 elapsed_ms.saturating_add(tick_ms / 2) / tick_ms
561}
562
563fn current_tick(start: Instant, tick_ms: u64) -> Tick {
564 duration_to_tick(start.elapsed(), tick_ms)
565}
566
567impl<T> Entry<T> {
568 fn new(state: T, tick: u64, next: Token) -> Entry<T> {
569 Entry {
570 state,
571 links: EntryLinks {
572 tick,
573 prev: EMPTY,
574 next,
575 },
576 }
577 }
578}
579
580#[cfg(test)]
581mod test {
582 use super::*;
583 use std::time::{Duration, Instant};
584
585 #[test]
586 pub fn test_timeout_next_tick() {
587 let mut t = timer();
588 let mut tick;
589
590 t.set_timeout_at(Duration::from_millis(100), "a");
591
592 tick = ms_to_tick(&t, 50);
593 assert_eq!(None, t.poll_to(tick));
594
595 tick = ms_to_tick(&t, 100);
596 assert_eq!(Some("a"), t.poll_to(tick));
597 assert_eq!(None, t.poll_to(tick));
598
599 tick = ms_to_tick(&t, 150);
600 assert_eq!(None, t.poll_to(tick));
601
602 tick = ms_to_tick(&t, 200);
603 assert_eq!(None, t.poll_to(tick));
604
605 assert_eq!(count(&t), 0);
606 }
607
608 #[test]
609 pub fn test_clearing_timeout() {
610 let mut t = timer();
611 let mut tick;
612
613 let to = t.set_timeout_at(Duration::from_millis(100), "a");
614 assert_eq!("a", t.cancel_timeout(&to).unwrap());
615
616 tick = ms_to_tick(&t, 100);
617 assert_eq!(None, t.poll_to(tick));
618
619 tick = ms_to_tick(&t, 200);
620 assert_eq!(None, t.poll_to(tick));
621
622 assert_eq!(count(&t), 0);
623 }
624
625 #[test]
626 pub fn test_multiple_timeouts_same_tick() {
627 let mut t = timer();
628 let mut tick;
629
630 t.set_timeout_at(Duration::from_millis(100), "a");
631 t.set_timeout_at(Duration::from_millis(100), "b");
632
633 let mut rcv = vec![];
634
635 tick = ms_to_tick(&t, 100);
636 rcv.push(t.poll_to(tick).unwrap());
637 rcv.push(t.poll_to(tick).unwrap());
638
639 assert_eq!(None, t.poll_to(tick));
640
641 rcv.sort_unstable();
642 assert!(rcv == ["a", "b"], "actual={:?}", rcv);
643
644 tick = ms_to_tick(&t, 200);
645 assert_eq!(None, t.poll_to(tick));
646
647 assert_eq!(count(&t), 0);
648 }
649
650 #[test]
651 pub fn test_multiple_timeouts_diff_tick() {
652 let mut t = timer();
653 let mut tick;
654
655 t.set_timeout_at(Duration::from_millis(110), "a");
656 t.set_timeout_at(Duration::from_millis(220), "b");
657 t.set_timeout_at(Duration::from_millis(230), "c");
658 t.set_timeout_at(Duration::from_millis(440), "d");
659 t.set_timeout_at(Duration::from_millis(560), "e");
660
661 tick = ms_to_tick(&t, 100);
662 assert_eq!(Some("a"), t.poll_to(tick));
663 assert_eq!(None, t.poll_to(tick));
664
665 tick = ms_to_tick(&t, 200);
666 assert_eq!(Some("c"), t.poll_to(tick));
667 assert_eq!(Some("b"), t.poll_to(tick));
668 assert_eq!(None, t.poll_to(tick));
669
670 tick = ms_to_tick(&t, 300);
671 assert_eq!(None, t.poll_to(tick));
672
673 tick = ms_to_tick(&t, 400);
674 assert_eq!(Some("d"), t.poll_to(tick));
675 assert_eq!(None, t.poll_to(tick));
676
677 tick = ms_to_tick(&t, 500);
678 assert_eq!(None, t.poll_to(tick));
679
680 tick = ms_to_tick(&t, 600);
681 assert_eq!(Some("e"), t.poll_to(tick));
682 assert_eq!(None, t.poll_to(tick));
683 }
684
685 #[test]
686 pub fn test_catching_up() {
687 let mut t = timer();
688
689 t.set_timeout_at(Duration::from_millis(110), "a");
690 t.set_timeout_at(Duration::from_millis(220), "b");
691 t.set_timeout_at(Duration::from_millis(230), "c");
692 t.set_timeout_at(Duration::from_millis(440), "d");
693
694 let tick = ms_to_tick(&t, 600);
695 assert_eq!(Some("a"), t.poll_to(tick));
696 assert_eq!(Some("c"), t.poll_to(tick));
697 assert_eq!(Some("b"), t.poll_to(tick));
698 assert_eq!(Some("d"), t.poll_to(tick));
699 assert_eq!(None, t.poll_to(tick));
700 }
701
702 #[test]
703 pub fn test_timeout_hash_collision() {
704 let mut t = timer();
705 let mut tick;
706
707 t.set_timeout_at(Duration::from_millis(100), "a");
708 t.set_timeout_at(Duration::from_millis(100 + TICK * SLOTS as u64), "b");
709
710 tick = ms_to_tick(&t, 100);
711 assert_eq!(Some("a"), t.poll_to(tick));
712 assert_eq!(1, count(&t));
713
714 tick = ms_to_tick(&t, 200);
715 assert_eq!(None, t.poll_to(tick));
716 assert_eq!(1, count(&t));
717
718 tick = ms_to_tick(&t, 100 + TICK * SLOTS as u64);
719 assert_eq!(Some("b"), t.poll_to(tick));
720 assert_eq!(0, count(&t));
721 }
722
723 #[test]
724 pub fn test_clearing_timeout_between_triggers() {
725 let mut t = timer();
726 let mut tick;
727
728 let a = t.set_timeout_at(Duration::from_millis(100), "a");
729 let _ = t.set_timeout_at(Duration::from_millis(100), "b");
730 let _ = t.set_timeout_at(Duration::from_millis(200), "c");
731
732 tick = ms_to_tick(&t, 100);
733 assert_eq!(Some("b"), t.poll_to(tick));
734 assert_eq!(2, count(&t));
735
736 t.cancel_timeout(&a);
737 assert_eq!(1, count(&t));
738
739 assert_eq!(None, t.poll_to(tick));
740
741 tick = ms_to_tick(&t, 200);
742 assert_eq!(Some("c"), t.poll_to(tick));
743 assert_eq!(0, count(&t));
744 }
745
746 const TICK: u64 = 100;
747 const SLOTS: usize = 16;
748 const CAPACITY: usize = 32;
749
750 fn count<T>(timer: &Timer<T>) -> usize {
751 timer.entries.len()
752 }
753
754 fn timer() -> Timer<&'static str> {
755 Timer::new(TICK, SLOTS, CAPACITY, Instant::now())
756 }
757
758 fn ms_to_tick<T>(timer: &Timer<T>, ms: u64) -> u64 {
759 ms / timer.tick_ms
760 }
761}