crb_superagent/time/
timer.rs1use anyhow::{Result, anyhow};
2use crb_core::mpsc;
3use crb_core::time::{Duration, Instant, Sleep, sleep_until};
4use futures::{Future, Stream};
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8#[derive(Debug)]
9pub struct Timeout {
10 pub scheduled_at: Instant,
11}
12
13#[derive(Debug)]
14pub enum ScheduleCommand {
15 Schedule { delay: Duration },
16 Cancel,
17}
18
19struct PendingEvent {
20 baseline: Instant,
21 delay: Duration,
22}
23
24pub struct TimerStream {
25 command_rx: mpsc::UnboundedReceiver<ScheduleCommand>,
26 pending: Option<PendingEvent>,
27 sleep: Option<Pin<Box<Sleep>>>,
28}
29
30pub struct Timer {
31 command_tx: mpsc::UnboundedSender<ScheduleCommand>,
32 stream: Option<TimerStream>,
33}
34
35impl Default for Timer {
36 fn default() -> Self {
37 Self::new()
38 }
39}
40
41impl Timer {
42 pub fn new() -> Self {
43 let (tx, rx) = mpsc::unbounded_channel();
44 let stream = TimerStream {
45 command_rx: rx,
46 pending: None,
47 sleep: None,
48 };
49 Self {
50 command_tx: tx,
51 stream: Some(stream),
52 }
53 }
54
55 pub fn schedule(&self, delay: Duration) -> Result<()> {
56 self.command_tx
57 .send(ScheduleCommand::Schedule { delay })
58 .map_err(|_| anyhow!("Can't schedule the task."))
59 }
60
61 pub fn cancel(&self) -> Result<()> {
62 self.command_tx
63 .send(ScheduleCommand::Cancel)
64 .map_err(|_| anyhow!("Can't cancel the task."))
65 }
66
67 pub fn events(&mut self) -> Result<TimerStream> {
68 self.stream
69 .take()
70 .ok_or_else(|| anyhow!("Timer events stream has detached already."))
71 }
72}
73
74impl TimerStream {
75 fn timeout(&mut self, scheduled_at: Instant) -> Poll<Option<Timeout>> {
76 let event = Timeout { scheduled_at };
77 self.pending = None;
78 self.sleep = None;
79 Poll::Ready(Some(event))
80 }
81}
82
83impl Stream for TimerStream {
84 type Item = Timeout;
85
86 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
87 loop {
88 while let Poll::Ready(cmd) = Pin::new(&mut self.command_rx).poll_recv(cx) {
89 match cmd {
90 Some(ScheduleCommand::Schedule { delay }) => {
91 let now = Instant::now();
92 if let Some(pending) = &mut self.pending {
93 pending.delay = delay;
94 } else {
95 self.pending = Some(PendingEvent {
96 baseline: now,
97 delay,
98 });
99 }
100
101 if let Some(pending) = &self.pending {
102 let scheduled_at = pending.baseline + pending.delay;
103 if scheduled_at <= now {
104 return self.timeout(scheduled_at);
105 } else if let Some(sleep) = &mut self.sleep {
106 sleep.as_mut().reset(scheduled_at.into());
107 } else {
108 self.sleep = Some(Box::pin(sleep_until(scheduled_at.into())));
109 }
110 }
111 }
112 Some(ScheduleCommand::Cancel) => {
113 self.pending = None;
114 self.sleep = None;
115 }
116 None => {
117 return Poll::Ready(None);
119 }
120 }
121 }
122
123 if let Some(pending) = &self.pending {
124 let scheduled_at = pending.baseline + pending.delay;
125 let now = Instant::now();
126 if now >= scheduled_at {
127 return self.timeout(scheduled_at);
128 }
129 }
130
131 if let Some(sleep) = &mut self.sleep {
132 match sleep.as_mut().poll(cx) {
133 Poll::Ready(_) => continue,
134 Poll::Pending => break,
135 }
136 } else {
137 break;
138 }
139 }
140 Poll::Pending
141 }
142}