1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
use futures::{stream::Stream, task::Context};
use futures_timer::Delay;
use std::{
future::Future,
pin::Pin,
task::Poll,
time::{Duration, Instant},
};
/// A periodic timer that is driven as a `Stream` of `Instant`s.
///
/// The ticker keeps a fixed spacing (`interval`) between ticks and tracks the
/// instant at which the next tick is due; a `Delay` is re-armed after every
/// tick to wake the task when that instant arrives.
pub struct Ticker {
/// Spacing between two consecutive ticks.
interval: Duration,
/// Instant at which the next tick is due; advanced each time the stream yields.
next: Instant,
/// Timer that is polled by the stream and reset after every tick.
schedule: Delay,
}
impl Ticker {
    /// Creates a ticker that fires every `interval`, with the first tick due
    /// one `interval` from now.
    pub fn new(interval: Duration) -> Ticker {
        Ticker::new_with_next(interval, interval)
    }

    /// Creates a ticker that fires every `interval`, with the first tick due
    /// `first_in` from now.
    pub fn new_with_next(interval: Duration, first_in: Duration) -> Ticker {
        let first = Instant::now() + first_in;
        Ticker {
            interval,
            next: first,
            schedule: Delay::new(first_in),
        }
    }

    /// Returns the instant of the next tick as seen from the current time.
    ///
    /// Shorthand for [`Ticker::next_tick_from`] with `Instant::now()`.
    pub fn next_tick(&self) -> Instant {
        self.next_tick_from(Instant::now())
    }

    /// Returns the instant of the next tick as seen from `now`.
    ///
    /// * If the stored tick is still in the future it is returned unchanged.
    /// * Otherwise the result is the first instant of the form
    ///   `next + k * interval` (with `k >= 1`) that lies strictly after `now`,
    ///   so ticks that were missed are skipped while the original phase is kept.
    /// * With a zero `interval` there is no grid to align to and `now` is
    ///   returned.
    ///
    /// # Panics
    ///
    /// Panics if the resulting instant cannot be represented by `Instant`.
    pub fn next_tick_from(&self, now: Instant) -> Instant {
        const NANOS_PER_SEC: u128 = 1_000_000_000;

        if self.next > now {
            return self.next;
        }

        let interval_nanos = self.interval.as_nanos();
        if interval_nanos == 0 {
            return now;
        }

        // How far `now` is into the current period. Working with the remainder
        // (instead of counting missed intervals) avoids narrowing a potentially
        // huge interval count to `u32`, which used to truncate or overflow for
        // short intervals combined with long stalls.
        let into_period_nanos = (now - self.next).as_nanos() % interval_nanos;

        // Both casts are lossless: the remainder is strictly smaller than the
        // interval, so its whole seconds fit in `u64`, and the sub-second part
        // is always below one billion.
        let into_period = Duration::new(
            (into_period_nanos / NANOS_PER_SEC) as u64,
            (into_period_nanos % NANOS_PER_SEC) as u32,
        );

        // `into_period < interval`, so the subtraction cannot underflow and the
        // result is strictly after `now`.
        now + (self.interval - into_period)
    }
}
impl Stream for Ticker {
type Item = Instant;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let schedule = Pin::new(&mut self.schedule);
match schedule.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(_) => {
let now = Instant::now();
let next = self.next_tick();
self.next = next;
self.schedule.reset(
next.checked_duration_since(now)
.unwrap_or_else(|| Duration::from_nanos(0)),
);
Poll::Ready(Some(Instant::now()))
}
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
(1, None)
}
}