contained_core/
delay.rs

1/*
2    Appellation: delay <module>
3    Contrib: FL03 <jo3mccain@icloud.com>
4    Description: ... Summary ...
5*/
6use std::sync::{Arc, Mutex};
7use std::task::{Context, Poll, Waker};
8use std::time::{Duration, Instant};
9use std::{future::Future, pin::Pin, thread};
10use tokio::sync::Notify;
11
12pub async fn delay(dur: Duration) {
13    let when = Instant::now() + dur;
14    let notify = Arc::new(Notify::new());
15    let notify2 = notify.clone();
16
17    thread::spawn(move || {
18        let now = Instant::now();
19
20        if now < when {
21            thread::sleep(when - now);
22        }
23
24        notify2.notify_one();
25    });
26
27    notify.notified().await;
28}
29
30/// The `Delay` future represents an asynchronous sleep.
31#[derive(Clone, Debug)]
32pub struct Delay {
33    // This is Some when we have spawned a thread, and None otherwise.
34    waker: Option<Arc<Mutex<Waker>>>,
35    // The `Instant` at which the delay will complete.
36    when: Instant,
37}
38
39impl Delay {
40    pub fn new(when: Instant) -> Self {
41        Self { waker: None, when }
42    }
43    /// Adjusts the delay to be `duration` earlier.
44    pub fn decrease(&mut self, duration: Duration) {
45        self.when -= duration;
46    }
47    /// Adjusts the delay to be `duration` later.
48    pub fn increase(&mut self, duration: Duration) {
49        self.when += duration;
50    }
51    /// Returns the `Waker` that will be notified when the delay completes.
52    pub fn waker(&self) -> Option<Arc<Mutex<Waker>>> {
53        self.waker.clone()
54    }
55    /// Returns the `Instant` at which the delay will complete.
56    pub fn when(&self) -> Instant {
57        self.when
58    }
59}
60
61impl Default for Delay {
62    fn default() -> Self {
63        Self::new(Instant::now())
64    }
65}
66
67impl Future for Delay {
68    type Output = ();
69
70    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
71        // First, if this is the first time the future is called, spawn the
72        // timer thread. If the timer thread is already running, ensure the
73        // stored `Waker` matches the current task's waker.
74        if let Some(waker) = &self.waker {
75            let mut waker = waker.lock().unwrap();
76
77            // Check if the stored waker matches the current task's waker.
78            // This is necessary as the `Delay` future instance may move to
79            // a different task between calls to `poll`. If this happens, the
80            // waker contained by the given `Context` will differ and we
81            // must update our stored waker to reflect this change.
82            if !waker.will_wake(cx.waker()) {
83                *waker = cx.waker().clone();
84            }
85        } else {
86            let when = self.when;
87            let waker = Arc::new(Mutex::new(cx.waker().clone()));
88            self.waker = Some(waker.clone());
89
90            // This is the first time `poll` is called, spawn the timer thread.
91            thread::spawn(move || {
92                let now = Instant::now();
93
94                if now < when {
95                    thread::sleep(when - now);
96                }
97
98                // The duration has elapsed. Notify the caller by invoking
99                // the waker.
100                let waker = waker.lock().unwrap();
101                waker.wake_by_ref();
102            });
103        }
104
105        // Once the waker is stored and the timer thread is started, it is
106        // time to check if the delay has completed. This is done by
107        // checking the current instant. If the duration has elapsed, then
108        // the future has completed and `Poll::Ready` is returned.
109        if Instant::now() >= self.when {
110            Poll::Ready(())
111        } else {
112            // The duration has not elapsed, the future has not completed so
113            // return `Poll::Pending`.
114            //
115            // The `Future` trait contract requires that when `Pending` is
116            // returned, the future ensures that the given waker is signalled
117            // once the future should be polled again. In our case, by
118            // returning `Pending` here, we are promising that we will
119            // invoke the given waker included in the `Context` argument
120            // once the requested duration has elapsed. We ensure this by
121            // spawning the timer thread above.
122            //
123            // If we forget to invoke the waker, the task will hang
124            // indefinitely.
125            Poll::Pending
126        }
127    }
128}
129
130impl PartialEq for Delay {
131    fn eq(&self, other: &Self) -> bool {
132        self.when == other.when
133    }
134}
135
136#[cfg(test)]
137mod tests {
138    use super::*;
139
140    #[tokio::test]
141    async fn test_delay() {
142        assert!(Delay::default().waker.is_none());
143
144        let start = Instant::now();
145        let dur = Duration::new(1, 0);
146
147        let mut delay = Delay::new(start);
148        delay.increase(dur);
149        assert_eq!(delay.when, start + dur);
150    }
151}