timer_kit/interval.rs
1use std::{pin::Pin, time::{Duration}, task::{Context, Poll}, task::ready, future::poll_fn};
2
3use futures_util::{Stream, Future};
4
5use crate::{Delay, Sleep};
6
7const BUFFER_TIMEOUT: Duration = Duration::from_millis(5);
8
9/// Creates new [`Interval`] that yields with interval of period. The first tick completes
10/// immediately. The default [`MissedTickBehavior`] is [`MissedTickBehavior::Burst`], but this
11/// can be configured by calling [`Interval::set_missed_tick_behavior`].
12///
13/// # Panics
14///
15/// This function will panic if `period` is zero.
16///
17/// # Example
18///
19/// Creates an interval with smol's timer
20///
21/// ```rust,no_run
22/// use std::time::Duration;
23/// use timer_kit::interval;
24///
25/// let mut interval = interval::<smol::Timer>(Duration::from_millis(100));
26///
27/// interval.tick().await;
28/// interval.tick().await;
29/// interval.tick().await;
30/// ```
31///
32/// Creates an interval with `fluvio_wasm_timer::Delay`
33///
34/// ```rust,no_run
35/// use std::time::Duration;
36/// use timer_kit::interval;
37///
38/// let mut interval = interval::<fluvio_wasm_timer::Delay>(Duration::from_millis(100));
39///
40/// interval.tick().await;
41/// interval.tick().await;
42/// interval.tick().await;
43/// ```
44pub fn interval<D>(duration: Duration) -> Interval<D>
45where
46 D: Delay,
47 D::Instant: Unpin,
48{
49 Interval::new(duration)
50}
51
52/// Creates new [`Interval`] that yields with interval of period with the first tick completing
53/// at start. The default [`MissedTickBehavior`] is [`MissedTickBehavior::Burst`], but this can
54/// be configured by calling [`Interval::set_missed_tick_behavior`].
55///
56/// # Panics
57///
58/// This function will panic if `period` is zero.
59///
60/// # Example
61///
62/// Creates an interval with smol's timer
63///
64/// ```rust,no_run
65/// use std::time::{Duration, Instant};
66/// use timer_kit::interval_at;
67///
68/// let mut interval = interval_at::<smol::Timer>(Instant::now(), Duration::from_millis(100));
69///
70/// interval.tick().await;
71/// interval.tick().await;
72/// interval.tick().await;
73/// ```
74///
75/// Creates an interval with `fluvio_wasm_timer::Delay`
76///
77/// ```rust,no_run
78/// use std::time::{Duration};
79/// use fluvio_wasm_timer::Instant;
80/// use timer_kit::interval_at;
81///
82/// let mut interval = interval_at::<fluvio_wasm_timer::Delay>(Instant::now(), Duration::from_millis(100));
83///
84/// interval.tick().await;
85/// interval.tick().await;
86/// interval.tick().await;
87/// ```
88pub fn interval_at<D>(start: D::Instant, duration: Duration) -> Interval<D>
89where
90 D: Delay,
91 D::Instant: Unpin,
92{
93 Interval::new_at(start, duration)
94}
95
96/// Ported from [`tokio::time::interval::MissedTickBehavior`]
97///
98/// # Default
99///
100/// The default behavior is [`MissedTickBehavior::Burst`].
101#[derive(Debug, Clone, Copy, PartialEq, Eq)]
102pub enum MissedTickBehavior {
103 /// Ticks as fast as possible until caught up.
104 Burst,
105
106 /// Tick at multiples of `period` from when [`tick`] was called, rather than
107 /// from `start`.
108 Delay,
109
110 /// Skips missed ticks and tick on the next multiple of `period` from
111 /// `start`.
112 Skip,
113}
114
115impl MissedTickBehavior {
116 fn next_timeout<D>(&self, timeout: D::Instant, now: D::Instant, period: Duration) -> D::Instant
117 where
118 D: Delay,
119 {
120 match self {
121 Self::Burst => timeout + period,
122 Self::Delay => now + period,
123 Self::Skip => {
124 now + period
125 - Duration::from_nanos(
126 ((now - timeout).as_nanos() % period.as_nanos())
127 .try_into()
128 // This operation is practically guaranteed not to
129 // fail, as in order for it to fail, `period` would
130 // have to be longer than `now - timeout`, and both
131 // would have to be longer than 584 years.
132 //
133 // If it did fail, there's not a good way to pass
134 // the error along to the user, so we just panic.
135 .expect(
136 "too much time has elapsed since the interval was supposed to tick",
137 ),
138 )
139 }
140 }
141 }
142}
143
144impl Default for MissedTickBehavior {
145 fn default() -> Self {
146 Self::Burst
147 }
148}
149
150/// An [`Interval`] allows you to wait on a sequence of instants with a certain duration between
151/// each instant.
152///
153/// # Type Parameter
154///
155/// - `D`: The underlying timer type that implements the [`Delay`] trait
156pub struct Interval<D: Delay> {
157 delay: Pin<Box<Sleep<D>>>,
158 missed_tick_behavior: MissedTickBehavior,
159 period: Duration,
160}
161
162impl<D> std::fmt::Debug for Interval<D>
163where
164 D: Delay + std::fmt::Debug,
165 D::Instant: std::fmt::Debug,
166{
167 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168 f.debug_struct("Interval")
169 .field("delay", &self.delay)
170 .field("missed_tick_behavior", &self.missed_tick_behavior)
171 .field("period", &self.period)
172 .finish()
173 }
174}
175
176impl<D> Interval<D>
177where
178 D: Delay,
179 D::Instant: Unpin,
180{
181 /// Creates new [`Interval`] that yields with interval of period. The first tick completes
182 /// immediately. The default [`MissedTickBehavior`] is [`MissedTickBehavior::Burst`], but this
183 /// can be configured by calling [`Interval::set_missed_tick_behavior`].
184 ///
185 /// # Panics
186 ///
187 /// This function will panic if `period` is zero.
188 ///
189 /// # Examples
190 ///
191 /// Creates a new [`Interval`] that yields every 100 milliseconds with smol's timer.
192 ///
193 /// ```rust,no_run
194 /// use std::time::Duration;
195 /// use timer_kit::Interval;
196 ///
197 /// let mut interval = Interval::<smol::Timer>::new(Duration::from_millis(100));
198 ///
199 /// interval.tick().await;
200 /// interval.tick().await;
201 /// interval.tick().await;
202 /// ```
203 ///
204 /// Creates a new [`Interval`] that yields every 100 milliseconds with
205 /// `fluvio_wasm_timer::Delay`.
206 ///
207 /// ```rust,no_run
208 /// use std::time::Duration;
209 /// use timer_kit::Interval;
210 ///
211 /// let mut interval = Interval::<fluvio_wasm_timer::Delay>::new(Duration::from_millis(100));
212 ///
213 /// interval.tick().await;
214 /// interval.tick().await;
215 /// interval.tick().await;
216 /// ```
217 pub fn new(period: Duration) -> Self {
218 assert!(period > Duration::new(0, 0), "period must be non-zero");
219 Self {
220 delay: Box::pin(Sleep::new(period)),
221 missed_tick_behavior: MissedTickBehavior::default(),
222 period,
223 }
224 }
225
226 /// Creates new [`Interval`] that yields with interval of period with the first tick completing
227 /// at start. The default [`MissedTickBehavior`] is [`MissedTickBehavior::Burst`], but this can
228 /// be configured by calling [`Interval::set_missed_tick_behavior`].
229 ///
230 /// # Panics
231 ///
232 /// This function will panic if `period` is zero.
233 ///
234 /// # Examples
235 ///
236 /// Creates a new [`Interval`] that yields every 100 milliseconds with smol's timer.
237 ///
238 /// ```rust,no_run
239 /// use std::time::{Duration, Instant};
240 /// use timer_kit::Interval;
241 ///
242 /// let mut interval = Interval::<smol::Timer>::new_at(Instant::now(), Duration::from_millis(100));
243 ///
244 /// interval.tick().await;
245 /// interval.tick().await;
246 /// interval.tick().await;
247 /// ```
248 ///
249 /// Creates a new [`Interval`] that yields every 100 milliseconds with
250 /// `fluvio_wasm_timer::Delay`.
251 ///
252 /// ```rust,no_run
253 /// use std::time::Duration;
254 /// use fluvio_wasm_timer::Instant;
255 /// use timer_kit::Interval;
256 ///
257 /// let mut interval = Interval::<fluvio_wasm_timer::Delay>::new_at(Instant::now(), Duration::from_millis(100));
258 ///
259 /// interval.tick().await;
260 /// interval.tick().await;
261 /// interval.tick().await;
262 /// ```
263 pub fn new_at(start: D::Instant, period: Duration) -> Self {
264 assert!(period > Duration::new(0, 0), "period must be non-zero");
265 Self {
266 delay: Box::pin(Sleep::new_until(start)),
267 missed_tick_behavior: MissedTickBehavior::default(),
268 period,
269 }
270 }
271
272 /// Returns the [`MissedTickBehavior`] of the [`Interval`].
273 pub fn missed_tick_behavior(&self) -> MissedTickBehavior {
274 self.missed_tick_behavior
275 }
276
277 /// Sets the [`MissedTickBehavior`] of the [`Interval`].
278 pub fn set_missed_tick_behavior(&mut self, behavior: MissedTickBehavior) {
279 self.missed_tick_behavior = behavior;
280 }
281
282 /// Returns the period of the [`Interval`].
283 pub fn period(&self) -> Duration {
284 self.period
285 }
286
287 /// Polls the next tick of the [`Interval`].
288 pub fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll<D::Value> {
289 use crate::Instant;
290
291 let value = ready!(self.delay.as_mut().poll(cx));
292
293 let timeout = self.delay.deadline();
294
295 let now = D::Instant::now();
296
297 let next = if now > timeout + BUFFER_TIMEOUT {
298 self.missed_tick_behavior.next_timeout::<D>(timeout, now, self.period)
299 } else {
300 timeout + self.period
301 };
302
303 self.delay.as_mut().reset(next);
304 Poll::Ready(value)
305 }
306
307 /// Completes the next tick of the [`Interval`].
308 pub async fn tick(&mut self) -> D::Value {
309 poll_fn(|cx| self.poll_tick(cx)).await
310 }
311
312 /// Resets the interval to complete one period after the current time.
313 /// This method ignores [`MissedTickBehavior`] strategy.
314 pub fn reset(&mut self) {
315 use crate::Instant;
316
317 let deadline = D::Instant::now() + self.period;
318 self.delay.as_mut().reset(deadline);
319 }
320}
321
322impl<D> Stream for Interval<D>
323where
324 D: Delay,
325 D::Instant: Unpin,
326{
327 type Item = D::Value;
328
329 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
330 let value = ready!(self.get_mut().poll_tick(cx));
331 Poll::Ready(Some(value))
332 }
333}