tl_async_runtime/
timers.rs1use std::{
2 cmp::Reverse,
3 collections::BinaryHeap,
4 pin::Pin,
5 task::{Context, Poll},
6 time::{Duration, Instant},
7};
8
9use futures::Future;
10use parking_lot::{Mutex, MutexGuard};
11use pin_project::pin_project;
12
13use crate::{
14 driver::{executor_context, task_context},
15 TaskId,
16};
17
18type TimerHeap = BinaryHeap<(Reverse<Instant>, TaskId)>;
19#[derive(Default)]
20pub(crate) struct Queue(Mutex<TimerHeap>);
21impl Queue {
22 pub fn insert(&self, instant: Instant, task: TaskId) {
23 self.0.lock().push((Reverse(instant), task));
24 }
25}
26
27impl<'a> IntoIterator for &'a Queue {
28 type Item = TaskId;
29 type IntoIter = QueueIter<'a>;
30
31 fn into_iter(self) -> Self::IntoIter {
32 QueueIter(self.0.lock(), Instant::now())
33 }
34}
35
36pub(crate) struct QueueIter<'a>(MutexGuard<'a, TimerHeap>, Instant);
37impl<'a> Iterator for QueueIter<'a> {
38 type Item = TaskId;
39
40 fn next(&mut self) -> Option<Self::Item> {
41 let (Reverse(time), task) = self.0.pop()?;
42 if time > self.1 {
43 self.0.push((Reverse(time), task));
44 None
45 } else {
46 Some(task)
47 }
48 }
49}
50
51#[pin_project]
54pub struct Sleep {
55 instant: Instant,
56}
57
58impl Future for Sleep {
59 type Output = ();
60
61 fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
62 let instant = *self.project().instant;
63 if instant > Instant::now() {
65 task_context(|id| {
66 executor_context(|exec| {
67 exec.timers.insert(instant, id);
69 Poll::Pending
70 })
71 })
72 } else {
73 Poll::Ready(())
74 }
75 }
76}
77
78impl Sleep {
79 pub fn until(instant: Instant) -> Sleep {
81 Self { instant }
82 }
83 pub fn duration(duration: Duration) -> Sleep {
85 Sleep::until(Instant::now() + duration)
86 }
87}