1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
use std::sync::{Arc, Mutex, Condvar, Once};
use std::time::{Duration, Instant};
use std::thread;
use std::task::{Waker, Context, Poll};
use std::pin::Pin;
use std::collections::BinaryHeap;
use std::future::Future;
use mvutils::lazy;
use mvutils::once::Lazy;
use mvutils::utils::Recover;

/// A future that completes once a given duration has elapsed.
///
/// The countdown begins on the first poll, not at construction: the first
/// poll computes the deadline and hands the task's waker to the background
/// timer thread.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled or awaited"]
pub struct Sleep {
    /// How long to sleep, counted from the first poll.
    duration: Duration,
    /// Deadline of the sleep. Holds a placeholder until the first poll
    /// replaces it with `now + duration`.
    when: Instant,
    /// Whether the first poll has already computed `when` and queued a waker.
    started: bool,
}

// Global state shared between every `Sleep` future and the timer thread.
lazy! {
    // Pending timer entries. `TimerEntry`'s ordering is reversed, so the max-heap
    // surfaces the entry with the earliest deadline first.
    static QUEUE: Arc<Mutex<BinaryHeap<TimerEntry>>> = Arc::new(Mutex::new(BinaryHeap::new()));
    // Notified after an entry is pushed onto `QUEUE`; the timer thread waits on it
    // (together with the `QUEUE` mutex) while it has nothing due.
    static SIGNAL: Condvar = Condvar::new();
    // Guards the one-time spawn of the timer thread from `Sleep::new`.
    static INIT: Once = Once::new();
}

impl Sleep {
    /// Builds a not-yet-started sleep of length `duration`.
    ///
    /// Also makes sure the background timer thread has been spawned; the
    /// `Once` guard means only the very first call actually spawns it.
    pub(crate) fn new(duration: Duration) -> Self {
        INIT.call_once(start_timer_thread);

        // The real deadline is computed on the first poll; until then `when`
        // only needs to hold some valid instant.
        let placeholder = Instant::now();
        Self {
            duration,
            when: placeholder,
            started: false,
        }
    }
}

impl Future for Sleep {
    type Output = ();

    /// Drives the sleep.
    ///
    /// * First poll: computes the deadline (`now + duration`), queues the
    ///   task's waker for the timer thread, and returns `Pending`.
    /// * Later polls: return `Ready(())` once the deadline has passed.
    ///   Otherwise the *current* waker is (re-)registered before returning
    ///   `Pending`, so the task is still woken when it is polled with a
    ///   different waker than on the first poll.
    ///
    /// # Panics
    ///
    /// The first poll may panic if `now + duration` cannot be represented as
    /// an `Instant`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // All fields are `Unpin`, so the pin can be unwrapped safely.
        let this = self.get_mut();

        if !this.started {
            this.when = Instant::now() + this.duration;
            this.started = true;
            let mut queue = QUEUE.lock().recover();
            queue.push(TimerEntry {
                when: this.when,
                waker: cx.waker().clone(),
            });
            // Release the lock before notifying so the timer thread can
            // acquire it immediately.
            drop(queue);
            SIGNAL.notify_one();
            return Poll::Pending;
        }

        if Instant::now() >= this.when {
            return Poll::Ready(());
        }

        // Polled again before the deadline. A future returning `Pending` must
        // arrange a wake-up for the waker of the most recent poll, which may
        // differ from the one queued earlier. Skip the push when an entry for
        // this deadline already wakes the same task, so repeated polls from
        // the same task do not grow the queue.
        let mut queue = QUEUE.lock().recover();
        let already_registered = queue
            .iter()
            .any(|entry| entry.when == this.when && entry.waker.will_wake(cx.waker()));
        if !already_registered {
            queue.push(TimerEntry {
                when: this.when,
                waker: cx.waker().clone(),
            });
            drop(queue);
            SIGNAL.notify_one();
        }
        Poll::Pending
    }
}

/// One pending wake-up held in the timer queue.
///
/// Equality and ordering look only at `when`, and the ordering is inverted:
/// an entry with an earlier deadline compares as *greater*. In a max-heap
/// such as `BinaryHeap` this puts the soonest deadline on top.
struct TimerEntry {
    /// Instant at which `waker` becomes due.
    when: Instant,
    /// Waker to invoke once `when` has been reached.
    waker: Waker,
}

impl PartialEq for TimerEntry {
    /// Two entries are equal when their deadlines are equal; the waker is ignored.
    fn eq(&self, rhs: &Self) -> bool {
        self.when.eq(&rhs.when)
    }
}

impl Eq for TimerEntry {}

impl PartialOrd for TimerEntry {
    /// Always `Some`, delegating to the total order defined by `Ord`.
    fn partial_cmp(&self, rhs: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, rhs))
    }
}

impl Ord for TimerEntry {
    /// Compares deadlines with the operands swapped, yielding the reversed order.
    fn cmp(&self, rhs: &Self) -> std::cmp::Ordering {
        rhs.when.cmp(&self.when)
    }
}

fn start_timer_thread() {
    thread::spawn(|| {
        loop {
            let next = QUEUE.lock().recover().peek().map(|e| e.when);
            match next {
                Some(t) => {
                    let now = Instant::now();
                    if now >= t {
                        let next = QUEUE.lock().recover().pop().unwrap();
                        next.waker.wake();
                    }
                    else {
                        let wait_duration = if now > t {
                            Duration::from_secs(0)
                        } else {
                            t.duration_since(now)
                        };
                        let (_queue, _) = SIGNAL.wait_timeout(QUEUE.lock().recover(), wait_duration).recover();
                    }
                }
                None => {
                    let _queue = SIGNAL.wait(QUEUE.lock().recover()).recover();
                }
            }
        }
    });
}