mvsync/
timer.rs

1use std::sync::{Arc, Mutex, Condvar, Once};
2use std::time::{Duration, Instant};
3use std::thread;
4use std::task::{Waker, Context, Poll};
5use std::pin::Pin;
6use std::collections::BinaryHeap;
7use std::future::Future;
8use mvutils::lazy;
9use mvutils::utils::Recover;
10
/// A future that completes once a given amount of time has passed.
///
/// The countdown begins on the first call to `poll`, not when the value is constructed.
pub struct Sleep {
    /// Set to `true` by the first `poll`, once the deadline has been computed.
    started: bool,
    /// Deadline of the sleep. Holds a placeholder (the construction time) until the
    /// first `poll` replaces it with `now + duration`.
    when: Instant,
    /// How long to sleep, measured from the first `poll`.
    duration: Duration,
}
16
17lazy! {
18    static QUEUE: Arc<Mutex<BinaryHeap<TimerEntry>>> = Arc::new(Mutex::new(BinaryHeap::new()));
19    static SIGNAL: Condvar = Condvar::new();
20    static INIT: Once = Once::new();
21}
22
23impl Sleep {
24    pub(crate) fn new(duration: Duration) -> Self {
25        INIT.call_once(start_timer_thread);
26        Sleep {
27            duration,
28            when: Instant::now(),
29            started: false
30        }
31    }
32}
33
34impl Future for Sleep {
35    type Output = ();
36
37    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
38        if !self.started {
39            let this = self.get_mut();
40            this.when = Instant::now() + this.duration;
41            this.started = true;
42            let mut queue = QUEUE.lock().recover();
43            queue.push(TimerEntry {
44                when: this.when,
45                waker: cx.waker().clone(),
46            });
47            drop(queue);
48            SIGNAL.notify_one();
49            Poll::Pending
50        }
51        else if Instant::now() >= self.when {
52            Poll::Ready(())
53        } else {
54            Poll::Pending
55        }
56    }
57}
58
/// A registered wakeup: `waker` is to be invoked once `when` has been reached.
struct TimerEntry {
    /// Deadline of this entry.
    when: Instant,
    /// Handle used to notify the task that registered the entry.
    waker: Waker,
}

// Equality and ordering consider the deadline only; the waker takes no part in either.

impl PartialEq for TimerEntry {
    fn eq(&self, rhs: &Self) -> bool {
        self.when == rhs.when
    }
}

impl Eq for TimerEntry {}

impl PartialOrd for TimerEntry {
    fn partial_cmp(&self, rhs: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(rhs))
    }
}

impl Ord for TimerEntry {
    /// Reversed deadline order: the entry with the *earlier* deadline compares as greater,
    /// so a max-heap such as `BinaryHeap` yields the soonest deadline first.
    fn cmp(&self, rhs: &Self) -> std::cmp::Ordering {
        rhs.when.cmp(&self.when)
    }
}
83
84fn start_timer_thread() {
85    thread::spawn(|| {
86        loop {
87            let next = QUEUE.lock().recover().peek().map(|e| e.when);
88            match next {
89                Some(t) => {
90                    let now = Instant::now();
91                    if now >= t {
92                        let next = QUEUE.lock().recover().pop().unwrap();
93                        next.waker.wake();
94                    }
95                    else {
96                        let wait_duration = if now > t {
97                            Duration::from_secs(0)
98                        } else {
99                            t.duration_since(now)
100                        };
101                        let (_queue, _) = SIGNAL.wait_timeout(QUEUE.lock().recover(), wait_duration).recover();
102                    }
103                }
104                None => {
105                    let _queue = SIGNAL.wait(QUEUE.lock().recover()).recover();
106                }
107            }
108        }
109    });
110}