1use std::{
6 cmp,
7 fmt::Display,
8 iter,
9 time::{Duration, Instant},
10};
11
12use mio::Token;
13use slab::Slab;
14
15use crate::server::TIMER;
16
mod convert {
    use std::time::Duration;

    /// Returns the number of whole milliseconds in `duration`.
    ///
    /// Durations too long to be expressed in a `u64` saturate to `u64::MAX`.
    pub fn millis(duration: Duration) -> u64 {
        let clamped = duration.as_millis().min(u128::from(u64::MAX));
        // Lossless: `clamped` was just bounded by `u64::MAX`.
        clamped as u64
    }
}
30
31pub struct Timer<T> {
44 tick_ms: u64,
46 entries: Slab<Entry<T>>,
48 wheel: Vec<WheelEntry>,
51 start: Instant,
53 tick: Tick,
55 next: Token,
57 mask: u64,
59}
60
/// Collects the parameters used to construct a [`Timer`].
pub struct Builder {
    /// Duration of one timer tick.
    tick: Duration,
    /// Requested number of wheel slots.
    num_slots: usize,
    /// Requested initial capacity of the timeout storage.
    capacity: usize,
}
70
71#[derive(Clone, Debug)]
75pub struct Timeout {
76 token: Token,
78 tick: u64,
80}
81
82#[derive(Clone, Debug)]
83pub struct TimeoutContainer {
84 timeout: Option<Timeout>,
86 duration: Duration,
87 token: Option<Token>,
88}
89
90impl TimeoutContainer {
91 pub fn new(duration: Duration, token: Token) -> TimeoutContainer {
92 let timeout = TIMER.with(|timer| timer.borrow_mut().set_timeout(duration, token));
93 TimeoutContainer {
94 timeout: Some(timeout),
95 duration,
96 token: Some(token),
97 }
98 }
99
100 pub fn new_empty(duration: Duration) -> TimeoutContainer {
101 TimeoutContainer {
102 timeout: None,
103 duration,
104 token: None,
105 }
106 }
107
108 pub fn take(&mut self) -> TimeoutContainer {
109 TimeoutContainer {
110 timeout: self.timeout.take(),
111 duration: self.duration,
112 token: self.token.take(),
113 }
114 }
115
116 pub fn triggered(&mut self) {
118 let _ = self.timeout.take();
119 }
120
121 pub fn set(&mut self, token: Token) {
122 if let Some(timeout) = self.timeout.take() {
123 TIMER.with(|timer| timer.borrow_mut().cancel_timeout(&timeout));
124 }
125
126 let timeout = TIMER.with(|timer| timer.borrow_mut().set_timeout(self.duration, token));
127
128 self.timeout = Some(timeout);
129 self.token = Some(token);
130 }
131
132 pub fn set_duration(&mut self, duration: Duration) {
134 self.duration = duration;
135
136 if let Some(timeout) = self.timeout.take() {
137 TIMER.with(|timer| timer.borrow_mut().cancel_timeout(&timeout));
138 }
139
140 if let Some(token) = self.token {
141 self.timeout =
142 Some(TIMER.with(|timer| timer.borrow_mut().set_timeout(self.duration, token)));
143 }
144 }
145
146 pub fn duration(&self) -> Duration {
147 self.duration
148 }
149
150 pub fn cancel(&mut self) -> bool {
151 match self.timeout.take() {
152 None => {
153 false
156 }
157 Some(timeout) => {
158 TIMER.with(|timer| timer.borrow_mut().cancel_timeout(&timeout));
159 true
160 }
161 }
162 }
163
164 pub fn reset(&mut self) -> bool {
166 match self.timeout.take() {
167 None => {
168 if let Some(token) = self.token {
169 self.timeout = Some(
170 TIMER.with(|timer| timer.borrow_mut().set_timeout(self.duration, token)),
171 );
172 } else {
173 return false;
175 }
176 }
177 Some(timeout) => {
178 self.timeout =
179 TIMER.with(|timer| timer.borrow_mut().reset_timeout(&timeout, self.duration));
180 }
181 };
182 self.timeout.is_some()
183 }
184}
185
186impl Display for TimeoutContainer {
187 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
188 write!(f, "{:?}", self.duration)
189 }
190}
191
192impl std::ops::Drop for TimeoutContainer {
193 fn drop(&mut self) {
194 if self.cancel() {
195 debug!(
196 "Cancel a dangling timeout that haven't be handled in session lifecycle, token ({:?}), duration {}",
197 self.token, self
198 );
199 }
200 }
201}
202
203#[derive(Copy, Clone, Debug)]
204struct WheelEntry {
205 next_tick: Tick,
206 head: Token,
207}
208
209struct Entry<T> {
212 state: T,
213 links: EntryLinks,
214}
215
216#[derive(Copy, Clone)]
217struct EntryLinks {
218 tick: Tick,
219 prev: Token,
220 next: Token,
221}
222
223type Tick = u64;
224const TICK_MAX: Tick = u64::MAX;
225const EMPTY: Token = Token(usize::MAX);
226
227impl Builder {
228 pub fn tick_duration(mut self, duration: Duration) -> Builder {
230 self.tick = duration;
231 self
232 }
233
234 pub fn num_slots(mut self, num_slots: usize) -> Builder {
236 self.num_slots = num_slots;
237 self
238 }
239
240 pub fn capacity(mut self, capacity: usize) -> Builder {
242 self.capacity = capacity;
243 self
244 }
245
246 pub fn build<T>(self) -> Timer<T> {
248 Timer::new(
249 convert::millis(self.tick),
250 self.num_slots,
251 self.capacity,
252 Instant::now(),
253 )
254 }
255}
256
257impl Default for Builder {
258 fn default() -> Builder {
259 Builder {
260 tick: Duration::from_millis(100),
261 num_slots: 1 << 8,
262 capacity: 1 << 16,
263 }
264 }
265}
266
267impl<T> Timer<T> {
268 fn new(tick_ms: u64, num_slots: usize, capacity: usize, start: Instant) -> Timer<T> {
269 let num_slots = num_slots.next_power_of_two();
270 let capacity = capacity.next_power_of_two();
271 let mask = (num_slots as u64) - 1;
272 let wheel = iter::repeat(WheelEntry {
273 next_tick: TICK_MAX,
274 head: EMPTY,
275 })
276 .take(num_slots)
277 .collect();
278
279 Timer {
280 tick_ms,
281 entries: Slab::with_capacity(capacity),
282 wheel,
283 start,
284 tick: 0,
285 next: EMPTY,
286 mask,
287 }
288 }
289
290 pub fn set_timeout(&mut self, delay_from_now: Duration, state: T) -> Timeout {
294 let delay_from_start = self.start.elapsed() + delay_from_now;
295 self.set_timeout_at(delay_from_start, state)
296 }
297
298 fn set_timeout_at(&mut self, delay_from_start: Duration, state: T) -> Timeout {
299 let mut tick = duration_to_tick(delay_from_start, self.tick_ms);
300 trace!(
301 "setting timeout; delay={:?}; tick={:?}; current-tick={:?}",
302 delay_from_start, tick, self.tick
303 );
304
305 if tick <= self.tick {
307 tick = self.tick + 1;
308 }
309
310 self.insert(tick, state)
311 }
312
313 fn insert(&mut self, tick: Tick, state: T) -> Timeout {
314 let slot = (tick & self.mask) as usize;
316 let curr = self.wheel[slot];
317
318 let entry = Entry::new(state, tick, curr.head);
320 let token = Token(self.entries.insert(entry));
321
322 if curr.head != EMPTY {
323 self.entries[curr.head.into()].links.prev = token;
326 }
327
328 self.wheel[slot] = WheelEntry {
330 next_tick: cmp::min(tick, curr.next_tick),
331 head: token,
332 };
333
334 trace!("inserted timeout; slot={}; token={:?}", slot, token);
335
336 Timeout { token, tick }
338 }
339
340 pub fn reset_timeout(
343 &mut self,
344 timeout: &Timeout,
345 delay_from_now: Duration,
346 ) -> Option<Timeout> {
347 self.cancel_timeout(timeout)
348 .map(|state| self.set_timeout(delay_from_now, state))
349 }
350
351 pub fn cancel_timeout(&mut self, timeout: &Timeout) -> Option<T> {
357 let links = match self.entries.get(timeout.token.into()) {
358 Some(e) => e.links,
359 None => {
360 debug!("timeout token {:?} not found", timeout.token);
361 return None;
362 }
363 };
364
365 if links.tick != timeout.tick {
367 return None;
368 }
369
370 self.unlink(&links, timeout.token);
371 Some(self.entries.remove(timeout.token.into()).state)
372 }
373
374 pub fn poll(&mut self) -> Option<T> {
379 let target_tick = current_tick(self.start, self.tick_ms);
380 self.poll_to(target_tick)
381 }
382
383 fn poll_to(&mut self, mut target_tick: Tick) -> Option<T> {
384 trace!(
385 "tick_to; target_tick={}; current_tick={}",
386 target_tick, self.tick
387 );
388
389 if target_tick < self.tick {
390 target_tick = self.tick;
391 }
392
393 while self.tick <= target_tick {
394 let curr = self.next;
395
396 if curr == EMPTY {
399 self.tick += 1;
400
401 let slot = self.slot_for(self.tick);
402 self.next = self.wheel[slot].head;
403
404 if self.next == EMPTY {
410 self.wheel[slot].next_tick = TICK_MAX;
411 }
412 } else {
413 let slot = self.slot_for(self.tick);
414
415 if curr == self.wheel[slot].head {
416 self.wheel[slot].next_tick = TICK_MAX;
417 }
418
419 let links = self.entries[curr.into()].links;
420
421 if links.tick <= self.tick {
422 trace!("triggering; token={:?}", curr);
423
424 self.unlink(&links, curr);
426
427 return Some(self.entries.remove(curr.into()).state);
429 } else {
430 let next_tick = self.wheel[slot].next_tick;
431 self.wheel[slot].next_tick = cmp::min(next_tick, links.tick);
432 self.next = links.next;
433 }
434 }
435 }
436
437 None
438 }
439
440 fn unlink(&mut self, links: &EntryLinks, token: Token) {
441 trace!(
442 "unlinking timeout; slot={}; token={:?}",
443 self.slot_for(links.tick),
444 token
445 );
446
447 if links.prev == EMPTY {
448 let slot = self.slot_for(links.tick);
449 self.wheel[slot].head = links.next;
450 } else {
451 self.entries[links.prev.into()].links.next = links.next;
452 }
453
454 if links.next != EMPTY {
455 self.entries[links.next.into()].links.prev = links.prev;
456
457 if token == self.next {
458 self.next = links.next;
459 }
460 } else if token == self.next {
461 self.next = EMPTY;
462 }
463 }
464
465 fn next_tick(&self) -> Option<Tick> {
467 if self.next != EMPTY {
468 let slot = self.slot_for(self.entries[self.next.into()].links.tick);
469
470 if self.wheel[slot].next_tick == self.tick {
471 return Some(self.tick);
473 }
474 }
475
476 self.wheel.iter().map(|e| e.next_tick).min()
477 }
478
479 pub fn next_poll_date(&self) -> Option<Instant> {
480 self.next_tick()
481 .map(|tick| self.start + Duration::from_millis(self.tick_ms.saturating_mul(tick)))
482 }
483
484 fn slot_for(&self, tick: Tick) -> usize {
485 (self.mask & tick) as usize
486 }
487}
488
489impl<T> Default for Timer<T> {
490 fn default() -> Timer<T> {
491 Builder::default().build()
492 }
493}
494
495fn duration_to_tick(elapsed: Duration, tick_ms: u64) -> Tick {
496 let elapsed_ms = convert::millis(elapsed);
498 elapsed_ms.saturating_add(tick_ms / 2) / tick_ms
499}
500
501fn current_tick(start: Instant, tick_ms: u64) -> Tick {
502 duration_to_tick(start.elapsed(), tick_ms)
503}
504
505impl<T> Entry<T> {
506 fn new(state: T, tick: u64, next: Token) -> Entry<T> {
507 Entry {
508 state,
509 links: EntryLinks {
510 tick,
511 prev: EMPTY,
512 next,
513 },
514 }
515 }
516}
517
#[cfg(test)]
mod test {
    use std::time::{Duration, Instant};

