gstthreadshare/runtime/executor/
timer.rs1use futures::stream::{FusedStream, Stream};
8
9use std::error::Error;
10use std::fmt;
11use std::future::Future;
12use std::pin::Pin;
13use std::task::{Context, Poll, Waker};
14use std::time::{Duration, Instant};
15
16use super::reactor::{AfterTimerId, Reactor, RegularTimerId};
17
/// Error returned by the interval constructors of this module when the
/// requested period is zero.
#[derive(Debug)]
pub struct IntervalError;

impl fmt::Display for IntervalError {
    /// Writes the fixed, human readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Interval period can't be null")
    }
}

impl Error for IntervalError {}
28
29pub fn delay_for(delay: Duration) -> Oneshot {
39 if delay <= Reactor::with(|r| r.half_max_throttling()) {
40 return Oneshot::new(Reactor::with(|r| r.timers_check_instant()));
42 }
43
44 Oneshot::new(Instant::now() + delay)
45}
46
47pub fn at(when: Instant) -> Oneshot {
57 if when <= Instant::now() {
58 return Oneshot::new(Reactor::with(|r| r.timers_check_instant()));
60 }
61
62 Oneshot::new(when)
63}
64
65pub fn interval(period: Duration) -> Result<Interval, IntervalError> {
77 interval_at(Instant::now(), period)
78}
79
80pub fn interval_delayed_by(delay: Duration, period: Duration) -> Result<Interval, IntervalError> {
92 interval_at(Instant::now() + delay, period)
93}
94
95pub fn interval_at(start: Instant, period: Duration) -> Result<Interval, IntervalError> {
105 if period.is_zero() {
106 return Err(IntervalError);
107 }
108
109 Ok(Interval::new(start, period))
110}
111
112#[track_caller]
117pub fn delay_for_at_least(delay: Duration) -> OneshotAfter {
118 if delay.is_zero() {
119 return OneshotAfter::new(Reactor::with(|r| r.timers_check_instant()));
121 }
122
123 OneshotAfter::new(Instant::now() + delay)
124}
125
126#[track_caller]
130pub fn after(when: Instant) -> OneshotAfter {
131 if when <= Instant::now() {
132 return OneshotAfter::new(Reactor::with(|r| r.timers_check_instant()));
134 }
135
136 OneshotAfter::new(when)
137}
138
139pub fn interval_at_least(period: Duration) -> Result<IntervalAfter, IntervalError> {
146 interval_after_at_least(Instant::now(), period)
147}
148
149#[track_caller]
156pub fn interval_delayed_by_at_least(
157 delay: Duration,
158 period: Duration,
159) -> Result<IntervalAfter, IntervalError> {
160 interval_after_at_least(Instant::now() + delay, period)
161}
162
163#[track_caller]
168pub fn interval_after_at_least(
169 start: Instant,
170 period: Duration,
171) -> Result<IntervalAfter, IntervalError> {
172 if period.is_zero() {
173 return Err(IntervalError);
174 }
175
176 Ok(IntervalAfter::new(start, period))
177}
178
179#[derive(Debug)]
188pub struct Oneshot {
189 id_and_waker: Option<(RegularTimerId, Waker)>,
193
194 when: Instant,
196}
197
198impl Oneshot {
199 fn new(when: Instant) -> Self {
200 Oneshot {
201 id_and_waker: None,
202 when,
203 }
204 }
205}
206
207impl Drop for Oneshot {
208 fn drop(&mut self) {
209 if let Some((id, _)) = self.id_and_waker.take() {
210 Reactor::with_mut(|reactor| {
211 reactor.remove_timer(self.when, id);
212 });
213 }
214 }
215}
216
217impl Future for Oneshot {
218 type Output = ();
219
220 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
221 Reactor::with_mut(|reactor| {
222 if reactor.time_slice_end() >= self.when {
223 if let Some((id, _)) = self.id_and_waker.take() {
224 reactor.remove_timer(self.when, id);
226 }
227
228 Poll::Ready(())
229 } else {
230 match &self.id_and_waker {
231 None => {
232 let id = reactor.insert_regular_timer(self.when, cx.waker());
234 self.id_and_waker = Some((id, cx.waker().clone()));
235 }
236 Some((id, w)) if !w.will_wake(cx.waker()) => {
237 reactor.remove_timer(self.when, *id);
239
240 let id = reactor.insert_regular_timer(self.when, cx.waker());
242 self.id_and_waker = Some((id, cx.waker().clone()));
243 }
244 Some(_) => {}
245 }
246
247 Poll::Pending
248 }
249 })
250 }
251}
252
253#[derive(Debug)]
258pub struct OneshotAfter {
259 id_and_waker: Option<(AfterTimerId, Waker)>,
263
264 when: Instant,
266}
267
268impl OneshotAfter {
269 fn new(when: Instant) -> Self {
270 OneshotAfter {
271 id_and_waker: None,
272 when,
273 }
274 }
275}
276
277impl Drop for OneshotAfter {
278 fn drop(&mut self) {
279 if let Some((id, _)) = self.id_and_waker.take() {
280 Reactor::with_mut(|reactor| {
281 reactor.remove_timer(self.when, id);
282 });
283 }
284 }
285}
286
287impl Future for OneshotAfter {
288 type Output = ();
289
290 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
291 Reactor::with_mut(|reactor| {
292 if reactor.timers_check_instant() >= self.when {
293 if let Some((id, _)) = self.id_and_waker.take() {
294 reactor.remove_timer(self.when, id);
296 }
297
298 Poll::Ready(())
299 } else {
300 match &self.id_and_waker {
301 None => {
302 let id = reactor.insert_after_timer(self.when, cx.waker());
304 self.id_and_waker = Some((id, cx.waker().clone()));
305 }
306 Some((id, w)) if !w.will_wake(cx.waker()) => {
307 reactor.remove_timer(self.when, *id);
309
310 let id = reactor.insert_after_timer(self.when, cx.waker());
312 self.id_and_waker = Some((id, cx.waker().clone()));
313 }
314 Some(_) => {}
315 }
316
317 Poll::Pending
318 }
319 })
320 }
321}
322
323#[derive(Debug)]
328pub struct Interval {
329 id_and_waker: Option<(RegularTimerId, Waker)>,
333
334 when: Instant,
336
337 period: Duration,
339}
340
341impl Interval {
342 fn new(start: Instant, period: Duration) -> Self {
343 Interval {
344 id_and_waker: None,
345 when: start,
346 period,
347 }
348 }
349}
350
351impl Drop for Interval {
352 fn drop(&mut self) {
353 if let Some((id, _)) = self.id_and_waker.take() {
354 Reactor::with_mut(|reactor| {
355 reactor.remove_timer(self.when, id);
356 });
357 }
358 }
359}
360
361impl Stream for Interval {
362 type Item = ();
363
364 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
365 Reactor::with_mut(|reactor| {
366 let time_slice_end = reactor.time_slice_end();
367 if time_slice_end >= self.when {
368 if let Some((id, _)) = self.id_and_waker.take() {
369 reactor.remove_timer(self.when, id);
371 }
372 let period = self.period;
375 while time_slice_end >= self.when {
376 self.when += period;
378 }
379 let id = reactor.insert_regular_timer(self.when, cx.waker());
381 self.id_and_waker = Some((id, cx.waker().clone()));
382
383 Poll::Ready(Some(()))
384 } else {
385 match &self.id_and_waker {
386 None => {
387 let id = reactor.insert_regular_timer(self.when, cx.waker());
389 self.id_and_waker = Some((id, cx.waker().clone()));
390 }
391 Some((id, w)) if !w.will_wake(cx.waker()) => {
392 reactor.remove_timer(self.when, *id);
394
395 let id = reactor.insert_regular_timer(self.when, cx.waker());
397 self.id_and_waker = Some((id, cx.waker().clone()));
398 }
399 Some(_) => {}
400 }
401
402 Poll::Pending
403 }
404 })
405 }
406}
407
408impl FusedStream for Interval {
409 fn is_terminated(&self) -> bool {
410 false
412 }
413}
414#[derive(Debug)]
419pub struct IntervalAfter {
420 id_and_waker: Option<(AfterTimerId, Waker)>,
424
425 when: Instant,
427
428 period: Duration,
430}
431
432impl IntervalAfter {
433 fn new(start: Instant, period: Duration) -> Self {
434 IntervalAfter {
435 id_and_waker: None,
436 when: start,
437 period,
438 }
439 }
440}
441
442impl Drop for IntervalAfter {
443 fn drop(&mut self) {
444 if let Some((id, _)) = self.id_and_waker.take() {
445 Reactor::with_mut(|reactor| {
446 reactor.remove_timer(self.when, id);
447 });
448 }
449 }
450}
451
452impl Stream for IntervalAfter {
453 type Item = ();
454
455 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
456 Reactor::with_mut(|reactor| {
457 let timers_check_instant = reactor.timers_check_instant();
458 if timers_check_instant >= self.when {
459 if let Some((id, _)) = self.id_and_waker.take() {
460 reactor.remove_timer(self.when, id);
462 }
463 let period = self.period;
466 while timers_check_instant >= self.when {
467 self.when += period;
469 }
470 let id = reactor.insert_after_timer(self.when, cx.waker());
472 self.id_and_waker = Some((id, cx.waker().clone()));
473
474 Poll::Ready(Some(()))
475 } else {
476 match &self.id_and_waker {
477 None => {
478 let id = reactor.insert_after_timer(self.when, cx.waker());
480 self.id_and_waker = Some((id, cx.waker().clone()));
481 }
482 Some((id, w)) if !w.will_wake(cx.waker()) => {
483 reactor.remove_timer(self.when, *id);
485
486 let id = reactor.insert_after_timer(self.when, cx.waker());
488 self.id_and_waker = Some((id, cx.waker().clone()));
489 }
490 Some(_) => {}
491 }
492
493 Poll::Pending
494 }
495 })
496 }
497}
498
499impl FusedStream for IntervalAfter {
500 fn is_terminated(&self) -> bool {
501 false
503 }
504}
505
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use crate::runtime::executor::scheduler;

    // Throttling duration handed to every scheduler started by these tests.
    const MAX_THROTTLING: Duration = Duration::from_millis(10);
    // Delay used by the one-shot tests and as start offset of the "after" interval.
    const DELAY: Duration = Duration::from_millis(12);
    // Period used by the interval tests.
    const PERIOD: Duration = Duration::from_millis(15);

    /// A regular one-shot timer may complete up to half the throttling
    /// duration before the requested delay.
    #[test]
    fn delay_for_regular() {
        gst::init().unwrap();

        let handle = scheduler::Throttling::start("delay_for_regular", MAX_THROTTLING);

        let task = handle.spawn(async {
            let began = Instant::now();
            super::delay_for(DELAY).await;
            assert!(began.elapsed() + MAX_THROTTLING / 2 >= DELAY);
        });

        futures::executor::block_on(task).unwrap();
    }

    /// An "at least" one-shot timer must not complete before the requested delay.
    #[test]
    fn delay_for_at_least() {
        gst::init().unwrap();

        let handle = scheduler::Throttling::start("delay_for_at_least", MAX_THROTTLING);

        let task = handle.spawn(async {
            let began = Instant::now();
            super::delay_for_at_least(DELAY).await;
            assert!(began.elapsed() >= DELAY);
        });

        futures::executor::block_on(task).unwrap();
    }

    /// Each tick of a regular interval may come up to half the throttling
    /// duration before its nominal instant.
    #[test]
    fn interval_regular() {
        use futures::prelude::*;

        gst::init().unwrap();

        let handle = scheduler::Throttling::start("interval_regular", MAX_THROTTLING);

        let join_handle = handle.spawn(async move {
            let began = Instant::now();
            let mut ticks = super::interval(PERIOD).unwrap();

            // The first tick is nominally due immediately.
            ticks.next().await.unwrap();
            assert!(began.elapsed() + MAX_THROTTLING / 2 >= Duration::ZERO);

            for tick in 1..=10u32 {
                ticks.next().await.unwrap();
                let expected = PERIOD * tick;
                assert!(began.elapsed() + MAX_THROTTLING / 2 >= expected);
            }
        });

        futures::executor::block_on(join_handle).unwrap();
    }

    /// Ticks of an "at least" interval must never come before their nominal instant.
    #[test]
    fn interval_after_at_least() {
        use futures::prelude::*;

        gst::init().unwrap();

        let handle = scheduler::Throttling::start("interval_after", MAX_THROTTLING);

        let join_handle = handle.spawn(async move {
            let began = Instant::now();
            let mut ticks = super::interval_after_at_least(began + DELAY, PERIOD).unwrap();

            // The first tick is nominally due `DELAY` after the start.
            ticks.next().await.unwrap();
            assert!(began.elapsed() >= DELAY);

            for tick in 1..10u32 {
                ticks.next().await.unwrap();
                let expected = DELAY + PERIOD * tick;
                assert!(began.elapsed() >= expected);
            }
        });

        futures::executor::block_on(join_handle).unwrap();
    }
}