1use async_timer::oneshot::{Oneshot, Timer};
17use core::task::{Context, Poll};
18use futures_util::stream::Stream;
19use rand_distr::{Distribution, Exp};
20use std::future::Future;
21use std::pin::Pin;
22use std::time::Duration;
23
24pub struct Ticker {
25 should_restart: bool,
26 curr_timer: Timer,
27 distr: Exp<f64>,
28 mean_ns: u128,
29}
30
31impl Ticker {
32 pub fn new(d: Duration) -> Self {
33 let mean_ns = d.as_nanos();
34 let lambda = 1. / d.as_nanos() as f64;
35 let r = Exp::new(lambda).expect("Make exponential distr");
36 Self {
37 should_restart: true,
38 curr_timer: Timer::new(d),
39 distr: r,
40 mean_ns,
41 }
42 }
43}
44
45impl Future for Ticker {
46 type Output = ();
47
48 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
49 let this = &mut *self;
50 let t = &mut this.curr_timer;
51 if this.should_restart {
52 let mut rng = rand::thread_rng();
53 let next_interarrival_ns = this.distr.sample(&mut rng);
54 if next_interarrival_ns as u128 > this.mean_ns * 10 {
55 tracing::warn!(
56 sampled_wait_ns = ?next_interarrival_ns,
57 mean_ns = ?this.mean_ns,
58 "long wait"
59 );
60 }
61
62 t.restart(
63 Duration::from_nanos(next_interarrival_ns as u64),
64 cx.waker(),
65 );
66
67 this.should_restart = false;
68 }
69
70 let tp = Pin::new(t);
71 match tp.poll(cx) {
72 x @ Poll::Pending => x,
73 x @ Poll::Ready(_) => {
74 this.should_restart = true;
75 x
76 }
77 }
78 }
79}
80
81impl Stream for Ticker {
82 type Item = ();
83
84 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
85 self.poll(cx).map(Some)
86 }
87}
88
#[cfg(test)]
mod test {
    use super::Ticker;
    use std::time::{Duration, Instant};

    /// Awaits the same `Ticker` five times in a row and prints the total
    /// elapsed time; the test only checks that re-awaiting completes.
    #[test]
    fn reuse() {
        let mut rt = tokio::runtime::Runtime::new().unwrap();

        let five_ticks = async move {
            let mut ticker = Ticker::new(Duration::from_millis(10));
            let started = Instant::now();
            for _ in 0usize..5 {
                // Await by mutable reference so the ticker is not consumed.
                (&mut ticker).await;
            }
            println!("elapsed: {:?}", started.elapsed());
        };

        rt.block_on(five_ticks);
    }
}