async_tick/
interval.rs

1use super::{now, WAITS, WAITS_NUM};
2use super::waiter::{Token, Waiter};
3use core::pin::Pin;
4use core::task::{Context, Poll};
5use core::time::Duration;
6use futures_util::Stream;
7
8pub fn interval(period: Duration) -> Interval {
9    Interval {
10        pool: &WAITS,
11        last: now(),
12        period,
13        token: None,
14    }
15}
16
17pub struct Interval {
18    pool: &'static Waiter< WAITS_NUM>,
19    last: u64,
20    period: Duration,
21    token: Option<Token<'static, WAITS_NUM>>,
22}
23impl Stream for Interval {
24    type Item = ();
25    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
26        let next_time = self.last + self.period.as_nanos() as u64;
27        if next_time < now() {
28            self.last = next_time;
29            return Poll::Ready(Some(()));
30        }
31        let waker = cx.waker();
32        if let Some(token) = &self.token {
33            token.swap(waker.clone(), next_time);
34        } else if let Ok(token) = self.pool.register() {
35            token.swap(waker.clone(), next_time);
36            self.token = Some(token);
37        } else {
38            waker.wake_by_ref();
39        }
40        if next_time < now() {
41            self.last = next_time;
42            Poll::Ready(Some(()))
43        } else {
44            Poll::Pending
45        }
46    }
47}