    use super::*;

    /// Tick length used by the tests, in milliseconds.
    const TICK: u64 = 100;
    /// Number of wheel slots used by the tests.
    const SLOTS: usize = 16;
    /// Storage capacity used by the tests.
    const CAPACITY: usize = 32;

    /// Builds the timer used by every test.
    fn timer() -> Timer<&'static str> {
        Timer::new(TICK, SLOTS, CAPACITY, Instant::now())
    }

    /// Number of timeouts still stored in the timer.
    fn count<T>(timer: &Timer<T>) -> usize {
        timer.entries.len()
    }

    /// Converts milliseconds to a tick, truncating.
    fn ms_to_tick<T>(timer: &Timer<T>, ms: u64) -> u64 {
        ms / timer.tick_ms
    }

    /// A single timeout fires once, on its tick, and never again.
    #[test]
    pub fn test_timeout_next_tick() {
        let mut t = timer();

        t.set_timeout_at(Duration::from_millis(100), "a");

        let before = ms_to_tick(&t, 50);
        assert_eq!(None, t.poll_to(before));

        let due = ms_to_tick(&t, 100);
        assert_eq!(Some("a"), t.poll_to(due));
        assert_eq!(None, t.poll_to(due));

        for ms in [150, 200] {
            let later = ms_to_tick(&t, ms);
            assert_eq!(None, t.poll_to(later));
        }

        assert_eq!(count(&t), 0);
    }

    /// A cancelled timeout returns its state and never fires.
    #[test]
    pub fn test_clearing_timeout() {
        let mut t = timer();

        let to = t.set_timeout_at(Duration::from_millis(100), "a");
        assert_eq!("a", t.cancel_timeout(&to).unwrap());

        for ms in [100, 200] {
            let tick = ms_to_tick(&t, ms);
            assert_eq!(None, t.poll_to(tick));
        }

        assert_eq!(count(&t), 0);
    }

    /// Two timeouts on the same tick both fire on that tick.
    #[test]
    pub fn test_multiple_timeouts_same_tick() {
        let mut t = timer();

        t.set_timeout_at(Duration::from_millis(100), "a");
        t.set_timeout_at(Duration::from_millis(100), "b");

        let due = ms_to_tick(&t, 100);
        let mut rcv = vec![t.poll_to(due).unwrap(), t.poll_to(due).unwrap()];

        assert_eq!(None, t.poll_to(due));

        rcv.sort_unstable();
        assert!(rcv == ["a", "b"], "actual={rcv:?}");

        let later = ms_to_tick(&t, 200);
        assert_eq!(None, t.poll_to(later));

        assert_eq!(count(&t), 0);
    }

    /// Timeouts on different ticks fire when the timer reaches their tick.
    #[test]
    pub fn test_multiple_timeouts_diff_tick() {
        let mut t = timer();

        t.set_timeout_at(Duration::from_millis(110), "a");
        t.set_timeout_at(Duration::from_millis(220), "b");
        t.set_timeout_at(Duration::from_millis(230), "c");
        t.set_timeout_at(Duration::from_millis(440), "d");
        t.set_timeout_at(Duration::from_millis(560), "e");

        // (poll time in ms, timeouts expected in order before `None`)
        let steps: [(u64, &[&str]); 6] = [
            (100, &["a"]),
            (200, &["c", "b"]),
            (300, &[]),
            (400, &["d"]),
            (500, &[]),
            (600, &["e"]),
        ];

        for (ms, expected) in steps {
            let tick = ms_to_tick(&t, ms);
            for name in expected {
                assert_eq!(Some(*name), t.poll_to(tick));
            }
            assert_eq!(None, t.poll_to(tick));
        }
    }

    /// Polling far ahead returns every overdue timeout, one per call.
    #[test]
    pub fn test_catching_up() {
        let mut t = timer();

        t.set_timeout_at(Duration::from_millis(110), "a");
        t.set_timeout_at(Duration::from_millis(220), "b");
        t.set_timeout_at(Duration::from_millis(230), "c");
        t.set_timeout_at(Duration::from_millis(440), "d");

        let tick = ms_to_tick(&t, 600);
        for name in ["a", "c", "b", "d"] {
            assert_eq!(Some(name), t.poll_to(tick));
        }
        assert_eq!(None, t.poll_to(tick));
    }

    /// Two timeouts sharing a wheel slot but on different ticks fire separately.
    #[test]
    pub fn test_timeout_hash_collision() {
        let mut t = timer();
        let one_turn_later = 100 + TICK * SLOTS as u64;

        t.set_timeout_at(Duration::from_millis(100), "a");
        t.set_timeout_at(Duration::from_millis(one_turn_later), "b");

        let first = ms_to_tick(&t, 100);
        assert_eq!(Some("a"), t.poll_to(first));
        assert_eq!(1, count(&t));

        let between = ms_to_tick(&t, 200);
        assert_eq!(None, t.poll_to(between));
        assert_eq!(1, count(&t));

        let second = ms_to_tick(&t, one_turn_later);
        assert_eq!(Some("b"), t.poll_to(second));
        assert_eq!(0, count(&t));
    }

    /// Cancelling a due timeout between two polls of the same tick works.
    #[test]
    pub fn test_clearing_timeout_between_triggers() {
        let mut t = timer();

        let a = t.set_timeout_at(Duration::from_millis(100), "a");
        let _ = t.set_timeout_at(Duration::from_millis(100), "b");
        let _ = t.set_timeout_at(Duration::from_millis(200), "c");

        let first = ms_to_tick(&t, 100);
        assert_eq!(Some("b"), t.poll_to(first));
        assert_eq!(2, count(&t));

        t.cancel_timeout(&a);
        assert_eq!(1, count(&t));

        assert_eq!(None, t.poll_to(first));

        let second = ms_to_tick(&t, 200);
        assert_eq!(Some("c"), t.poll_to(second));
        assert_eq!(0, count(&t));
    }
}