crb_superagent/time/
interval.rs

1use anyhow::{Result, anyhow};
2use crb_core::mpsc;
3use crb_core::time::{Duration, Instant, Sleep, sleep_until};
4use futures::Future;
5use futures::Stream;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9pub struct Interval {
10    command_tx: mpsc::UnboundedSender<IntervalCommand>,
11    stream: Option<IntervalStream>,
12}
13
14impl Default for Interval {
15    fn default() -> Self {
16        Self::new()
17    }
18}
19
20impl Interval {
21    pub fn new() -> Self {
22        let initial_interval = Duration::from_secs(1);
23        let now = Instant::now();
24        let next_deadline = now + initial_interval;
25        let sleep = Box::pin(sleep_until(next_deadline.into()));
26        let (command_tx, command_rx) = mpsc::unbounded_channel();
27        let stream = IntervalStream {
28            current_interval: initial_interval,
29            last_tick: now,
30            next_deadline,
31            sleep,
32            command_rx,
33        };
34        Self {
35            command_tx,
36            stream: Some(stream),
37        }
38    }
39
40    pub fn set_interval_ms(&self, ms: u64) -> Result<()> {
41        let interval = Duration::from_millis(ms);
42        self.command_tx
43            .send(IntervalCommand::SetInterval(interval))
44            .map_err(|_| anyhow!("Can't set the interval."))
45    }
46
47    pub fn events(&mut self) -> Result<IntervalStream> {
48        self.stream
49            .take()
50            .ok_or_else(|| anyhow!("Interval events stream has detached already."))
51    }
52}
53
54enum IntervalCommand {
55    SetInterval(Duration),
56}
57
58pub struct IntervalStream {
59    current_interval: Duration,
60    last_tick: Instant,
61    next_deadline: Instant,
62    sleep: Pin<Box<Sleep>>,
63    command_rx: mpsc::UnboundedReceiver<IntervalCommand>,
64}
65
66impl IntervalStream {
67    fn update_deadline(&mut self) {
68        let new_deadline = self.last_tick + self.current_interval;
69        self.next_deadline = new_deadline;
70        self.sleep.as_mut().reset(new_deadline.into());
71    }
72}
73
74pub struct Tick(pub Instant);
75
76impl Stream for IntervalStream {
77    type Item = Tick;
78
79    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
80        loop {
81            while let Poll::Ready(cmd) = Pin::new(&mut self.command_rx).poll_recv(cx) {
82                if let Some(IntervalCommand::SetInterval(new_interval)) = cmd {
83                    self.current_interval = new_interval;
84                    self.update_deadline();
85                } else {
86                    // The handle was closed
87                    return Poll::Ready(None);
88                }
89            }
90
91            let now = Instant::now();
92            if now >= self.next_deadline {
93                self.last_tick = now;
94                self.update_deadline();
95                return Poll::Ready(Some(Tick(now)));
96            }
97
98            match self.sleep.as_mut().poll(cx) {
99                Poll::Ready(_) => continue,
100                Poll::Pending => break,
101            }
102        }
103        Poll::Pending
104    }
105}