crb_superagent/time/
interval.rs1use anyhow::{Result, anyhow};
2use crb_core::mpsc;
3use crb_core::time::{Duration, Instant, Sleep, sleep_until};
4use futures::Future;
5use futures::Stream;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9pub struct Interval {
10 command_tx: mpsc::UnboundedSender<IntervalCommand>,
11 stream: Option<IntervalStream>,
12}
13
14impl Default for Interval {
15 fn default() -> Self {
16 Self::new()
17 }
18}
19
20impl Interval {
21 pub fn new() -> Self {
22 let initial_interval = Duration::from_secs(1);
23 let now = Instant::now();
24 let next_deadline = now + initial_interval;
25 let sleep = Box::pin(sleep_until(next_deadline.into()));
26 let (command_tx, command_rx) = mpsc::unbounded_channel();
27 let stream = IntervalStream {
28 current_interval: initial_interval,
29 last_tick: now,
30 next_deadline,
31 sleep,
32 command_rx,
33 };
34 Self {
35 command_tx,
36 stream: Some(stream),
37 }
38 }
39
40 pub fn set_interval_ms(&self, ms: u64) -> Result<()> {
41 let interval = Duration::from_millis(ms);
42 self.command_tx
43 .send(IntervalCommand::SetInterval(interval))
44 .map_err(|_| anyhow!("Can't set the interval."))
45 }
46
47 pub fn events(&mut self) -> Result<IntervalStream> {
48 self.stream
49 .take()
50 .ok_or_else(|| anyhow!("Interval events stream has detached already."))
51 }
52}
53
/// Control messages sent from an `Interval` handle to its `IntervalStream`.
enum IntervalCommand {
    /// Replace the stream's tick period with the given duration.
    SetInterval(Duration),
}
57
58pub struct IntervalStream {
59 current_interval: Duration,
60 last_tick: Instant,
61 next_deadline: Instant,
62 sleep: Pin<Box<Sleep>>,
63 command_rx: mpsc::UnboundedReceiver<IntervalCommand>,
64}
65
66impl IntervalStream {
67 fn update_deadline(&mut self) {
68 let new_deadline = self.last_tick + self.current_interval;
69 self.next_deadline = new_deadline;
70 self.sleep.as_mut().reset(new_deadline.into());
71 }
72}
73
/// Item yielded by `IntervalStream`; wraps the instant at which the tick was
/// observed.
///
/// Derives `Debug`, `Clone` and `Copy` so that consumers can log and freely
/// duplicate ticks; the wrapped `Instant` is already used as a `Copy` value
/// throughout this file.
#[derive(Debug, Clone, Copy)]
pub struct Tick(pub Instant);
75
76impl Stream for IntervalStream {
77 type Item = Tick;
78
79 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
80 loop {
81 while let Poll::Ready(cmd) = Pin::new(&mut self.command_rx).poll_recv(cx) {
82 if let Some(IntervalCommand::SetInterval(new_interval)) = cmd {
83 self.current_interval = new_interval;
84 self.update_deadline();
85 } else {
86 return Poll::Ready(None);
88 }
89 }
90
91 let now = Instant::now();
92 if now >= self.next_deadline {
93 self.last_tick = now;
94 self.update_deadline();
95 return Poll::Ready(Some(Tick(now)));
96 }
97
98 match self.sleep.as_mut().poll(cx) {
99 Poll::Ready(_) => continue,
100 Poll::Pending => break,
101 }
102 }
103 Poll::Pending
104 }
105}