crb_superagent/time/
timer.rs

1use anyhow::{Result, anyhow};
2use crb_core::mpsc;
3use crb_core::time::{Duration, Instant, Sleep, sleep_until};
4use futures::{Future, Stream};
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
/// Event yielded by `TimerStream` when a scheduled timeout fires.
#[derive(Debug)]
pub struct Timeout {
    /// The instant the timeout was due: the pending event's baseline plus its delay.
    pub scheduled_at: Instant,
}
12
/// Commands a `Timer` handle sends to its `TimerStream`.
#[derive(Debug)]
pub enum ScheduleCommand {
    /// Arm the timer with `delay`. If an event is already pending, only its delay is
    /// replaced; the baseline it was first scheduled from is kept.
    Schedule { delay: Duration },
    /// Discard the pending event (and its sleep), if any.
    Cancel,
}
18
/// The single timeout currently armed inside a `TimerStream`.
///
/// The event is due at `baseline + delay`.
struct PendingEvent {
    /// Instant captured when the event was first scheduled; later `Schedule`
    /// commands do not move it.
    baseline: Instant,
    /// Most recently requested delay, measured from `baseline`.
    delay: Duration,
}
23
24pub struct TimerStream {
25    command_rx: mpsc::UnboundedReceiver<ScheduleCommand>,
26    pending: Option<PendingEvent>,
27    sleep: Option<Pin<Box<Sleep>>>,
28}
29
30pub struct Timer {
31    command_tx: mpsc::UnboundedSender<ScheduleCommand>,
32    stream: Option<TimerStream>,
33}
34
35impl Default for Timer {
36    fn default() -> Self {
37        Self::new()
38    }
39}
40
41impl Timer {
42    pub fn new() -> Self {
43        let (tx, rx) = mpsc::unbounded_channel();
44        let stream = TimerStream {
45            command_rx: rx,
46            pending: None,
47            sleep: None,
48        };
49        Self {
50            command_tx: tx,
51            stream: Some(stream),
52        }
53    }
54
55    pub fn schedule(&self, delay: Duration) -> Result<()> {
56        self.command_tx
57            .send(ScheduleCommand::Schedule { delay })
58            .map_err(|_| anyhow!("Can't schedule the task."))
59    }
60
61    pub fn cancel(&self) -> Result<()> {
62        self.command_tx
63            .send(ScheduleCommand::Cancel)
64            .map_err(|_| anyhow!("Can't cancel the task."))
65    }
66
67    pub fn events(&mut self) -> Result<TimerStream> {
68        self.stream
69            .take()
70            .ok_or_else(|| anyhow!("Timer events stream has detached already."))
71    }
72}
73
74impl TimerStream {
75    fn timeout(&mut self, scheduled_at: Instant) -> Poll<Option<Timeout>> {
76        let event = Timeout { scheduled_at };
77        self.pending = None;
78        self.sleep = None;
79        Poll::Ready(Some(event))
80    }
81}
82
83impl Stream for TimerStream {
84    type Item = Timeout;
85
86    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
87        loop {
88            while let Poll::Ready(cmd) = Pin::new(&mut self.command_rx).poll_recv(cx) {
89                match cmd {
90                    Some(ScheduleCommand::Schedule { delay }) => {
91                        let now = Instant::now();
92                        if let Some(pending) = &mut self.pending {
93                            pending.delay = delay;
94                        } else {
95                            self.pending = Some(PendingEvent {
96                                baseline: now,
97                                delay,
98                            });
99                        }
100
101                        if let Some(pending) = &self.pending {
102                            let scheduled_at = pending.baseline + pending.delay;
103                            if scheduled_at <= now {
104                                return self.timeout(scheduled_at);
105                            } else if let Some(sleep) = &mut self.sleep {
106                                sleep.as_mut().reset(scheduled_at.into());
107                            } else {
108                                self.sleep = Some(Box::pin(sleep_until(scheduled_at.into())));
109                            }
110                        }
111                    }
112                    Some(ScheduleCommand::Cancel) => {
113                        self.pending = None;
114                        self.sleep = None;
115                    }
116                    None => {
117                        // The handle was closed
118                        return Poll::Ready(None);
119                    }
120                }
121            }
122
123            if let Some(pending) = &self.pending {
124                let scheduled_at = pending.baseline + pending.delay;
125                let now = Instant::now();
126                if now >= scheduled_at {
127                    return self.timeout(scheduled_at);
128                }
129            }
130
131            if let Some(sleep) = &mut self.sleep {
132                match sleep.as_mut().poll(cx) {
133                    Poll::Ready(_) => continue,
134                    Poll::Pending => break,
135                }
136            } else {
137                break;
138            }
139        }
140        Poll::Pending
141    }
142}