agnostic_lite/
spawner.rs

1use core::future::Future;
2
3cfg_time!(
4  use core::time::Duration;
5);
6
7use crate::Yielder;
8
9#[cfg(any(feature = "smol", feature = "async-std"))]
10macro_rules! join_handle {
11  ($handle:ty) => {
12    pin_project_lite::pin_project! {
13      /// An owned permission to join on a task (await its termination).
14      pub struct JoinHandle<T> {
15        #[pin]
16        handle: $handle,
17      }
18    }
19
20    impl<T> From<$handle> for JoinHandle<T> {
21      fn from(handle: $handle) -> Self {
22        Self { handle }
23      }
24    }
25
26    impl<T> core::future::Future for JoinHandle<T> {
27      type Output = core::result::Result<T, $crate::spawner::handle::JoinError>;
28
29      fn poll(
30        self: core::pin::Pin<&mut Self>,
31        cx: &mut core::task::Context<'_>,
32      ) -> core::task::Poll<Self::Output> {
33        let this = self.project();
34
35        match this.handle.poll(cx) {
36          core::task::Poll::Ready(v) => core::task::Poll::Ready(Ok(v)),
37          core::task::Poll::Pending => core::task::Poll::Pending,
38        }
39      }
40    }
41  };
42}
43
44pub(crate) mod handle {
45  /// Task failed to execute to completion.
46  ///
47  /// This error will never be returned for `smol` and `async-std` runtime,
48  /// having it here is just for compatibility with other runtimes.
49  #[derive(Debug, Clone, PartialEq, Eq)]
50  pub struct JoinError(());
51
52  impl JoinError {
53    /// Create a new `JoinError`.
54    #[inline]
55    #[cfg(feature = "wasm")]
56    pub(crate) const fn new() -> Self {
57      Self(())
58    }
59  }
60
61  impl core::fmt::Display for JoinError {
62    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
63      write!(f, "task failed to execute to completion")
64    }
65  }
66
67  impl core::error::Error for JoinError {}
68
69  #[cfg(feature = "std")]
70  impl From<JoinError> for std::io::Error {
71    fn from(_: JoinError) -> Self {
72      std::io::Error::new(std::io::ErrorKind::Other, "join error")
73    }
74  }
75}
76
77/// Joinhanlde trait
78pub trait JoinHandle<O>: Future<Output = Result<O, Self::JoinError>> + Unpin {
79  /// The error type for the join handle
80  #[cfg(feature = "std")]
81  type JoinError: core::error::Error + Into<std::io::Error> + Send + Sync + 'static;
82
83  /// The error type for the join handle
84  #[cfg(not(feature = "std"))]
85  type JoinError: core::error::Error + Send + Sync + 'static;
86
87  /// Detaches the task to let it keep running in the background.
88  fn detach(self)
89  where
90    Self: Sized,
91  {
92    drop(self)
93  }
94}
95
96/// Joinhanlde trait
97pub trait LocalJoinHandle<O>: Future<Output = Result<O, Self::JoinError>> + Unpin {
98  /// The error type for the join handle
99  #[cfg(feature = "std")]
100  type JoinError: core::error::Error + Into<std::io::Error> + 'static;
101  /// The error type for the join handle
102  #[cfg(not(feature = "std"))]
103  type JoinError: core::error::Error + 'static;
104
105  /// Detaches the task to let it keep running in the background.
106  fn detach(self)
107  where
108    Self: Sized,
109  {
110    drop(self)
111  }
112}
113
114/// A spawner trait for spawning futures.
115pub trait AsyncSpawner: Yielder + Copy + Send + Sync + 'static {
116  /// The handle returned by the spawner when a future is spawned.
117  type JoinHandle<O>: JoinHandle<O> + Send + Sync + 'static
118  where
119    O: Send + 'static;
120
121  /// Spawn a future.
122  fn spawn<F>(future: F) -> Self::JoinHandle<F::Output>
123  where
124    F::Output: Send + 'static,
125    F: Future + Send + 'static;
126
127  /// Spawn a future and detach it.
128  fn spawn_detach<F>(future: F)
129  where
130    F::Output: Send + 'static,
131    F: Future + Send + 'static,
132  {
133    core::mem::drop(Self::spawn(future));
134  }
135}
136
137/// A spawner trait for spawning futures.
138pub trait AsyncLocalSpawner: Yielder + Copy + 'static {
139  /// The handle returned by the spawner when a future is spawned.
140  type JoinHandle<O>: LocalJoinHandle<O> + 'static
141  where
142    O: 'static;
143
144  /// Spawn a future.
145  fn spawn_local<F>(future: F) -> Self::JoinHandle<F::Output>
146  where
147    F::Output: 'static,
148    F: Future + 'static;
149
150  /// Spawn a future and detach it.
151  fn spawn_local_detach<F>(future: F)
152  where
153    F::Output: 'static,
154    F: Future + 'static,
155  {
156    core::mem::drop(Self::spawn_local(future));
157  }
158}
159
160/// A spawner trait for spawning blocking.
161pub trait AsyncBlockingSpawner: Yielder + Copy + 'static {
162  /// The join handle type for blocking tasks
163  type JoinHandle<R>: JoinHandle<R> + Send + 'static
164  where
165    R: Send + 'static;
166
167  /// Spawn a blocking function onto the runtime
168  fn spawn_blocking<F, R>(f: F) -> Self::JoinHandle<R>
169  where
170    F: FnOnce() -> R + Send + 'static,
171    R: Send + 'static;
172
173  /// Spawn a blocking function onto the runtime and detach it
174  fn spawn_blocking_detach<F, R>(f: F)
175  where
176    F: FnOnce() -> R + Send + 'static,
177    R: Send + 'static,
178  {
179    Self::spawn_blocking(f).detach();
180  }
181}
182
183/// Canceled
184#[derive(Debug, Clone, Copy)]
185#[cfg(all(
186  feature = "time",
187  any(
188    feature = "async-std",
189    feature = "tokio",
190    feature = "smol",
191    feature = "wasm"
192  )
193))]
194pub(crate) struct Canceled;
195
196#[cfg(all(
197  feature = "time",
198  any(
199    feature = "async-std",
200    feature = "tokio",
201    feature = "smol",
202    feature = "wasm"
203  )
204))]
205impl core::fmt::Display for Canceled {
206  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
207    write!(f, "after canceled")
208  }
209}
210
211#[cfg(all(
212  feature = "time",
213  any(
214    feature = "async-std",
215    feature = "tokio",
216    feature = "smol",
217    feature = "wasm"
218  )
219))]
220impl core::error::Error for Canceled {}
221
222/// Error of [`AfterHandle`]'s output
223#[cfg(feature = "time")]
224#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
225#[derive(Debug)]
226pub enum AfterHandleError<E> {
227  /// The after function was canceled
228  Canceled,
229  /// Task failed to execute to completion.
230  Join(E),
231}
232
233#[cfg(feature = "time")]
234impl<E: core::fmt::Display> core::fmt::Display for AfterHandleError<E> {
235  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
236    match self {
237      Self::Canceled => write!(f, "after function was canceled"),
238      Self::Join(e) => write!(f, "{e}"),
239    }
240  }
241}
242
243#[cfg(feature = "time")]
244impl<E: core::error::Error> core::error::Error for AfterHandleError<E> {}
245
246#[cfg(all(feature = "time", feature = "std"))]
247impl<E: core::error::Error + Send + Sync + 'static> From<AfterHandleError<E>> for std::io::Error {
248  fn from(value: AfterHandleError<E>) -> Self {
249    match value {
250      AfterHandleError::Canceled => std::io::Error::new(std::io::ErrorKind::Other, "task canceled"),
251      AfterHandleError::Join(e) => std::io::Error::new(std::io::ErrorKind::Other, e),
252    }
253  }
254}
255
256#[cfg(all(
257  feature = "time",
258  any(
259    feature = "async-std",
260    feature = "tokio",
261    feature = "smol",
262    feature = "wasm"
263  )
264))]
265pub(crate) struct AfterHandleSignals {
266  finished: core::sync::atomic::AtomicBool,
267  expired: core::sync::atomic::AtomicBool,
268}
269
270#[cfg(all(
271  feature = "time",
272  any(
273    feature = "async-std",
274    feature = "tokio",
275    feature = "smol",
276    feature = "wasm"
277  )
278))]
279impl AfterHandleSignals {
280  #[inline]
281  pub(crate) const fn new() -> Self {
282    Self {
283      finished: core::sync::atomic::AtomicBool::new(false),
284      expired: core::sync::atomic::AtomicBool::new(false),
285    }
286  }
287
288  #[inline]
289  pub(crate) fn set_finished(&self) {
290    self
291      .finished
292      .store(true, core::sync::atomic::Ordering::Release);
293  }
294
295  #[inline]
296  pub(crate) fn set_expired(&self) {
297    self
298      .expired
299      .store(true, core::sync::atomic::Ordering::Release);
300  }
301
302  #[inline]
303  pub(crate) fn is_finished(&self) -> bool {
304    self.finished.load(core::sync::atomic::Ordering::Acquire)
305  }
306
307  #[inline]
308  pub(crate) fn is_expired(&self) -> bool {
309    self.expired.load(core::sync::atomic::Ordering::Acquire)
310  }
311}
312
313/// The handle returned by the [`AsyncAfterSpawner`] when a after future is spawned.
314///
315/// Drop the handle to detach the task.
316#[cfg(feature = "time")]
317#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
318pub trait AfterHandle<F: Send + 'static>:
319  Send + Sync + Future<Output = Result<F, Self::JoinError>> + 'static
320{
321  /// The join error type for the join handle
322  #[cfg(feature = "std")]
323  type JoinError: core::error::Error + Into<std::io::Error> + Send + Sync + 'static;
324  /// The join error type for the join handle
325  #[cfg(not(feature = "std"))]
326  type JoinError: core::error::Error + 'static;
327
328  /// Cancels the task related to this handle.
329  ///
330  /// Returns the task’s output if it was completed just before it got canceled, or `None` if it didn’t complete.
331  fn cancel(self) -> impl Future<Output = Option<Result<F, Self::JoinError>>> + Send;
332
333  /// Resets the delay of the task related to this handle.
334  fn reset(&self, duration: core::time::Duration);
335
336  /// Aborts the task related to this handle.
337  fn abort(self);
338
339  /// Detaches the task to let it keep running in the background.
340  fn detach(self)
341  where
342    Self: Sized,
343  {
344    drop(self)
345  }
346
347  /// Returns `true` if the timer has expired.
348  fn is_expired(&self) -> bool;
349
350  /// Returns `true` if the task has finished.
351  fn is_finished(&self) -> bool;
352}
353
354/// A spawner trait for spawning futures. Go's `time.AfterFunc` equivalent.
355#[cfg(feature = "time")]
356#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
357pub trait AsyncAfterSpawner: Copy + Send + Sync + 'static {
358  /// The instant type for the spawner
359  type Instant: crate::time::Instant;
360
361  /// The handle returned by the spawner when a future is spawned.
362  type JoinHandle<F>: AfterHandle<F>
363  where
364    F: Send + 'static;
365
366  /// Spawn a future onto the runtime and run the given future after the given duration
367  fn spawn_after<F>(duration: Duration, future: F) -> Self::JoinHandle<F::Output>
368  where
369    F::Output: Send + 'static,
370    F: Future + Send + 'static;
371
372  /// Spawn and detach a future onto the runtime and run the given future after the given duration
373  fn spawn_after_detach<F>(duration: Duration, future: F)
374  where
375    F::Output: Send + 'static,
376    F: Future + Send + 'static,
377  {
378    core::mem::drop(Self::spawn_after(duration, future));
379  }
380
381  /// Spawn a future onto the runtime and run the given future after reach the given instant
382  fn spawn_after_at<F>(instant: Self::Instant, future: F) -> Self::JoinHandle<F::Output>
383  where
384    F::Output: Send + 'static,
385    F: Future + Send + 'static;
386
387  /// Spawn and detach a future onto the runtime and run the given future after reach the given instant
388  fn spawn_after_at_detach<F>(instant: Self::Instant, future: F)
389  where
390    F::Output: Send + 'static,
391    F: Future + Send + 'static,
392  {
393    Self::spawn_after_at(instant, future).detach()
394  }
395}