1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
use async_timer::oneshot::{Oneshot, Timer};
use core::task::{Context, Poll};
use futures_util::stream::Stream;
use rand_distr::{Distribution, Exp};
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;
/// A reusable timer whose waits are drawn from an exponential distribution.
///
/// Each time the ticker is polled after having fired (or for the first time),
/// it samples a fresh wait from `distr` and re-arms `curr_timer` with it. It
/// can be awaited repeatedly as a [`Future`] or consumed as a [`Stream`].
pub struct Ticker {
    /// Exponential distribution the inter-arrival times (in nanoseconds) are
    /// sampled from.
    distr: Exp<f64>,
    /// Mean inter-arrival time in nanoseconds; only used to detect and log
    /// unusually long sampled waits.
    mean_ns: u128,
    /// Underlying one-shot timer, re-armed for every tick.
    curr_timer: Timer,
    /// `true` when the next poll must sample a new wait and re-arm the timer.
    should_restart: bool,
}
impl Ticker {
    /// Creates a ticker whose waits are exponentially distributed with mean `d`.
    ///
    /// The timer constructed here is armed with `d` itself, but the first poll
    /// of the ticker re-arms it with a freshly sampled wait.
    ///
    /// # Panics
    ///
    /// Panics if the exponential distribution rejects the rate `1 / d`
    /// (expressed per nanosecond).
    // NOTE(review): a zero `d` yields an infinite rate and a zero initial
    // timeout; confirm how `Exp::new` and `Timer::new` treat that case.
    pub fn new(d: Duration) -> Self {
        let mean_ns = d.as_nanos();
        // Rate parameter of the distribution: events per nanosecond.
        let rate_per_ns = 1. / mean_ns as f64;
        let distr = Exp::new(rate_per_ns).expect("Make exponential distr");
        let curr_timer = Timer::new(d);
        Self {
            should_restart: true,
            curr_timer,
            distr,
            mean_ns,
        }
    }
}
impl Future for Ticker {
    type Output = ();

    /// Drives the current tick.
    ///
    /// If the previous tick has completed (or this is the first poll), a new
    /// wait is sampled from the exponential distribution and the underlying
    /// timer is re-armed with it. The timer is then polled; when it fires the
    /// ticker marks itself for restart, so the same `Ticker` can be awaited
    /// again for the next tick.
    ///
    /// A warning is logged whenever the sampled wait exceeds ten times the
    /// configured mean.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let this = &mut *self;

        if this.should_restart {
            let sampled_ns = this.distr.sample(&mut rand::thread_rng());
            if sampled_ns as u128 > this.mean_ns * 10 {
                tracing::warn!(
                    sampled_wait_ns = ?sampled_ns,
                    mean_ns = ?this.mean_ns,
                    "long wait"
                );
            }

            // The float-to-integer cast truncates, so a sample below 1 ns would
            // become a zero timeout. Clamp to the smallest non-zero wait so the
            // timer is always armed.
            // NOTE(review): async_timer appears to treat a zero timeout as
            // invalid (and a zero value disarms timerfd/POSIX timers); confirm
            // against the crate version in use.
            let wait_ns = (sampled_ns as u64).max(1);
            this.curr_timer
                .restart(Duration::from_nanos(wait_ns), cx.waker());
            this.should_restart = false;
        }

        let polled = Pin::new(&mut this.curr_timer).poll(cx);
        if polled.is_ready() {
            // Tick delivered: the next poll starts a fresh interval.
            this.should_restart = true;
        }
        polled
    }
}
impl Stream for Ticker {
    type Item = ();

    /// Yields `Some(())` for every tick of the underlying future.
    ///
    /// The stream never terminates: a ready tick is always wrapped in `Some`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        match Future::poll(self, cx) {
            Poll::Ready(tick) => Poll::Ready(Some(tick)),
            Poll::Pending => Poll::Pending,
        }
    }
}
#[cfg(test)]
mod test {
    use super::Ticker;
    use std::time::{Duration, Instant};

    /// Awaits the same ticker several times in a row to check that it can be
    /// reused after firing, then prints the total elapsed time.
    #[test]
    fn reuse() {
        let mut runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async move {
            let mut ticker = Ticker::new(Duration::from_millis(10));
            let started = Instant::now();
            for _ in 0usize..5 {
                // Await by mutable reference so the ticker is not consumed.
                (&mut ticker).await;
            }
            println!("elapsed: {:?}", started.elapsed());
        });
    }
}