Skip to main content

tick/
periodic_timer.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4use std::pin::Pin;
5use std::task::{Context, Poll, Waker};
6use std::time::Duration;
7
8use futures_core::Stream;
9
10use super::Clock;
11use super::timers::TimerKey;
12use crate::timers::TIMER_RESOLUTION;
13
14/// A timer that periodically ticks.
15///
16/// A periodic timer can be created using the [`PeriodicTimer::new()`] constructor,
17/// which requires a reference to a [`Clock`].
18///
19/// # Precision
20///
21/// The timer uses the current thread's scheduler to schedule its ticks. The precision
22/// of the timer is affected by the load on this thread. There are no guarantees about the
23/// precision of the timer other than that it will eventually tick. When the thread is healthy,
24/// the timer's period should be close to the specified one.
25///
26/// > **Note**: The periodic timer is not affected by adjustments to the system clock.
27///
28/// # Stream Behavior
29///
30/// `PeriodicTimer` implements [`Stream`] and will never complete. The stream produces
31/// a tick every period indefinitely. Use stream combinators like [`StreamExt::take`]
32/// to limit the number of ticks.
33///
34/// [`StreamExt::take`]: https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.take
35///
36/// # Examples
37///
38/// ## Create a periodic timer
39///
40/// ```
41/// use std::time::Duration;
42///
43/// use futures::StreamExt;
44/// use tick::{Clock, PeriodicTimer, Stopwatch};
45///
46/// # async fn periodic_timer_example(clock: &Clock) {
47/// let timer = PeriodicTimer::new(clock, Duration::from_millis(1));
48///
49/// timer
50///     .take(3)
51///     .for_each(async |()| {
52///         // Do something every 1ms
53///     })
54///     .await;
55/// # }
56/// ```
57///
58/// ## Create a periodic timer with initial delay
59///
60/// ```
61/// use std::time::Duration;
62///
63/// use futures::StreamExt;
64/// use tick::{Clock, PeriodicTimer};
65///
66/// # async fn periodic_timer_example(clock: &Clock) {
67/// // Delay for 10ms before the timer starts ticking
68/// clock.delay(Duration::from_millis(10)).await;
69///
70/// let timer = PeriodicTimer::new(clock, Duration::from_millis(1));
71///
72/// timer
73///     .take(3)
74///     .for_each(async |()| {
75///         // Do something every 1ms
76///     })
77///     .await;
78/// # }
79/// ```
80#[derive(Debug)]
81pub struct PeriodicTimer {
82    period: Duration,
83    clock: Clock,
84    // Currently scheduled timer. This value is not initialized until
85    // the first call to the `Stream::poll_next` method.
86    current_timer: Option<TimerKey>,
87}
88
89impl PeriodicTimer {
90    /// Creates a timer that fires periodically.
91    ///
92    /// > **Note**: The minimum precision of the timer is 1ms. If a smaller period is specified,
93    /// > it will be adjusted to 1ms.
94    #[must_use]
95    pub fn new(clock: &Clock, period: Duration) -> Self {
96        let period = period.max(TIMER_RESOLUTION);
97
98        Self {
99            // The timer is not registered yet; it will be registered on the first
100            // call to `Stream::poll_next`.
101            current_timer: None,
102            period,
103            clock: clock.clone(),
104        }
105    }
106
107    fn register_timer(&mut self, waker: Waker) {
108        match self.clock.instant().checked_add(self.period) {
109            Some(when) => {
110                self.current_timer = Some(self.clock.register_timer(when, waker));
111            }
112            None => {
113                // The timer would tick so far in the future that we can assume
114                // it never fires. For this reason, there is no point in registering it.
115                // The period is set to Duration::MAX to prevent further registrations.
116                self.period = Duration::MAX;
117            }
118        }
119    }
120}
121
122impl Stream for PeriodicTimer {
123    type Item = ();
124
125    #[cfg_attr(test, mutants::skip)] // cannot reliably check that poll_tick has been called
126    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
127        let this = self.get_mut();
128
129        if this.period == Duration::MAX {
130            return Poll::Pending;
131        }
132
133        match this.current_timer {
134            Some(key) if key.tick() <= this.clock.instant() => {
135                // Reset the timer. It will be registered again on the next poll.
136                this.current_timer = None;
137
138                // Unregister the timer, just in case this call was explicit and not due to
139                // timers advancing.
140                this.clock.unregister_timer(key);
141
142                Poll::Ready(Some(()))
143            }
144            // Timer is registered and will fire later in the future.
145            Some(_) => Poll::Pending,
146
147            // Timer is not registered yet; let's register it.
148            // The registration is lazy, occurring when someone polls the future. This means
149            // that the work between two ticks is not taken into account when scheduling
150            // the next tick. When the thread is busy, the timer may tick later than expected.
151            None => {
152                this.register_timer(cx.waker().clone());
153                Poll::Pending
154            }
155        }
156    }
157}
158
159impl Drop for PeriodicTimer {
160    fn drop(&mut self) {
161        if let Some(key) = self.current_timer {
162            self.clock.unregister_timer(key);
163        }
164    }
165}
166
#[cfg_attr(coverage_nightly, coverage(off))]
#[cfg(test)]
mod tests {
    use std::thread;

