poisson_ticker/
lib.rs

1//! Exponentially distributed timer for your Poisson-arrivals needs.
2//!
3//! # Example
4//! ```rust
5//! #[tokio::main]
6//! async fn main() {
7//!     let mut t = poisson_ticker::Ticker::new(std::time::Duration::from_millis(10));
8//!     let now = std::time::Instant::now();
9//!     for _ in 0usize..5 {
10//!         (&mut t).await;
11//!     }
12//!     println!("elapsed: {:?}", now.elapsed());
13//! }
14//! ```
15
16use async_timer::oneshot::{Oneshot, Timer};
17use core::task::{Context, Poll};
18use futures_util::stream::Stream;
19use rand_distr::{Distribution, Exp};
20use std::future::Future;
21use std::pin::Pin;
22use std::time::Duration;
23
24pub struct Ticker {
25    should_restart: bool,
26    curr_timer: Timer,
27    distr: Exp<f64>,
28    mean_ns: u128,
29}
30
31impl Ticker {
32    pub fn new(d: Duration) -> Self {
33        let mean_ns = d.as_nanos();
34        let lambda = 1. / d.as_nanos() as f64;
35        let r = Exp::new(lambda).expect("Make exponential distr");
36        Self {
37            should_restart: true,
38            curr_timer: Timer::new(d),
39            distr: r,
40            mean_ns,
41        }
42    }
43}
44
45impl Future for Ticker {
46    type Output = ();
47
48    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
49        let this = &mut *self;
50        let t = &mut this.curr_timer;
51        if this.should_restart {
52            let mut rng = rand::thread_rng();
53            let next_interarrival_ns = this.distr.sample(&mut rng);
54            if next_interarrival_ns as u128 > this.mean_ns * 10 {
55                tracing::warn!(
56                    sampled_wait_ns = ?next_interarrival_ns,
57                    mean_ns = ?this.mean_ns,
58                    "long wait"
59                );
60            }
61
62            t.restart(
63                Duration::from_nanos(next_interarrival_ns as u64),
64                cx.waker(),
65            );
66
67            this.should_restart = false;
68        }
69
70        let tp = Pin::new(t);
71        match tp.poll(cx) {
72            x @ Poll::Pending => x,
73            x @ Poll::Ready(_) => {
74                this.should_restart = true;
75                x
76            }
77        }
78    }
79}
80
81impl Stream for Ticker {
82    type Item = ();
83
84    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
85        self.poll(cx).map(Some)
86    }
87}
88
89#[cfg(test)]
90mod test {
91    #[test]
92    fn reuse() {
93        let mut rt = tokio::runtime::Runtime::new().unwrap();
94        rt.block_on(async move {
95            let mut t = super::Ticker::new(std::time::Duration::from_millis(10));
96            let now = std::time::Instant::now();
97            for _ in 0usize..5 {
98                (&mut t).await;
99            }
100            println!("elapsed: {:?}", now.elapsed());
101        });
102    }
103}