Skip to main content

gstthreadshare/runtime/executor/
timer.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2// This is based on https://github.com/smol-rs/async-io
3// with adaptations by:
4//
5// Copyright (C) 2021-2022 François Laignel <fengalin@free.fr>
6
7use futures::stream::{FusedStream, Stream};
8
9use std::error::Error;
10use std::fmt;
11use std::future::Future;
12use std::pin::Pin;
13use std::task::{Context, Poll, Waker};
14use std::time::{Duration, Instant};
15
16use super::reactor::{AfterTimerId, Reactor, RegularTimerId};
17
18#[derive(Debug)]
19pub struct IntervalError;
20
21impl Error for IntervalError {}
22
23impl fmt::Display for IntervalError {
24    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
25        f.write_str("Interval period can't be null")
26    }
27}
28
29/// Creates a timer that emits an event once after the given delay.
30///
31/// When throttling is activated (i.e. when using a non-`0` `wait`
32/// duration in `Context::acquire`), timer entries are assigned to
33/// the nearest time frame, meaning that the delay might elapse
34/// `wait` / 2 ms earlier or later than the expected instant.
35///
36/// Use [`delay_for_at_least`] when it's preferable not to return
37/// before the expected instant.
38pub fn delay_for(delay: Duration) -> Oneshot {
39    if delay <= Reactor::with(|r| r.half_max_throttling()) {
40        // timer should fire now.
41        return Oneshot::new(Reactor::with(|r| r.timers_check_instant()));
42    }
43
44    Oneshot::new(Instant::now() + delay)
45}
46
47/// Creates a timer that emits an event once at the given time instant.
48///
49/// When throttling is activated (i.e. when using a non-`0` `wait`
50/// duration in `Context::acquire`), timer entries are assigned to
51/// the nearest time frame, meaning that the delay might elapse
52/// `wait` / 2 ms earlier or later than the expected instant.
53///
54/// Use [`after`] when it's preferable not to return
55/// before the expected instant.
56pub fn at(when: Instant) -> Oneshot {
57    if when <= Instant::now() {
58        // timer should fire now.
59        return Oneshot::new(Reactor::with(|r| r.timers_check_instant()));
60    }
61
62    Oneshot::new(when)
63}
64
65/// Creates a timer that emits events periodically, starting as soon as possible.
66///
67/// Returns an error if `period` is zero.
68///
69/// When throttling is activated (i.e. when using a non-`0` `wait`
70/// duration in `Context::acquire`), timer entries are assigned to
71/// the nearest time frame, meaning that the delay might elapse
72/// `wait` / 2 ms earlier or later than the expected instant.
73///
74/// Use [`interval_at_least`] when it's preferable not to tick
75/// before the expected instants.
76pub fn interval(period: Duration) -> Result<Interval, IntervalError> {
77    interval_at(Instant::now(), period)
78}
79
80/// Creates a timer that emits events periodically, starting after `delay`.
81///
82/// Returns an error if `period` is zero.
83///
84/// When throttling is activated (i.e. when using a non-`0` `wait`
85/// duration in `Context::acquire`), timer entries are assigned to
86/// the nearest time frame, meaning that the delay might elapse
87/// `wait` / 2 ms earlier or later than the expected instant.
88///
89/// Use [`interval_delayed_by_at_least`] when it's preferable not to tick
90/// before the expected instants.
91pub fn interval_delayed_by(delay: Duration, period: Duration) -> Result<Interval, IntervalError> {
92    interval_at(Instant::now() + delay, period)
93}
94
95/// Creates a timer that emits events periodically, starting at `start`.
96///
97/// When throttling is activated (i.e. when using a non-`0` `wait`
98/// duration in `Context::acquire`), timer entries are assigned to
99/// the nearest time frame, meaning that the delay might elapse
100/// `wait` / 2 ms earlier or later than the expected instant.
101///
102/// Use [`interval_after_at_least`] when it's preferable not to tick
103/// before the expected instants.
104pub fn interval_at(start: Instant, period: Duration) -> Result<Interval, IntervalError> {
105    if period.is_zero() {
106        return Err(IntervalError);
107    }
108
109    Ok(Interval::new(start, period))
110}
111
112/// Creates a timer that emits an event once after the given delay.
113///
114/// See [`delay_for`] for details. The event is guaranteed to be emitted
115/// no sooner than the expected delay has elapsed.
116#[track_caller]
117pub fn delay_for_at_least(delay: Duration) -> OneshotAfter {
118    if delay.is_zero() {
119        // timer should fire now.
120        return OneshotAfter::new(Reactor::with(|r| r.timers_check_instant()));
121    }
122
123    OneshotAfter::new(Instant::now() + delay)
124}
125
126/// Creates a timer that emits an event once no sooner than the given time instant.
127///
128/// See [`at`] for details.
129#[track_caller]
130pub fn after(when: Instant) -> OneshotAfter {
131    if when <= Instant::now() {
132        // timer should fire now.
133        return OneshotAfter::new(Reactor::with(|r| r.timers_check_instant()));
134    }
135
136    OneshotAfter::new(when)
137}
138
139/// Creates a timer that emits events periodically, starting as soon as possible.
140///
141/// Returns an error if `period` is zero.
142///
143/// See [`interval`] for details. The events are guaranteed to be
144/// emitted no sooner than the expected instants.
145pub fn interval_at_least(period: Duration) -> Result<IntervalAfter, IntervalError> {
146    interval_after_at_least(Instant::now(), period)
147}
148
149/// Creates a timer that emits events periodically, starting after at least `delay`.
150///
151/// Returns an error if `period` is zero.
152///
153/// See [`interval_delayed_by`] for details. The events are guaranteed to be
154/// emitted no sooner than the expected instants.
155#[track_caller]
156pub fn interval_delayed_by_at_least(
157    delay: Duration,
158    period: Duration,
159) -> Result<IntervalAfter, IntervalError> {
160    interval_after_at_least(Instant::now() + delay, period)
161}
162
163/// Creates a timer that emits events periodically, starting at `start`.
164///
165/// See [`interval_at`] for details. The events are guaranteed to be
166/// emitted no sooner than the expected instants.
167#[track_caller]
168pub fn interval_after_at_least(
169    start: Instant,
170    period: Duration,
171) -> Result<IntervalAfter, IntervalError> {
172    if period.is_zero() {
173        return Err(IntervalError);
174    }
175
176    Ok(IntervalAfter::new(start, period))
177}
178
179/// A future that emits an event at the given time.
180///
181/// `Oneshot`s are futures that resolve when the given time is reached.
182///
183/// When throttling is activated (i.e. when using a non-`0` `wait`
184/// duration in `Context::acquire`), timer entries are assigned to
185/// the nearest time frame, meaning that the timer may fire
186/// `wait` / 2 ms earlier or later than the expected instant.
187#[derive(Debug)]
188pub struct Oneshot {
189    /// This timer's ID and last waker that polled it.
190    ///
191    /// When this field is set to `None`, this timer is not registered in the reactor.
192    id_and_waker: Option<(RegularTimerId, Waker)>,
193
194    /// The instant at which this timer fires.
195    when: Instant,
196}
197
198impl Oneshot {
199    fn new(when: Instant) -> Self {
200        Oneshot {
201            id_and_waker: None,
202            when,
203        }
204    }
205}
206
207impl Drop for Oneshot {
208    fn drop(&mut self) {
209        if let Some((id, _)) = self.id_and_waker.take() {
210            Reactor::with_mut(|reactor| {
211                reactor.remove_timer(self.when, id);
212            });
213        }
214    }
215}
216
217impl Future for Oneshot {
218    type Output = ();
219
220    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
221        Reactor::with_mut(|reactor| {
222            if reactor.time_slice_end() >= self.when {
223                if let Some((id, _)) = self.id_and_waker.take() {
224                    // Deregister the timer from the reactor.
225                    reactor.remove_timer(self.when, id);
226                }
227
228                Poll::Ready(())
229            } else {
230                match &self.id_and_waker {
231                    None => {
232                        // Register the timer in the reactor.
233                        let id = reactor.insert_regular_timer(self.when, cx.waker());
234                        self.id_and_waker = Some((id, cx.waker().clone()));
235                    }
236                    Some((id, w)) if !w.will_wake(cx.waker()) => {
237                        // Deregister the timer from the reactor to remove the old waker.
238                        reactor.remove_timer(self.when, *id);
239
240                        // Register the timer in the reactor with the new waker.
241                        let id = reactor.insert_regular_timer(self.when, cx.waker());
242                        self.id_and_waker = Some((id, cx.waker().clone()));
243                    }
244                    Some(_) => {}
245                }
246
247                Poll::Pending
248            }
249        })
250    }
251}
252
253/// A future that emits an event at the given time.
254///
255/// `OneshotAfter`s are futures that always resolve after
256/// the given time is reached.
257#[derive(Debug)]
258pub struct OneshotAfter {
259    /// This timer's ID and last waker that polled it.
260    ///
261    /// When this field is set to `None`, this timer is not registered in the reactor.
262    id_and_waker: Option<(AfterTimerId, Waker)>,
263
264    /// The instant at which this timer fires.
265    when: Instant,
266}
267
268impl OneshotAfter {
269    fn new(when: Instant) -> Self {
270        OneshotAfter {
271            id_and_waker: None,
272            when,
273        }
274    }
275}
276
277impl Drop for OneshotAfter {
278    fn drop(&mut self) {
279        if let Some((id, _)) = self.id_and_waker.take() {
280            Reactor::with_mut(|reactor| {
281                reactor.remove_timer(self.when, id);
282            });
283        }
284    }
285}
286
287impl Future for OneshotAfter {
288    type Output = ();
289
290    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
291        Reactor::with_mut(|reactor| {
292            if reactor.timers_check_instant() >= self.when {
293                if let Some((id, _)) = self.id_and_waker.take() {
294                    // Deregister the timer from the reactor.
295                    reactor.remove_timer(self.when, id);
296                }
297
298                Poll::Ready(())
299            } else {
300                match &self.id_and_waker {
301                    None => {
302                        // Register the timer in the reactor.
303                        let id = reactor.insert_after_timer(self.when, cx.waker());
304                        self.id_and_waker = Some((id, cx.waker().clone()));
305                    }
306                    Some((id, w)) if !w.will_wake(cx.waker()) => {
307                        // Deregister the timer from the reactor to remove the old waker.
308                        reactor.remove_timer(self.when, *id);
309
310                        // Register the timer in the reactor with the new waker.
311                        let id = reactor.insert_after_timer(self.when, cx.waker());
312                        self.id_and_waker = Some((id, cx.waker().clone()));
313                    }
314                    Some(_) => {}
315                }
316
317                Poll::Pending
318            }
319        })
320    }
321}
322
323/// A stream that emits timed events.
324///
325/// `Interval`s are streams that ticks periodically in the closest
326/// time slice.
327#[derive(Debug)]
328pub struct Interval {
329    /// This timer's ID and last waker that polled it.
330    ///
331    /// When this field is set to `None`, this timer is not registered in the reactor.
332    id_and_waker: Option<(RegularTimerId, Waker)>,
333
334    /// The next instant at which this timer should fire.
335    when: Instant,
336
337    /// The period.
338    period: Duration,
339}
340
341impl Interval {
342    fn new(start: Instant, period: Duration) -> Self {
343        Interval {
344            id_and_waker: None,
345            when: start,
346            period,
347        }
348    }
349}
350
351impl Drop for Interval {
352    fn drop(&mut self) {
353        if let Some((id, _)) = self.id_and_waker.take() {
354            Reactor::with_mut(|reactor| {
355                reactor.remove_timer(self.when, id);
356            });
357        }
358    }
359}
360
361impl Stream for Interval {
362    type Item = ();
363
364    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
365        Reactor::with_mut(|reactor| {
366            let time_slice_end = reactor.time_slice_end();
367            if time_slice_end >= self.when {
368                if let Some((id, _)) = self.id_and_waker.take() {
369                    // Deregister the timer from the reactor.
370                    reactor.remove_timer(self.when, id);
371                }
372                // Compute the next tick making sure we are not so late
373                // that we would need to tick again right now.
374                let period = self.period;
375                while time_slice_end >= self.when {
376                    // This can't overflow in practical conditions.
377                    self.when += period;
378                }
379                // Register the timer in the reactor.
380                let id = reactor.insert_regular_timer(self.when, cx.waker());
381                self.id_and_waker = Some((id, cx.waker().clone()));
382
383                Poll::Ready(Some(()))
384            } else {
385                match &self.id_and_waker {
386                    None => {
387                        // Register the timer in the reactor.
388                        let id = reactor.insert_regular_timer(self.when, cx.waker());
389                        self.id_and_waker = Some((id, cx.waker().clone()));
390                    }
391                    Some((id, w)) if !w.will_wake(cx.waker()) => {
392                        // Deregister the timer from the reactor to remove the old waker.
393                        reactor.remove_timer(self.when, *id);
394
395                        // Register the timer in the reactor with the new waker.
396                        let id = reactor.insert_regular_timer(self.when, cx.waker());
397                        self.id_and_waker = Some((id, cx.waker().clone()));
398                    }
399                    Some(_) => {}
400                }
401
402                Poll::Pending
403            }
404        })
405    }
406}
407
408impl FusedStream for Interval {
409    fn is_terminated(&self) -> bool {
410        // Interval is "infinite" in practice
411        false
412    }
413}
414/// A stream that emits timed events.
415///
416/// `IntervalAfter`s are streams that ticks periodically. Ticks are
417/// guaranteed to fire no sooner than the expected instant.
418#[derive(Debug)]
419pub struct IntervalAfter {
420    /// This timer's ID and last waker that polled it.
421    ///
422    /// When this field is set to `None`, this timer is not registered in the reactor.
423    id_and_waker: Option<(AfterTimerId, Waker)>,
424
425    /// The next instant at which this timer should fire.
426    when: Instant,
427
428    /// The period.
429    period: Duration,
430}
431
432impl IntervalAfter {
433    fn new(start: Instant, period: Duration) -> Self {
434        IntervalAfter {
435            id_and_waker: None,
436            when: start,
437            period,
438        }
439    }
440}
441
442impl Drop for IntervalAfter {
443    fn drop(&mut self) {
444        if let Some((id, _)) = self.id_and_waker.take() {
445            Reactor::with_mut(|reactor| {
446                reactor.remove_timer(self.when, id);
447            });
448        }
449    }
450}
451
452impl Stream for IntervalAfter {
453    type Item = ();
454
455    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
456        Reactor::with_mut(|reactor| {
457            let timers_check_instant = reactor.timers_check_instant();
458            if timers_check_instant >= self.when {
459                if let Some((id, _)) = self.id_and_waker.take() {
460                    // Deregister the timer from the reactor.
461                    reactor.remove_timer(self.when, id);
462                }
463                // Compute the next tick making sure we are not so late
464                // that we would need to tick again right now.
465                let period = self.period;
466                while timers_check_instant >= self.when {
467                    // This can't overflow in practical conditions.
468                    self.when += period;
469                }
470                // Register the timer in the reactor.
471                let id = reactor.insert_after_timer(self.when, cx.waker());
472                self.id_and_waker = Some((id, cx.waker().clone()));
473
474                Poll::Ready(Some(()))
475            } else {
476                match &self.id_and_waker {
477                    None => {
478                        // Register the timer in the reactor.
479                        let id = reactor.insert_after_timer(self.when, cx.waker());
480                        self.id_and_waker = Some((id, cx.waker().clone()));
481                    }
482                    Some((id, w)) if !w.will_wake(cx.waker()) => {
483                        // Deregister the timer from the reactor to remove the old waker.
484                        reactor.remove_timer(self.when, *id);
485
486                        // Register the timer in the reactor with the new waker.
487                        let id = reactor.insert_after_timer(self.when, cx.waker());
488                        self.id_and_waker = Some((id, cx.waker().clone()));
489                    }
490                    Some(_) => {}
491                }
492
493                Poll::Pending
494            }
495        })
496    }
497}
498
499impl FusedStream for IntervalAfter {
500    fn is_terminated(&self) -> bool {
501        // IntervalAfter is "infinite" in practice
502        false
503    }
504}
505
506#[cfg(test)]
507mod tests {
508    use std::time::{Duration, Instant};
509
510    use crate::runtime::executor::scheduler;
511
512    const MAX_THROTTLING: Duration = Duration::from_millis(10);
513    const DELAY: Duration = Duration::from_millis(12);
514    const PERIOD: Duration = Duration::from_millis(15);
515
516    #[test]
517    fn delay_for_regular() {
518        gst::init().unwrap();
519
520        let handle = scheduler::Throttling::start("delay_for_regular", MAX_THROTTLING);
521
522        futures::executor::block_on(handle.spawn(async {
523            let start = Instant::now();
524            super::delay_for(DELAY).await;
525            // Due to throttling, timer may be fired earlier
526            assert!(start.elapsed() + MAX_THROTTLING / 2 >= DELAY);
527        }))
528        .unwrap();
529    }
530
531    #[test]
532    fn delay_for_at_least() {
533        gst::init().unwrap();
534
535        let handle = scheduler::Throttling::start("delay_for_at_least", MAX_THROTTLING);
536
537        futures::executor::block_on(handle.spawn(async {
538            let start = Instant::now();
539            super::delay_for_at_least(DELAY).await;
540            // Never returns earlier than DELAY
541            assert!(start.elapsed() >= DELAY);
542        }))
543        .unwrap();
544    }
545
546    #[test]
547    fn interval_regular() {
548        use futures::prelude::*;
549
550        gst::init().unwrap();
551
552        let handle = scheduler::Throttling::start("interval_regular", MAX_THROTTLING);
553
554        let join_handle = handle.spawn(async move {
555            let mut acc = Duration::ZERO;
556
557            let start = Instant::now();
558            let mut interval = super::interval(PERIOD).unwrap();
559
560            interval.next().await.unwrap();
561            assert!(start.elapsed() + MAX_THROTTLING / 2 >= acc);
562
563            // Due to throttling, intervals may tick earlier.
564            for _ in 0..10 {
565                interval.next().await.unwrap();
566                acc += PERIOD;
567                assert!(start.elapsed() + MAX_THROTTLING / 2 >= acc);
568            }
569        });
570
571        futures::executor::block_on(join_handle).unwrap();
572    }
573
574    #[test]
575    fn interval_after_at_least() {
576        use futures::prelude::*;
577
578        gst::init().unwrap();
579
580        let handle = scheduler::Throttling::start("interval_after", MAX_THROTTLING);
581
582        let join_handle = handle.spawn(async move {
583            let mut acc = DELAY;
584
585            let start = Instant::now();
586            let mut interval = super::interval_after_at_least(start + DELAY, PERIOD).unwrap();
587
588            interval.next().await.unwrap();
589            assert!(start.elapsed() >= acc);
590
591            for _ in 1..10 {
592                interval.next().await.unwrap();
593                acc += PERIOD;
594                assert!(start.elapsed() >= acc);
595            }
596        });
597
598        futures::executor::block_on(join_handle).unwrap();
599    }
600}