1use std::{
6 cmp,
7 fmt::Display,
8 iter,
9 time::{Duration, Instant},
10};
11
12use mio::Token;
13use slab::Slab;
14
15use crate::server::TIMER;
16
17mod convert {
19 use std::time::Duration;
20
21 pub fn millis(duration: Duration) -> u64 {
27 u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
28 }
29}
30
31pub struct Timer<T> {
44 tick_ms: u64,
46 entries: Slab<Entry<T>>,
48 wheel: Vec<WheelEntry>,
51 start: Instant,
53 tick: Tick,
55 next: Token,
57 mask: u64,
59}
60
61pub struct Builder {
63 tick: Duration,
65 num_slots: usize,
67 capacity: usize,
69}
70
71#[derive(Clone, Debug)]
75pub struct Timeout {
76 token: Token,
78 tick: u64,
80}
81
82#[derive(Clone, Debug)]
83pub struct TimeoutContainer {
84 timeout: Option<Timeout>,
86 duration: Duration,
87 token: Option<Token>,
88}
89
90impl TimeoutContainer {
91 pub fn new(duration: Duration, token: Token) -> TimeoutContainer {
92 let timeout = TIMER.with(|timer| timer.borrow_mut().set_timeout(duration, token));
93 TimeoutContainer {
94 timeout: Some(timeout),
95 duration,
96 token: Some(token),
97 }
98 }
99
100 pub fn new_empty(duration: Duration) -> TimeoutContainer {
101 TimeoutContainer {
102 timeout: None,
103 duration,
104 token: None,
105 }
106 }
107
108 pub fn take(&mut self) -> TimeoutContainer {
109 TimeoutContainer {
110 timeout: self.timeout.take(),
111 duration: self.duration,
112 token: self.token.take(),
113 }
114 }
115
116 pub fn triggered(&mut self) {
118 let _ = self.timeout.take();
119 }
120
121 pub fn set(&mut self, token: Token) {
122 if let Some(timeout) = self.timeout.take() {
123 TIMER.with(|timer| timer.borrow_mut().cancel_timeout(&timeout));
124 }
125
126 let timeout = TIMER.with(|timer| timer.borrow_mut().set_timeout(self.duration, token));
127
128 self.timeout = Some(timeout);
129 self.token = Some(token);
130 }
131
132 pub fn set_duration(&mut self, duration: Duration) {
134 self.duration = duration;
135
136 if let Some(timeout) = self.timeout.take() {
137 TIMER.with(|timer| timer.borrow_mut().cancel_timeout(&timeout));
138 }
139
140 if let Some(token) = self.token {
141 self.timeout =
142 Some(TIMER.with(|timer| timer.borrow_mut().set_timeout(self.duration, token)));
143 }
144 }
145
146 pub fn duration(&self) -> Duration {
147 self.duration
148 }
149
150 pub fn cancel(&mut self) -> bool {
151 match self.timeout.take() {
152 None => {
153 false
156 }
157 Some(timeout) => {
158 TIMER.with(|timer| timer.borrow_mut().cancel_timeout(&timeout));
159 true
160 }
161 }
162 }
163
164 pub fn reset(&mut self) -> bool {
166 match self.timeout.take() {
167 None => {
168 if let Some(token) = self.token {
169 self.timeout = Some(
170 TIMER.with(|timer| timer.borrow_mut().set_timeout(self.duration, token)),
171 );
172 } else {
173 return false;
175 }
176 }
177 Some(timeout) => {
178 self.timeout =
179 TIMER.with(|timer| timer.borrow_mut().reset_timeout(&timeout, self.duration));
180 }
181 };
182 self.timeout.is_some()
183 }
184}
185
186impl Display for TimeoutContainer {
187 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
188 write!(f, "{:?}", self.duration)
189 }
190}
191
192impl std::ops::Drop for TimeoutContainer {
193 fn drop(&mut self) {
194 if self.cancel() {
195 debug!("Cancel a dangling timeout that haven't be handled in session lifecycle, token ({:?}), duration {}", self.token, self);
196 }
197 }
198}
199
200#[derive(Copy, Clone, Debug)]
201struct WheelEntry {
202 next_tick: Tick,
203 head: Token,
204}
205
206struct Entry<T> {
209 state: T,
210 links: EntryLinks,
211}
212
213#[derive(Copy, Clone)]
214struct EntryLinks {
215 tick: Tick,
216 prev: Token,
217 next: Token,
218}
219
220type Tick = u64;
221const TICK_MAX: Tick = u64::MAX;
222const EMPTY: Token = Token(usize::MAX);
223
224impl Builder {
225 pub fn tick_duration(mut self, duration: Duration) -> Builder {
227 self.tick = duration;
228 self
229 }
230
231 pub fn num_slots(mut self, num_slots: usize) -> Builder {
233 self.num_slots = num_slots;
234 self
235 }
236
237 pub fn capacity(mut self, capacity: usize) -> Builder {
239 self.capacity = capacity;
240 self
241 }
242
243 pub fn build<T>(self) -> Timer<T> {
245 Timer::new(
246 convert::millis(self.tick),
247 self.num_slots,
248 self.capacity,
249 Instant::now(),
250 )
251 }
252}
253
254impl Default for Builder {
255 fn default() -> Builder {
256 Builder {
257 tick: Duration::from_millis(100),
258 num_slots: 1 << 8,
259 capacity: 1 << 16,
260 }
261 }
262}
263
264impl<T> Timer<T> {
265 fn new(tick_ms: u64, num_slots: usize, capacity: usize, start: Instant) -> Timer<T> {
266 let num_slots = num_slots.next_power_of_two();
267 let capacity = capacity.next_power_of_two();
268 let mask = (num_slots as u64) - 1;
269 let wheel = iter::repeat(WheelEntry {
270 next_tick: TICK_MAX,
271 head: EMPTY,
272 })
273 .take(num_slots)
274 .collect();
275
276 Timer {
277 tick_ms,
278 entries: Slab::with_capacity(capacity),
279 wheel,
280 start,
281 tick: 0,
282 next: EMPTY,
283 mask,
284 }
285 }
286
287 pub fn set_timeout(&mut self, delay_from_now: Duration, state: T) -> Timeout {
291 let delay_from_start = self.start.elapsed() + delay_from_now;
292 self.set_timeout_at(delay_from_start, state)
293 }
294
295 fn set_timeout_at(&mut self, delay_from_start: Duration, state: T) -> Timeout {
296 let mut tick = duration_to_tick(delay_from_start, self.tick_ms);
297 trace!(
298 "setting timeout; delay={:?}; tick={:?}; current-tick={:?}",
299 delay_from_start,
300 tick,
301 self.tick
302 );
303
304 if tick <= self.tick {
306 tick = self.tick + 1;
307 }
308
309 self.insert(tick, state)
310 }
311
312 fn insert(&mut self, tick: Tick, state: T) -> Timeout {
313 let slot = (tick & self.mask) as usize;
315 let curr = self.wheel[slot];
316
317 let entry = Entry::new(state, tick, curr.head);
319 let token = Token(self.entries.insert(entry));
320
321 if curr.head != EMPTY {
322 self.entries[curr.head.into()].links.prev = token;
325 }
326
327 self.wheel[slot] = WheelEntry {
329 next_tick: cmp::min(tick, curr.next_tick),
330 head: token,
331 };
332
333 trace!("inserted timeout; slot={}; token={:?}", slot, token);
334
335 Timeout { token, tick }
337 }
338
339 pub fn reset_timeout(
342 &mut self,
343 timeout: &Timeout,
344 delay_from_now: Duration,
345 ) -> Option<Timeout> {
346 self.cancel_timeout(timeout)
347 .map(|state| self.set_timeout(delay_from_now, state))
348 }
349
350 pub fn cancel_timeout(&mut self, timeout: &Timeout) -> Option<T> {
356 let links = match self.entries.get(timeout.token.into()) {
357 Some(e) => e.links,
358 None => {
359 debug!("timeout token {:?} not found", timeout.token);
360 return None;
361 }
362 };
363
364 if links.tick != timeout.tick {
366 return None;
367 }
368
369 self.unlink(&links, timeout.token);
370 Some(self.entries.remove(timeout.token.into()).state)
371 }
372
373 pub fn poll(&mut self) -> Option<T> {
378 let target_tick = current_tick(self.start, self.tick_ms);
379 self.poll_to(target_tick)
380 }
381
382 fn poll_to(&mut self, mut target_tick: Tick) -> Option<T> {
383 trace!(
384 "tick_to; target_tick={}; current_tick={}",
385 target_tick,
386 self.tick
387 );
388
389 if target_tick < self.tick {
390 target_tick = self.tick;
391 }
392
393 while self.tick <= target_tick {
394 let curr = self.next;
395
396 if curr == EMPTY {
399 self.tick += 1;
400
401 let slot = self.slot_for(self.tick);
402 self.next = self.wheel[slot].head;
403
404 if self.next == EMPTY {
410 self.wheel[slot].next_tick = TICK_MAX;
411 }
412 } else {
413 let slot = self.slot_for(self.tick);
414
415 if curr == self.wheel[slot].head {
416 self.wheel[slot].next_tick = TICK_MAX;
417 }
418
419 let links = self.entries[curr.into()].links;
420
421 if links.tick <= self.tick {
422 trace!("triggering; token={:?}", curr);
423
424 self.unlink(&links, curr);
426
427 return Some(self.entries.remove(curr.into()).state);
429 } else {
430 let next_tick = self.wheel[slot].next_tick;
431 self.wheel[slot].next_tick = cmp::min(next_tick, links.tick);
432 self.next = links.next;
433 }
434 }
435 }
436
437 None
438 }
439
440 fn unlink(&mut self, links: &EntryLinks, token: Token) {
441 trace!(
442 "unlinking timeout; slot={}; token={:?}",
443 self.slot_for(links.tick),
444 token
445 );
446
447 if links.prev == EMPTY {
448 let slot = self.slot_for(links.tick);
449 self.wheel[slot].head = links.next;
450 } else {
451 self.entries[links.prev.into()].links.next = links.next;
452 }
453
454 if links.next != EMPTY {
455 self.entries[links.next.into()].links.prev = links.prev;
456
457 if token == self.next {
458 self.next = links.next;
459 }
460 } else if token == self.next {
461 self.next = EMPTY;
462 }
463 }
464
465 fn next_tick(&self) -> Option<Tick> {
467 if self.next != EMPTY {
468 let slot = self.slot_for(self.entries[self.next.into()].links.tick);
469
470 if self.wheel[slot].next_tick == self.tick {
471 return Some(self.tick);
473 }
474 }
475
476 self.wheel.iter().map(|e| e.next_tick).min()
477 }
478
479 pub fn next_poll_date(&self) -> Option<Instant> {
480 self.next_tick()
481 .map(|tick| self.start + Duration::from_millis(self.tick_ms.saturating_mul(tick)))
482 }
483
484 fn slot_for(&self, tick: Tick) -> usize {
485 (self.mask & tick) as usize
486 }
487}
488
489impl<T> Default for Timer<T> {
490 fn default() -> Timer<T> {
491 Builder::default().build()
492 }
493}
494
495fn duration_to_tick(elapsed: Duration, tick_ms: u64) -> Tick {
496 let elapsed_ms = convert::millis(elapsed);
498 elapsed_ms.saturating_add(tick_ms / 2) / tick_ms
499}
500
501fn current_tick(start: Instant, tick_ms: u64) -> Tick {
502 duration_to_tick(start.elapsed(), tick_ms)
503}
504
505impl<T> Entry<T> {
506 fn new(state: T, tick: u64, next: Token) -> Entry<T> {
507 Entry {
508 state,
509 links: EntryLinks {
510 tick,
511 prev: EMPTY,
512 next,
513 },
514 }
515 }
516}
517
518#[cfg(test)]
519mod test {
520 use super::*;
521 use std::time::{Duration, Instant};
522
523 #[test]
524 pub fn test_timeout_next_tick() {
525 let mut t = timer();
526
527 t.set_timeout_at(Duration::from_millis(100), "a");
528
529 let mut tick = ms_to_tick(&t, 50);
530 assert_eq!(None, t.poll_to(tick));
531
532 tick = ms_to_tick(&t, 100);
533 assert_eq!(Some("a"), t.poll_to(tick));
534 assert_eq!(None, t.poll_to(tick));
535
536 tick = ms_to_tick(&t, 150);
537 assert_eq!(None, t.poll_to(tick));
538
539 tick = ms_to_tick(&t, 200);
540 assert_eq!(None, t.poll_to(tick));
541
542 assert_eq!(count(&t), 0);
543 }
544
545 #[test]
546 pub fn test_clearing_timeout() {
547 let mut t = timer();
548
549 let to = t.set_timeout_at(Duration::from_millis(100), "a");
550 assert_eq!("a", t.cancel_timeout(&to).unwrap());
551
552 let mut tick = ms_to_tick(&t, 100);
553 assert_eq!(None, t.poll_to(tick));
554
555 tick = ms_to_tick(&t, 200);
556 assert_eq!(None, t.poll_to(tick));
557
558 assert_eq!(count(&t), 0);
559 }
560
561 #[test]
562 pub fn test_multiple_timeouts_same_tick() {
563 let mut t = timer();
564
565 t.set_timeout_at(Duration::from_millis(100), "a");
566 t.set_timeout_at(Duration::from_millis(100), "b");
567
568 let mut rcv = vec![];
569
570 let mut tick = ms_to_tick(&t, 100);
571 rcv.push(t.poll_to(tick).unwrap());
572 rcv.push(t.poll_to(tick).unwrap());
573
574 assert_eq!(None, t.poll_to(tick));
575
576 rcv.sort_unstable();
577 assert!(rcv == ["a", "b"], "actual={rcv:?}");
578
579 tick = ms_to_tick(&t, 200);
580 assert_eq!(None, t.poll_to(tick));
581
582 assert_eq!(count(&t), 0);
583 }
584
585 #[test]
586 pub fn test_multiple_timeouts_diff_tick() {
587 let mut t = timer();
588
589 t.set_timeout_at(Duration::from_millis(110), "a");
590 t.set_timeout_at(Duration::from_millis(220), "b");
591 t.set_timeout_at(Duration::from_millis(230), "c");
592 t.set_timeout_at(Duration::from_millis(440), "d");
593 t.set_timeout_at(Duration::from_millis(560), "e");
594
595 let mut tick = ms_to_tick(&t, 100);
596 assert_eq!(Some("a"), t.poll_to(tick));
597 assert_eq!(None, t.poll_to(tick));
598
599 tick = ms_to_tick(&t, 200);
600 assert_eq!(Some("c"), t.poll_to(tick));
601 assert_eq!(Some("b"), t.poll_to(tick));
602 assert_eq!(None, t.poll_to(tick));
603
604 tick = ms_to_tick(&t, 300);
605 assert_eq!(None, t.poll_to(tick));
606
607 tick = ms_to_tick(&t, 400);
608 assert_eq!(Some("d"), t.poll_to(tick));
609 assert_eq!(None, t.poll_to(tick));
610
611 tick = ms_to_tick(&t, 500);
612 assert_eq!(None, t.poll_to(tick));
613
614 tick = ms_to_tick(&t, 600);
615 assert_eq!(Some("e"), t.poll_to(tick));
616 assert_eq!(None, t.poll_to(tick));
617 }
618
619 #[test]
620 pub fn test_catching_up() {
621 let mut t = timer();
622
623 t.set_timeout_at(Duration::from_millis(110), "a");
624 t.set_timeout_at(Duration::from_millis(220), "b");
625 t.set_timeout_at(Duration::from_millis(230), "c");
626 t.set_timeout_at(Duration::from_millis(440), "d");
627
628 let tick = ms_to_tick(&t, 600);
629 assert_eq!(Some("a"), t.poll_to(tick));
630 assert_eq!(Some("c"), t.poll_to(tick));
631 assert_eq!(Some("b"), t.poll_to(tick));
632 assert_eq!(Some("d"), t.poll_to(tick));
633 assert_eq!(None, t.poll_to(tick));
634 }
635
636 #[test]
637 pub fn test_timeout_hash_collision() {
638 let mut t = timer();
639
640 t.set_timeout_at(Duration::from_millis(100), "a");
641 t.set_timeout_at(Duration::from_millis(100 + TICK * SLOTS as u64), "b");
642
643 let mut tick = ms_to_tick(&t, 100);
644 assert_eq!(Some("a"), t.poll_to(tick));
645 assert_eq!(1, count(&t));
646
647 tick = ms_to_tick(&t, 200);
648 assert_eq!(None, t.poll_to(tick));
649 assert_eq!(1, count(&t));
650
651 tick = ms_to_tick(&t, 100 + TICK * SLOTS as u64);
652 assert_eq!(Some("b"), t.poll_to(tick));
653 assert_eq!(0, count(&t));
654 }
655
656 #[test]
657 pub fn test_clearing_timeout_between_triggers() {
658 let mut t = timer();
659
660 let a = t.set_timeout_at(Duration::from_millis(100), "a");
661 let _ = t.set_timeout_at(Duration::from_millis(100), "b");
662 let _ = t.set_timeout_at(Duration::from_millis(200), "c");
663
664 let mut tick = ms_to_tick(&t, 100);
665 assert_eq!(Some("b"), t.poll_to(tick));
666 assert_eq!(2, count(&t));
667
668 t.cancel_timeout(&a);
669 assert_eq!(1, count(&t));
670
671 assert_eq!(None, t.poll_to(tick));
672
673 tick = ms_to_tick(&t, 200);
674 assert_eq!(Some("c"), t.poll_to(tick));
675 assert_eq!(0, count(&t));
676 }
677
678 const TICK: u64 = 100;
679 const SLOTS: usize = 16;
680 const CAPACITY: usize = 32;
681
682 fn count<T>(timer: &Timer<T>) -> usize {
683 timer.entries.len()
684 }
685
686 fn timer() -> Timer<&'static str> {
687 Timer::new(TICK, SLOTS, CAPACITY, Instant::now())
688 }
689
690 fn ms_to_tick<T>(timer: &Timer<T>, ms: u64) -> u64 {
691 ms / timer.tick_ms
692 }
693}