1use std::pin::Pin;
5use std::task::{Context, Poll, Waker};
6use std::time::Duration;
7
8use futures_core::Stream;
9
10use super::Clock;
11use super::timers::TimerKey;
12use crate::timers::TIMER_RESOLUTION;
13
14#[derive(Debug)]
81pub struct PeriodicTimer {
82 period: Duration,
83 clock: Clock,
84 current_timer: Option<TimerKey>,
87}
88
89impl PeriodicTimer {
90 #[must_use]
95 pub fn new(clock: &Clock, period: Duration) -> Self {
96 let period = period.max(TIMER_RESOLUTION);
97
98 Self {
99 current_timer: None,
102 period,
103 clock: clock.clone(),
104 }
105 }
106
107 fn register_timer(&mut self, waker: Waker) {
108 match self.clock.instant().checked_add(self.period) {
109 Some(when) => {
110 self.current_timer = Some(self.clock.register_timer(when, waker));
111 }
112 None => {
113 self.period = Duration::MAX;
117 }
118 }
119 }
120}
121
122impl Stream for PeriodicTimer {
123 type Item = ();
124
125 #[cfg_attr(test, mutants::skip)] fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
127 let this = self.get_mut();
128
129 if this.period == Duration::MAX {
130 return Poll::Pending;
131 }
132
133 match this.current_timer {
134 Some(key) if key.tick() <= this.clock.instant() => {
135 this.current_timer = None;
137
138 this.clock.unregister_timer(key);
141
142 Poll::Ready(Some(()))
143 }
144 Some(_) => Poll::Pending,
146
147 None => {
152 this.register_timer(cx.waker().clone());
153 Poll::Pending
154 }
155 }
156 }
157}
158
159impl Drop for PeriodicTimer {
160 fn drop(&mut self) {
161 if let Some(key) = self.current_timer {
162 self.clock.unregister_timer(key);
163 }
164 }
165}
166
167#[cfg_attr(coverage_nightly, coverage(off))]
168#[cfg(test)]
169mod tests {
170 use std::thread;
171
172 use super::*;
173 use crate::ClockControl;
174
175 #[test]
176 fn assert_types() {
177 static_assertions::assert_impl_all!(PeriodicTimer: Send, Sync);
178 }
179
180 #[cfg(not(miri))]
181 #[tokio::test]
182 async fn next_ensure_awaited() {
183 use futures::StreamExt;
184
185 use crate::FutureExt;
186
187 let clock = Clock::new_tokio();
188 let mut timer = PeriodicTimer::new(&clock, Duration::from_millis(1));
189
190 async move {
191 assert_eq!(timer.next().await, Some(()));
192 assert_eq!(timer.next().await, Some(()));
193 }
194 .timeout(&clock, Duration::from_secs(5))
195 .await
196 .unwrap();
197 }
198
199 #[test]
200 fn next_with_control() {
201 let control = ClockControl::new();
202 let clock = control.to_clock();
203 let mut timer = PeriodicTimer::new(&clock, Duration::from_millis(1));
204
205 assert_eq!(poll_timer(&mut timer), Poll::Pending);
206 thread::sleep(Duration::from_millis(1));
207 assert_eq!(poll_timer(&mut timer), Poll::Pending);
208
209 let len = control.timers_len();
210 control.advance(Duration::from_millis(2));
211 assert_eq!(control.timers_len(), len - 1);
212 assert_eq!(poll_timer(&mut timer), Poll::Ready(Some(())));
213 }
214
215 #[test]
216 fn first_poll_next_should_be_pending() {
217 let clock = Clock::new_frozen();
218
219 let mut timer = PeriodicTimer::new(&clock, Duration::from_millis(1));
220
221 assert_eq!(poll_timer(&mut timer), Poll::Pending);
222 }
223
224 #[test]
225 fn new_zero_duration_period_adjusted() {
226 let clock = Clock::new_frozen();
227
228 let timer = PeriodicTimer::new(&clock, Duration::ZERO);
229
230 assert_eq!(timer.period, Duration::from_millis(1));
231 }
232
233 #[test]
234 fn new_duration_near_max_never_fires() {
235 let clock = Clock::new_frozen();
236
237 let mut timer = PeriodicTimer::new(&clock, Duration::MAX.saturating_sub(Duration::from_millis(1)));
238
239 assert_eq!(poll_timer(&mut timer), Poll::Pending);
240 assert_eq!(poll_timer(&mut timer), Poll::Pending);
241
242 assert_eq!(timer.period, Duration::MAX);
243 assert_eq!(timer.current_timer, None);
244 }
245
246 #[cfg(not(miri))]
247 #[tokio::test]
248 async fn ready_without_advancing_timers_ensure_timer_unregistered() {
249 let clock = Clock::new_tokio();
250 let period = Duration::from_millis(1);
251 let mut timer = PeriodicTimer::new(&clock, period);
252
253 assert_eq!(poll_timer(&mut timer), Poll::Pending);
254 thread::sleep(period);
255 assert_eq!(poll_timer(&mut timer), Poll::Ready(Some(())));
256
257 assert_eq!(timer.current_timer, None);
258 assert_eq!(clock.clock_state().timers_len(), 0);
259 }
260
261 #[test]
262 fn drop_periodic_timer_unregisters_timer() {
263 let clock = Clock::new_frozen();
264 let period = Duration::from_millis(1);
265
266 {
268 let mut timer = PeriodicTimer::new(&clock, period);
269 assert_eq!(poll_timer(&mut timer), Poll::Pending);
270 assert_eq!(clock.clock_state().timers_len(), 1);
271 }
273
274 assert_eq!(clock.clock_state().timers_len(), 0);
276 }
277
278 fn poll_timer(delay: &mut PeriodicTimer) -> Poll<Option<()>> {
279 let waker = Waker::noop().clone();
280 let mut cx = Context::from_waker(&waker);
281 let delay = std::pin::pin!(delay);
282
283 delay.poll_next(&mut cx)
284 }
285}