1use crate::reactor::Reactor;
25use crate::sync::ThreadSafety;
26
27use std::fmt;
28use std::future::Future;
29use std::pin::Pin;
30use std::task::{Context, Poll, Waker};
31use std::time::{Duration, Instant};
32
33use futures_lite::stream::Stream;
34
35pub struct Timer<TS: ThreadSafety = crate::DefaultThreadSafety> {
46 reactor: TS::Rc<Reactor<TS>>,
48
49 id_and_waker: Option<(usize, Waker)>,
51
52 deadline: Option<Instant>,
54
55 period: Duration,
57}
58
59impl<TS: ThreadSafety> fmt::Debug for Timer<TS> {
60 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61 f.debug_struct("Timer")
62 .field("deadline", &self.deadline)
63 .field("period", &self.period)
64 .field("registered", &self.id_and_waker.is_some())
65 .finish()
66 }
67}
68
69impl<TS: ThreadSafety> Unpin for Timer<TS> {}
70
71impl<TS: ThreadSafety> Timer<TS> {
72 pub fn never() -> Self {
74 Self {
75 reactor: Reactor::<TS>::get(),
76 id_and_waker: None,
77 deadline: None,
78 period: Duration::MAX,
79 }
80 }
81
82 pub fn will_fire(&self) -> bool {
84 self.deadline.is_some()
85 }
86
87 pub fn after(duration: Duration) -> Self {
89 Instant::now()
90 .checked_add(duration)
91 .map_or_else(Self::never, Self::at)
92 }
93
94 pub fn at(deadline: Instant) -> Self {
96 Self::interval_at(deadline, Duration::MAX)
97 }
98
99 pub fn interval(period: Duration) -> Self {
101 Instant::now()
102 .checked_add(period)
103 .map_or_else(Self::never, |deadline| Self::interval_at(deadline, period))
104 }
105
106 pub fn interval_at(start: Instant, period: Duration) -> Self {
108 Self {
109 reactor: Reactor::<TS>::get(),
110 id_and_waker: None,
111 deadline: Some(start),
112 period,
113 }
114 }
115
116 pub fn set_never(&mut self) {
118 self.clear();
119 self.deadline = None;
120 }
121
122 pub fn set_after(&mut self, duration: Duration) {
124 match Instant::now().checked_add(duration) {
125 Some(deadline) => self.set_at(deadline),
126 None => self.set_never(),
127 }
128 }
129
130 pub fn set_at(&mut self, deadline: Instant) {
132 self.set_interval_at(deadline, Duration::MAX)
133 }
134
135 pub fn set_interval(&mut self, period: Duration) {
137 match Instant::now().checked_add(period) {
138 Some(deadline) => self.set_interval_at(deadline, period),
139 None => self.set_never(),
140 }
141 }
142
143 pub fn set_interval_at(&mut self, start: Instant, period: Duration) {
145 self.clear();
146
147 self.deadline = Some(start);
148 self.period = period;
149
150 if let Some((id, waker)) = self.id_and_waker.as_mut() {
151 *id = self.reactor.insert_timer(start, waker);
153 }
154 }
155
156 fn clear(&mut self) {
157 if let (Some(deadline), Some((id, _))) = (self.deadline.take(), self.id_and_waker.take()) {
158 self.reactor.remove_timer(deadline, id);
159 }
160 }
161}
162
163impl<TS: ThreadSafety> Drop for Timer<TS> {
164 fn drop(&mut self) {
165 self.clear();
166 }
167}
168
169impl<TS: ThreadSafety> Future for Timer<TS> {
170 type Output = Instant;
171
172 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
173 self.poll_next(cx).map(Option::unwrap)
174 }
175}
176
177impl<TS: ThreadSafety> Stream for Timer<TS> {
178 type Item = Instant;
179
180 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
181 let this = self.get_mut();
182
183 if let Some(ref mut deadline) = this.deadline {
184 if *deadline < Instant::now() {
186 if let Some((id, _)) = this.id_and_waker.take() {
187 this.reactor.remove_timer(*deadline, id);
188 }
189
190 let result_time = *deadline;
191
192 if let Some(next) = deadline.checked_add(this.period) {
193 *deadline = next;
194
195 let id = this.reactor.insert_timer(next, cx.waker());
197 this.id_and_waker = Some((id, cx.waker().clone()));
198 } else {
199 this.deadline = None;
200 }
201
202 return Poll::Ready(Some(result_time));
204 } else {
205 match &this.id_and_waker {
206 None => {
207 let id = this.reactor.insert_timer(*deadline, cx.waker());
209 this.id_and_waker = Some((id, cx.waker().clone()));
210 }
211
212 Some((id, w)) if !w.will_wake(cx.waker()) => {
213 this.reactor.remove_timer(*deadline, *id);
215
216 let id = this.reactor.insert_timer(*deadline, cx.waker());
218 this.id_and_waker = Some((id, cx.waker().clone()));
219 }
220
221 _ => {}
222 }
223 }
224 }
225
226 Poll::Pending
227 }
228}