contained_core/delay.rs
1/*
2 Appellation: delay <module>
3 Contrib: FL03 <jo3mccain@icloud.com>
4 Description: ... Summary ...
5*/
6use std::sync::{Arc, Mutex};
7use std::task::{Context, Poll, Waker};
8use std::time::{Duration, Instant};
9use std::{future::Future, pin::Pin, thread};
10use tokio::sync::Notify;
11
12pub async fn delay(dur: Duration) {
13 let when = Instant::now() + dur;
14 let notify = Arc::new(Notify::new());
15 let notify2 = notify.clone();
16
17 thread::spawn(move || {
18 let now = Instant::now();
19
20 if now < when {
21 thread::sleep(when - now);
22 }
23
24 notify2.notify_one();
25 });
26
27 notify.notified().await;
28}
29
/// The `Delay` future represents an asynchronous sleep.
///
/// Cloning a `Delay` copies only its deadline. The waker slot is deliberately
/// not shared: the timer thread wakes whatever waker is stored in that slot,
/// so two futures sharing one slot would overwrite each other's waker and one
/// of the tasks would never be woken. A clone therefore starts without a timer
/// and spawns its own when it is first polled.
#[derive(Debug)]
pub struct Delay {
    /// `Some` once a timer thread has been spawned for this future, `None`
    /// otherwise. The slot is shared with that thread so the stored waker can
    /// be replaced between polls.
    waker: Option<Arc<Mutex<Waker>>>,
    /// The `Instant` at which the delay will complete.
    when: Instant,
}

impl Clone for Delay {
    /// Returns a new `Delay` with the same deadline and no timer thread.
    fn clone(&self) -> Self {
        Self {
            waker: None,
            when: self.when,
        }
    }
}
38
39impl Delay {
40 pub fn new(when: Instant) -> Self {
41 Self { waker: None, when }
42 }
43 /// Adjusts the delay to be `duration` earlier.
44 pub fn decrease(&mut self, duration: Duration) {
45 self.when -= duration;
46 }
47 /// Adjusts the delay to be `duration` later.
48 pub fn increase(&mut self, duration: Duration) {
49 self.when += duration;
50 }
51 /// Returns the `Waker` that will be notified when the delay completes.
52 pub fn waker(&self) -> Option<Arc<Mutex<Waker>>> {
53 self.waker.clone()
54 }
55 /// Returns the `Instant` at which the delay will complete.
56 pub fn when(&self) -> Instant {
57 self.when
58 }
59}
60
61impl Default for Delay {
62 fn default() -> Self {
63 Self::new(Instant::now())
64 }
65}
66
67impl Future for Delay {
68 type Output = ();
69
70 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
71 // First, if this is the first time the future is called, spawn the
72 // timer thread. If the timer thread is already running, ensure the
73 // stored `Waker` matches the current task's waker.
74 if let Some(waker) = &self.waker {
75 let mut waker = waker.lock().unwrap();
76
77 // Check if the stored waker matches the current task's waker.
78 // This is necessary as the `Delay` future instance may move to
79 // a different task between calls to `poll`. If this happens, the
80 // waker contained by the given `Context` will differ and we
81 // must update our stored waker to reflect this change.
82 if !waker.will_wake(cx.waker()) {
83 *waker = cx.waker().clone();
84 }
85 } else {
86 let when = self.when;
87 let waker = Arc::new(Mutex::new(cx.waker().clone()));
88 self.waker = Some(waker.clone());
89
90 // This is the first time `poll` is called, spawn the timer thread.
91 thread::spawn(move || {
92 let now = Instant::now();
93
94 if now < when {
95 thread::sleep(when - now);
96 }
97
98 // The duration has elapsed. Notify the caller by invoking
99 // the waker.
100 let waker = waker.lock().unwrap();
101 waker.wake_by_ref();
102 });
103 }
104
105 // Once the waker is stored and the timer thread is started, it is
106 // time to check if the delay has completed. This is done by
107 // checking the current instant. If the duration has elapsed, then
108 // the future has completed and `Poll::Ready` is returned.
109 if Instant::now() >= self.when {
110 Poll::Ready(())
111 } else {
112 // The duration has not elapsed, the future has not completed so
113 // return `Poll::Pending`.
114 //
115 // The `Future` trait contract requires that when `Pending` is
116 // returned, the future ensures that the given waker is signalled
117 // once the future should be polled again. In our case, by
118 // returning `Pending` here, we are promising that we will
119 // invoke the given waker included in the `Context` argument
120 // once the requested duration has elapsed. We ensure this by
121 // spawning the timer thread above.
122 //
123 // If we forget to invoke the waker, the task will hang
124 // indefinitely.
125 Poll::Pending
126 }
127 }
128}
129
130impl PartialEq for Delay {
131 fn eq(&self, other: &Self) -> bool {
132 self.when == other.when
133 }
134}
135
#[cfg(test)]
mod tests {
    use super::*;

    /// A default `Delay` has no waker slot, and `increase` pushes the deadline
    /// back by exactly the given duration.
    #[tokio::test]
    async fn test_delay() {
        let fresh = Delay::default();
        assert!(fresh.waker.is_none());

        let origin = Instant::now();
        let step = Duration::new(1, 0);

        let mut delay = Delay::new(origin);
        delay.increase(step);

        let expected = origin + step;
        assert_eq!(delay.when, expected);
    }
}
151}