    use super::*;
    use crate::ClockControl;

    /// Period shared by most tests.
    const PERIOD: Duration = Duration::from_millis(1);

    /// Polls the timer exactly once with a no-op waker.
    fn poll_timer(timer: &mut PeriodicTimer) -> Poll<Option<()>> {
        let mut cx = Context::from_waker(Waker::noop());

        Pin::new(timer).poll_next(&mut cx)
    }

    #[test]
    fn assert_types() {
        static_assertions::assert_impl_all!(PeriodicTimer: Send, Sync);
    }

    #[cfg(not(miri))]
    #[tokio::test]
    async fn next_ensure_awaited() {
        use futures::StreamExt;

        use crate::FutureExt;

        let clock = Clock::new_tokio();
        let mut timer = PeriodicTimer::new(&clock, PERIOD);

        // Two consecutive ticks must arrive well within the timeout.
        let two_ticks = async move {
            assert_eq!(timer.next().await, Some(()));
            assert_eq!(timer.next().await, Some(()));
        };

        two_ticks
            .timeout(&clock, Duration::from_secs(5))
            .await
            .unwrap();
    }

    #[test]
    fn next_with_control() {
        let control = ClockControl::new();
        let clock = control.to_clock();
        let mut timer = PeriodicTimer::new(&clock, PERIOD);

        // The first poll registers the timer; sleeping in real time is expected
        // to leave it pending under the controlled clock.
        assert!(poll_timer(&mut timer).is_pending());
        thread::sleep(PERIOD);
        assert!(poll_timer(&mut timer).is_pending());

        // Advancing the controlled clock removes the timer entry and makes the tick ready.
        let registered = control.timers_len();
        control.advance(Duration::from_millis(2));
        assert_eq!(control.timers_len(), registered - 1);
        assert_eq!(poll_timer(&mut timer), Poll::Ready(Some(())));
    }

    #[test]
    fn first_poll_next_should_be_pending() {
        let clock = Clock::new_frozen();
        let mut timer = PeriodicTimer::new(&clock, PERIOD);

        assert!(poll_timer(&mut timer).is_pending());
    }

    #[test]
    fn new_zero_duration_period_adjusted() {
        let clock = Clock::new_frozen();

        let timer = PeriodicTimer::new(&clock, Duration::ZERO);

        assert_eq!(timer.period, Duration::from_millis(1));
    }

    #[test]
    fn new_duration_near_max_never_fires() {
        let clock = Clock::new_frozen();
        let near_max = Duration::MAX.saturating_sub(Duration::from_millis(1));

        let mut timer = PeriodicTimer::new(&clock, near_max);

        // Both the registering poll and the following one stay pending.
        assert!(poll_timer(&mut timer).is_pending());
        assert!(poll_timer(&mut timer).is_pending());

        // The period is replaced by the "never fires" marker and nothing is registered.
        assert_eq!(timer.period, Duration::MAX);
        assert_eq!(timer.current_timer, None);
    }

    #[cfg(not(miri))]
    #[tokio::test]
    async fn ready_without_advancing_timers_ensure_timer_unregistered() {
        let clock = Clock::new_tokio();
        let mut timer = PeriodicTimer::new(&clock, PERIOD);

        assert!(poll_timer(&mut timer).is_pending());
        // Let real time pass, then poll explicitly instead of waiting for a wake-up.
        thread::sleep(PERIOD);
        assert_eq!(poll_timer(&mut timer), Poll::Ready(Some(())));

        assert_eq!(timer.current_timer, None);
        assert_eq!(clock.clock_state().timers_len(), 0);
    }

    #[test]
    fn drop_periodic_timer_unregisters_timer() {
        let clock = Clock::new_frozen();

        // Create and poll the periodic timer to register an elementary timer.
        {
            let mut timer = PeriodicTimer::new(&clock, PERIOD);
            assert!(poll_timer(&mut timer).is_pending());
            assert_eq!(clock.clock_state().timers_len(), 1);
            // Periodic timer is dropped here
        }

        // Elementary timer should be unregistered after dropping the periodic timer.
        assert_eq!(clock.clock_state().timers_len(), 0);
    }
}