1use std::sync::{Arc, Mutex, Condvar, Once};
2use std::time::{Duration, Instant};
3use std::thread;
4use std::task::{Waker, Context, Poll};
5use std::pin::Pin;
6use std::collections::BinaryHeap;
7use std::future::Future;
8use mvutils::lazy;
9use mvutils::utils::Recover;
10
11pub struct Sleep {
12 duration: Duration,
13 when: Instant,
14 started: bool
15}
16
17lazy! {
18 static QUEUE: Arc<Mutex<BinaryHeap<TimerEntry>>> = Arc::new(Mutex::new(BinaryHeap::new()));
19 static SIGNAL: Condvar = Condvar::new();
20 static INIT: Once = Once::new();
21}
22
23impl Sleep {
24 pub(crate) fn new(duration: Duration) -> Self {
25 INIT.call_once(start_timer_thread);
26 Sleep {
27 duration,
28 when: Instant::now(),
29 started: false
30 }
31 }
32}
33
34impl Future for Sleep {
35 type Output = ();
36
37 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
38 if !self.started {
39 let this = self.get_mut();
40 this.when = Instant::now() + this.duration;
41 this.started = true;
42 let mut queue = QUEUE.lock().recover();
43 queue.push(TimerEntry {
44 when: this.when,
45 waker: cx.waker().clone(),
46 });
47 drop(queue);
48 SIGNAL.notify_one();
49 Poll::Pending
50 }
51 else if Instant::now() >= self.when {
52 Poll::Ready(())
53 } else {
54 Poll::Pending
55 }
56 }
57}
58
/// A pending wake-up request held in the timer queue.
///
/// Entries compare by deadline only, and the comparison is inverted: an *earlier* deadline
/// is the *greater* entry, so a max-heap such as `BinaryHeap` yields the soonest deadline
/// first.
struct TimerEntry {
    /// Instant at or after which `waker` should be woken.
    when: Instant,
    /// Handle used to wake the task waiting on this deadline.
    waker: Waker,
}

impl PartialEq for TimerEntry {
    /// Two entries are equal when their deadlines are equal; the waker is ignored.
    fn eq(&self, other: &Self) -> bool {
        matches!(Ord::cmp(self, other), std::cmp::Ordering::Equal)
    }
}

impl Eq for TimerEntry {}

impl PartialOrd for TimerEntry {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for TimerEntry {
    /// Inverted deadline comparison: swapping the operands puts the earliest deadline on top.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        other.when.cmp(&self.when)
    }
}
83
84fn start_timer_thread() {
85 thread::spawn(|| {
86 loop {
87 let next = QUEUE.lock().recover().peek().map(|e| e.when);
88 match next {
89 Some(t) => {
90 let now = Instant::now();
91 if now >= t {
92 let next = QUEUE.lock().recover().pop().unwrap();
93 next.waker.wake();
94 }
95 else {
96 let wait_duration = if now > t {
97 Duration::from_secs(0)
98 } else {
99 t.duration_since(now)
100 };
101 let (_queue, _) = SIGNAL.wait_timeout(QUEUE.lock().recover(), wait_duration).recover();
102 }
103 }
104 None => {
105 let _queue = SIGNAL.wait(QUEUE.lock().recover()).recover();
106 }
107 }
108 }
109 });
110}