timer_kit/
interval.rs

1use std::{pin::Pin, time::{Duration}, task::{Context, Poll}, task::ready, future::poll_fn};
2
3use futures_util::{Stream, Future};
4
5use crate::{Delay, Sleep};
6
7const BUFFER_TIMEOUT: Duration = Duration::from_millis(5);
8
9/// Creates new [`Interval`] that yields with interval of period. The first tick completes
10/// immediately. The default [`MissedTickBehavior`] is [`MissedTickBehavior::Burst`], but this
11/// can be configured by calling [`Interval::set_missed_tick_behavior`].
12/// 
13/// # Panics
14/// 
15/// This function will panic if `period` is zero.
16/// 
17/// # Example
18/// 
19/// Creates an interval with smol's timer
20/// 
21/// ```rust,no_run
22/// use std::time::Duration;
23/// use timer_kit::interval;
24/// 
25/// let mut interval = interval::<smol::Timer>(Duration::from_millis(100));
26/// 
27/// interval.tick().await;
28/// interval.tick().await;
29/// interval.tick().await;
30/// ```
31/// 
32/// Creates an interval with `fluvio_wasm_timer::Delay`
33/// 
34/// ```rust,no_run
35/// use std::time::Duration;
36/// use timer_kit::interval;
37/// 
38/// let mut interval = interval::<fluvio_wasm_timer::Delay>(Duration::from_millis(100));
39/// 
40/// interval.tick().await;
41/// interval.tick().await;
42/// interval.tick().await;
43/// ```
44pub fn interval<D>(duration: Duration) -> Interval<D> 
45where
46    D: Delay,
47    D::Instant: Unpin,
48{
49    Interval::new(duration)
50}
51
52/// Creates new [`Interval`] that yields with interval of period with the first tick completing
53/// at start. The default [`MissedTickBehavior`] is [`MissedTickBehavior::Burst`], but this can
54/// be configured by calling [`Interval::set_missed_tick_behavior`].
55/// 
56/// # Panics
57/// 
58/// This function will panic if `period` is zero.
59/// 
60/// # Example
61/// 
62/// Creates an interval with smol's timer
63/// 
64/// ```rust,no_run
65/// use std::time::{Duration, Instant};
66/// use timer_kit::interval_at;
67/// 
68/// let mut interval = interval_at::<smol::Timer>(Instant::now(), Duration::from_millis(100));
69/// 
70/// interval.tick().await;
71/// interval.tick().await;
72/// interval.tick().await;
73/// ```
74/// 
75/// Creates an interval with `fluvio_wasm_timer::Delay`
76/// 
77/// ```rust,no_run
78/// use std::time::{Duration};
79/// use fluvio_wasm_timer::Instant;
80/// use timer_kit::interval_at;
81/// 
82/// let mut interval = interval_at::<fluvio_wasm_timer::Delay>(Instant::now(), Duration::from_millis(100));
83/// 
84/// interval.tick().await;
85/// interval.tick().await;
86/// interval.tick().await;
87/// ```
88pub fn interval_at<D>(start: D::Instant, duration: Duration) -> Interval<D> 
89where
90    D: Delay,
91    D::Instant: Unpin,
92{
93    Interval::new_at(start, duration)
94}
95
96/// Ported from [`tokio::time::interval::MissedTickBehavior`]
97/// 
98/// # Default
99/// 
100/// The default behavior is [`MissedTickBehavior::Burst`].
101#[derive(Debug, Clone, Copy, PartialEq, Eq)]
102pub enum MissedTickBehavior {
103    /// Ticks as fast as possible until caught up.
104    Burst,
105
106    /// Tick at multiples of `period` from when [`tick`] was called, rather than
107    /// from `start`.
108    Delay,
109
110    /// Skips missed ticks and tick on the next multiple of `period` from
111    /// `start`.
112    Skip,
113}
114
115impl MissedTickBehavior {
116    fn next_timeout<D>(&self, timeout: D::Instant, now: D::Instant, period: Duration) -> D::Instant 
117    where
118        D: Delay,
119    {
120        match self {
121            Self::Burst => timeout + period,
122            Self::Delay => now + period,
123            Self::Skip => {
124                now + period
125                    - Duration::from_nanos(
126                        ((now - timeout).as_nanos() % period.as_nanos())
127                            .try_into()
128                            // This operation is practically guaranteed not to
129                            // fail, as in order for it to fail, `period` would
130                            // have to be longer than `now - timeout`, and both
131                            // would have to be longer than 584 years.
132                            //
133                            // If it did fail, there's not a good way to pass
134                            // the error along to the user, so we just panic.
135                            .expect(
136                                "too much time has elapsed since the interval was supposed to tick",
137                            ),
138                    )
139            }
140        }
141    }
142}
143
144impl Default for MissedTickBehavior {
145    fn default() -> Self {
146        Self::Burst
147    }
148}
149
150/// An [`Interval`] allows you to wait on a sequence of instants with a certain duration between
151/// each instant. 
152/// 
153/// # Type Parameter
154/// 
155/// - `D`: The underlying timer type that implements the [`Delay`] trait
156pub struct Interval<D: Delay> {
157    delay: Pin<Box<Sleep<D>>>,
158    missed_tick_behavior: MissedTickBehavior,
159    period: Duration,
160}
161
162impl<D> std::fmt::Debug for Interval<D>
163where
164    D: Delay + std::fmt::Debug,
165    D::Instant: std::fmt::Debug,
166{
167    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168        f.debug_struct("Interval")
169            .field("delay", &self.delay)
170            .field("missed_tick_behavior", &self.missed_tick_behavior)
171            .field("period", &self.period)
172            .finish()
173    }
174}
175
176impl<D> Interval<D>
177where
178    D: Delay,
179    D::Instant: Unpin,
180{
181    /// Creates new [`Interval`] that yields with interval of period. The first tick completes
182    /// immediately. The default [`MissedTickBehavior`] is [`MissedTickBehavior::Burst`], but this
183    /// can be configured by calling [`Interval::set_missed_tick_behavior`].
184    /// 
185    /// # Panics
186    /// 
187    /// This function will panic if `period` is zero.
188    /// 
189    /// # Examples
190    /// 
191    /// Creates a new [`Interval`] that yields every 100 milliseconds with smol's timer.
192    /// 
193    /// ```rust,no_run
194    /// use std::time::Duration;
195    /// use timer_kit::Interval;
196    /// 
197    /// let mut interval = Interval::<smol::Timer>::new(Duration::from_millis(100));
198    /// 
199    /// interval.tick().await;
200    /// interval.tick().await;
201    /// interval.tick().await;
202    /// ```
203    /// 
204    /// Creates a new [`Interval`] that yields every 100 milliseconds with
205    /// `fluvio_wasm_timer::Delay`.
206    /// 
207    /// ```rust,no_run
208    /// use std::time::Duration;
209    /// use timer_kit::Interval;
210    /// 
211    /// let mut interval = Interval::<fluvio_wasm_timer::Delay>::new(Duration::from_millis(100));
212    /// 
213    /// interval.tick().await;
214    /// interval.tick().await;
215    /// interval.tick().await;
216    /// ```
217    pub fn new(period: Duration) -> Self {
218        assert!(period > Duration::new(0, 0), "period must be non-zero");
219        Self {
220            delay: Box::pin(Sleep::new(period)),
221            missed_tick_behavior: MissedTickBehavior::default(),
222            period,
223        }
224    }
225
226    /// Creates new [`Interval`] that yields with interval of period with the first tick completing
227    /// at start. The default [`MissedTickBehavior`] is [`MissedTickBehavior::Burst`], but this can
228    /// be configured by calling [`Interval::set_missed_tick_behavior`].
229    /// 
230    /// # Panics
231    /// 
232    /// This function will panic if `period` is zero.
233    /// 
234    /// # Examples
235    /// 
236    /// Creates a new [`Interval`] that yields every 100 milliseconds with smol's timer.
237    /// 
238    /// ```rust,no_run
239    /// use std::time::{Duration, Instant};
240    /// use timer_kit::Interval;
241    /// 
242    /// let mut interval = Interval::<smol::Timer>::new_at(Instant::now(), Duration::from_millis(100));
243    /// 
244    /// interval.tick().await;
245    /// interval.tick().await;
246    /// interval.tick().await;
247    /// ```
248    /// 
249    /// Creates a new [`Interval`] that yields every 100 milliseconds with
250    /// `fluvio_wasm_timer::Delay`.
251    /// 
252    /// ```rust,no_run
253    /// use std::time::Duration;
254    /// use fluvio_wasm_timer::Instant;
255    /// use timer_kit::Interval;
256    /// 
257    /// let mut interval = Interval::<fluvio_wasm_timer::Delay>::new_at(Instant::now(), Duration::from_millis(100));
258    /// 
259    /// interval.tick().await;
260    /// interval.tick().await;
261    /// interval.tick().await;
262    /// ```
263    pub fn new_at(start: D::Instant, period: Duration) -> Self {
264        assert!(period > Duration::new(0, 0), "period must be non-zero");
265        Self {
266            delay: Box::pin(Sleep::new_until(start)),
267            missed_tick_behavior: MissedTickBehavior::default(),
268            period,
269        }
270    }
271
272    /// Returns the [`MissedTickBehavior`] of the [`Interval`].
273    pub fn missed_tick_behavior(&self) -> MissedTickBehavior {
274        self.missed_tick_behavior
275    }
276
277    /// Sets the [`MissedTickBehavior`] of the [`Interval`].
278    pub fn set_missed_tick_behavior(&mut self, behavior: MissedTickBehavior) {
279        self.missed_tick_behavior = behavior;
280    }
281
282    /// Returns the period of the [`Interval`].
283    pub fn period(&self) -> Duration {
284        self.period
285    }
286
287    /// Polls the next tick of the [`Interval`].
288    pub fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll<D::Value> {
289        use crate::Instant;
290
291        let value = ready!(self.delay.as_mut().poll(cx));
292
293        let timeout = self.delay.deadline();
294
295        let now = D::Instant::now();
296
297        let next = if now > timeout + BUFFER_TIMEOUT {
298            self.missed_tick_behavior.next_timeout::<D>(timeout, now, self.period)
299        } else {
300            timeout + self.period
301        };
302
303        self.delay.as_mut().reset(next);
304        Poll::Ready(value)
305    }
306
307    /// Completes the next tick of the [`Interval`].
308    pub async fn tick(&mut self) -> D::Value {
309        poll_fn(|cx| self.poll_tick(cx)).await
310    }
311
312    /// Resets the interval to complete one period after the current time.
313    /// This method ignores [`MissedTickBehavior`] strategy.
314    pub fn reset(&mut self) {
315        use crate::Instant;
316
317        let deadline = D::Instant::now() + self.period;
318        self.delay.as_mut().reset(deadline);
319    }
320}
321
322impl<D> Stream for Interval<D>
323where
324    D: Delay,
325    D::Instant: Unpin,
326{
327    type Item = D::Value;
328
329    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
330        let value = ready!(self.get_mut().poll_tick(cx));
331        Poll::Ready(Some(value))
332    }
333}