tl_async_runtime/
timers.rs

1use std::{
2    cmp::Reverse,
3    collections::BinaryHeap,
4    pin::Pin,
5    task::{Context, Poll},
6    time::{Duration, Instant},
7};
8
9use futures::Future;
10use parking_lot::{Mutex, MutexGuard};
11use pin_project::pin_project;
12
13use crate::{
14    driver::{executor_context, task_context},
15    TaskId,
16};
17
18type TimerHeap = BinaryHeap<(Reverse<Instant>, TaskId)>;
19#[derive(Default)]
20pub(crate) struct Queue(Mutex<TimerHeap>);
21impl Queue {
22    pub fn insert(&self, instant: Instant, task: TaskId) {
23        self.0.lock().push((Reverse(instant), task));
24    }
25}
26
27impl<'a> IntoIterator for &'a Queue {
28    type Item = TaskId;
29    type IntoIter = QueueIter<'a>;
30
31    fn into_iter(self) -> Self::IntoIter {
32        QueueIter(self.0.lock(), Instant::now())
33    }
34}
35
36pub(crate) struct QueueIter<'a>(MutexGuard<'a, TimerHeap>, Instant);
37impl<'a> Iterator for QueueIter<'a> {
38    type Item = TaskId;
39
40    fn next(&mut self) -> Option<Self::Item> {
41        let (Reverse(time), task) = self.0.pop()?;
42        if time > self.1 {
43            self.0.push((Reverse(time), task));
44            None
45        } else {
46            Some(task)
47        }
48    }
49}
50
51/// Future for sleeping fixed amounts of time.
52/// Does not block the thread
53#[pin_project]
54pub struct Sleep {
55    instant: Instant,
56}
57
58impl Future for Sleep {
59    type Output = ();
60
61    fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
62        let instant = *self.project().instant;
63        // if the future is not yet ready
64        if instant > Instant::now() {
65            task_context(|id| {
66                executor_context(|exec| {
67                    // register the timer on the executor
68                    exec.timers.insert(instant, id);
69                    Poll::Pending
70                })
71            })
72        } else {
73            Poll::Ready(())
74        }
75    }
76}
77
78impl Sleep {
79    /// sleep until a specific point in time
80    pub fn until(instant: Instant) -> Sleep {
81        Self { instant }
82    }
83    /// sleep for a specific duration of time
84    pub fn duration(duration: Duration) -> Sleep {
85        Sleep::until(Instant::now() + duration)
86    }
87}