agnostic_lite/time/
delay.rs

1use core::{
2  future::Future,
3  pin::Pin,
4  sync::atomic::{AtomicBool, Ordering},
5  task::{Context, Poll},
6  time::Duration,
7};
8
9use super::{AsyncLocalSleep, AsyncLocalSleepExt};
10
11/// Delay is aborted
12#[derive(Debug, Clone, Copy)]
13pub struct Aborted;
14
15impl core::fmt::Display for Aborted {
16  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
17    write!(f, "delay aborted")
18  }
19}
20
21impl core::error::Error for Aborted {}
22
23/// Simlilar to Go's `time.AfterFunc`, but does not spawn a new thread.
24///
25/// If you want the future to run in its own thread, you should use
26/// [`RuntimeLite::spawn_after`](crate::RuntimeLite::spawn_after) instead.
27pub trait AsyncDelay<F>: Future<Output = Result<F::Output, Aborted>> + Send
28where
29  F: Future + Send,
30{
31  /// The instant type
32  type Instant: super::Instant;
33
34  /// Abort the delay, if future has not yet completed, then it will never be polled again.
35  fn abort(&self);
36
37  /// Cancel the delay, running the future immediately
38  fn cancel(&self);
39
40  /// Reset the delay to a new duration
41  fn reset(self: Pin<&mut Self>, dur: Duration);
42
43  /// Resets the delay to a new instant
44  fn reset_at(self: Pin<&mut Self>, at: Self::Instant);
45}
46
47/// Extension trait for [`AsyncLocalDelay`]
48pub trait AsyncDelayExt<F>: AsyncDelay<F>
49where
50  F: Future + Send,
51{
52  /// Create a new delay, the future will be polled after the duration has elapsed
53  fn delay(dur: Duration, fut: F) -> Self;
54
55  /// Create a new delay, the future will be polled after the instant has elapsed
56  fn delay_at(at: Self::Instant, fut: F) -> Self;
57}
58
59impl<F: Future + Send, T> AsyncDelay<F> for T
60where
61  T: AsyncLocalDelay<F> + Send,
62  T::Instant: Send,
63{
64  type Instant = T::Instant;
65
66  fn abort(&self) {
67    AsyncLocalDelay::abort(self);
68  }
69
70  fn cancel(&self) {
71    AsyncLocalDelay::cancel(self);
72  }
73
74  fn reset(self: Pin<&mut Self>, dur: Duration) {
75    AsyncLocalDelay::reset(self, dur);
76  }
77
78  fn reset_at(self: Pin<&mut Self>, at: Self::Instant) {
79    AsyncLocalDelay::reset_at(self, at);
80  }
81}
82
83impl<F: Future + Send, T> AsyncDelayExt<F> for T
84where
85  T: AsyncLocalDelayExt<F> + Send,
86  T::Instant: Send,
87{
88  fn delay(dur: Duration, fut: F) -> Self {
89    AsyncLocalDelayExt::delay(dur, fut)
90  }
91
92  fn delay_at(at: Self::Instant, fut: F) -> Self {
93    AsyncLocalDelayExt::delay_at(at, fut)
94  }
95}
96
97/// Like [`Delay`] but does not require `Send`
98pub trait AsyncLocalDelay<F>: Future<Output = Result<F::Output, Aborted>>
99where
100  F: Future,
101{
102  /// The instant type
103  type Instant: super::Instant;
104
105  /// Abort the delay, if future has not yet completed, then it will never be polled again.
106  fn abort(&self);
107
108  /// Cancel the delay, running the future immediately
109  fn cancel(&self);
110
111  /// Reset the delay to a new duration
112  fn reset(self: Pin<&mut Self>, dur: Duration);
113
114  /// Resets the delay to a new instant
115  fn reset_at(self: Pin<&mut Self>, at: Self::Instant);
116}
117
118/// Extension trait for [`AsyncLocalDelay`]
119pub trait AsyncLocalDelayExt<F>: AsyncLocalDelay<F>
120where
121  F: Future,
122{
123  /// Create a new delay, the future will be polled after the duration has elapsed
124  fn delay(dur: Duration, fut: F) -> Self;
125
126  /// Create a new delay, the future will be polled after the instant has elapsed
127  fn delay_at(at: Self::Instant, fut: F) -> Self;
128}
129
130pin_project_lite::pin_project! {
131  /// [`AsyncDelay`] implementation for wasm bindgen runtime
132  pub struct Delay<F, S> {
133    #[pin]
134    fut: Option<F>,
135    #[pin]
136    sleep: S,
137    aborted: AtomicBool,
138    canceled: AtomicBool,
139  }
140}
141
142impl<F: Future, S: Future> Future for Delay<F, S> {
143  type Output = Result<F::Output, Aborted>;
144
145  fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
146    if self.aborted.load(Ordering::Acquire) {
147      return Poll::Ready(Err(Aborted));
148    }
149
150    let this = self.project();
151    if !this.canceled.load(Ordering::Acquire) && !this.sleep.poll(cx).is_ready() {
152      return Poll::Pending;
153    }
154
155    if let Some(fut) = this.fut.as_pin_mut() {
156      return fut.poll(cx).map(Ok);
157    }
158
159    Poll::Pending
160  }
161}
162
163impl<F, S> AsyncLocalDelay<F> for Delay<F, S>
164where
165  F: Future,
166  S: AsyncLocalSleep,
167{
168  type Instant = S::Instant;
169
170  fn abort(&self) {
171    self.aborted.store(true, Ordering::Release)
172  }
173
174  fn cancel(&self) {
175    self.canceled.store(true, Ordering::Release)
176  }
177
178  fn reset(self: Pin<&mut Self>, dur: Duration) {
179    self.project().sleep.as_mut().reset(<Self::Instant as super::Instant>::now() + dur);
180  }
181
182  fn reset_at(self: Pin<&mut Self>, at: Self::Instant) {
183    self.project().sleep.as_mut().reset(at);
184  }
185}
186
187impl<F, S> AsyncLocalDelayExt<F> for Delay<F, S>
188where
189  F: Future,
190  S: AsyncLocalSleepExt,
191{
192  fn delay(dur: Duration, fut: F) -> Self {
193    Self {
194      fut: Some(fut),
195      sleep: S::sleep_local(dur),
196      aborted: AtomicBool::new(false),
197      canceled: AtomicBool::new(false),
198    }
199  }
200
201  fn delay_at(at: Self::Instant, fut: F) -> Self {
202    Self {
203      fut: Some(fut),
204      sleep: S::sleep_local_until(at),
205      aborted: AtomicBool::new(false),
206      canceled: AtomicBool::new(false),
207    }
208  }
209}
210
211#[test]
212fn test_aborted_error() {
213  assert_eq!(Aborted.to_string(), "delay aborted");
214}