1use super::{now, WAITS, WAITS_NUM};
2use super::waiter::{Token, Waiter};
3use core::pin::Pin;
4use core::task::{Context, Poll};
5use core::time::Duration;
6use futures_util::Stream;
7
8pub fn interval(period: Duration) -> Interval {
9 Interval {
10 pool: &WAITS,
11 last: now(),
12 period,
13 token: None,
14 }
15}
16
17pub struct Interval {
18 pool: &'static Waiter< WAITS_NUM>,
19 last: u64,
20 period: Duration,
21 token: Option<Token<'static, WAITS_NUM>>,
22}
23impl Stream for Interval {
24 type Item = ();
25 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
26 let next_time = self.last + self.period.as_nanos() as u64;
27 if next_time < now() {
28 self.last = next_time;
29 return Poll::Ready(Some(()));
30 }
31 let waker = cx.waker();
32 if let Some(token) = &self.token {
33 token.swap(waker.clone(), next_time);
34 } else if let Ok(token) = self.pool.register() {
35 token.swap(waker.clone(), next_time);
36 self.token = Some(token);
37 } else {
38 waker.wake_by_ref();
39 }
40 if next_time < now() {
41 self.last = next_time;
42 Poll::Ready(Some(()))
43 } else {
44 Poll::Pending
45 }
46 }
47}