agnostic_lite/
spawner.rs

1use core::future::Future;
2
3cfg_time!(
4  use core::time::Duration;
5);
6
7use crate::Yielder;
8
#[cfg(feature = "smol")]
macro_rules! join_handle {
  ($handle:ty) => {
    pin_project_lite::pin_project! {
      /// A join handle wrapping the runtime's task type; awaiting it waits for
      /// the task to terminate.
      pub struct JoinHandle<T> {
        #[pin]
        handle: $handle,
      }
    }

    impl<T> From<$handle> for JoinHandle<T> {
      fn from(task: $handle) -> Self {
        Self { handle: task }
      }
    }

    impl<T> core::future::Future for JoinHandle<T> {
      type Output = core::result::Result<T, $crate::spawner::handle::JoinError>;

      fn poll(
        self: core::pin::Pin<&mut Self>,
        cx: &mut core::task::Context<'_>,
      ) -> core::task::Poll<Self::Output> {
        // This wrapper never produces `Err`: a ready value from the inner
        // handle is forwarded as `Ok`, and `Pending` is passed through as is.
        core::future::Future::poll(self.project().handle, cx).map(core::result::Result::Ok)
      }
    }
  };
}
43
#[cfg(any(feature = "smol", feature = "wasm"))]
pub(crate) mod handle {
  use core::fmt;

  /// Error reported when a task does not run to completion.
  ///
  /// The `smol` runtime never returns this error; the type exists so that its
  /// join handles expose the same `Result` shape as the other runtimes.
  #[derive(Debug, Clone, PartialEq, Eq)]
  pub struct JoinError(());

  impl JoinError {
    /// Builds a `JoinError`. Only compiled when the `wasm` feature is enabled.
    #[cfg(feature = "wasm")]
    #[inline]
    pub(crate) const fn new() -> Self {
      JoinError(())
    }
  }

  impl fmt::Display for JoinError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
      f.write_str("task failed to execute to completion")
    }
  }

  impl core::error::Error for JoinError {}

  /// Lets a `JoinError` be propagated with `?` in functions returning `std::io::Error`.
  #[cfg(feature = "std")]
  impl From<JoinError> for std::io::Error {
    fn from(_err: JoinError) -> Self {
      Self::other("join error")
    }
  }
}
77
/// A handle to a spawned task that can be awaited for the task's result.
///
/// Awaiting the handle yields `Ok(output)` or `Err(Self::JoinError)`.
pub trait JoinHandle<O>: Future<Output = Result<O, Self::JoinError>> + Unpin {
  /// The error yielded when awaiting the handle fails.
  ///
  /// With the `std` feature enabled it must also convert into `std::io::Error`.
  #[cfg(feature = "std")]
  type JoinError: core::error::Error + Into<std::io::Error> + Send + Sync + 'static;

  /// The error yielded when awaiting the handle fails.
  #[cfg(not(feature = "std"))]
  type JoinError: core::error::Error + Send + Sync + 'static;

  /// Aborts the task related to this handle, consuming the handle.
  fn abort(self);

  /// Detaches the task to let it keep running in the background.
  ///
  /// The default implementation only drops the handle; implementations may
  /// override it.
  fn detach(self)
  where
    Self: Sized,
  {
    core::mem::drop(self);
  }
}
99
/// A handle to a locally spawned task that can be awaited for the task's result.
///
/// Unlike [`JoinHandle`], the error type is not required to be `Send` or `Sync`.
pub trait LocalJoinHandle<O>: Future<Output = Result<O, Self::JoinError>> + Unpin {
  /// The error yielded when awaiting the handle fails.
  ///
  /// With the `std` feature enabled it must also convert into `std::io::Error`.
  #[cfg(feature = "std")]
  type JoinError: core::error::Error + Into<std::io::Error> + 'static;

  /// The error yielded when awaiting the handle fails.
  #[cfg(not(feature = "std"))]
  type JoinError: core::error::Error + 'static;

  /// Detaches the task to let it keep running in the background.
  ///
  /// The default implementation only drops the handle; implementations may
  /// override it.
  fn detach(self)
  where
    Self: Sized,
  {
    core::mem::drop(self);
  }
}
117
118/// A spawner trait for spawning futures.
119pub trait AsyncSpawner: Yielder + Copy + Send + Sync + 'static {
120  /// The handle returned by the spawner when a future is spawned.
121  type JoinHandle<O>: JoinHandle<O> + Send + Sync + 'static
122  where
123    O: Send + 'static;
124
125  /// Spawn a future.
126  fn spawn<F>(future: F) -> Self::JoinHandle<F::Output>
127  where
128    F::Output: Send + 'static,
129    F: Future + Send + 'static;
130
131  /// Spawn a future and detach it.
132  fn spawn_detach<F>(future: F)
133  where
134    F::Output: Send + 'static,
135    F: Future + Send + 'static,
136  {
137    core::mem::drop(Self::spawn(future));
138  }
139}
140
141/// A spawner trait for spawning futures.
142pub trait AsyncLocalSpawner: Yielder + Copy + 'static {
143  /// The handle returned by the spawner when a future is spawned.
144  type JoinHandle<O>: LocalJoinHandle<O> + 'static
145  where
146    O: 'static;
147
148  /// Spawn a future.
149  fn spawn_local<F>(future: F) -> Self::JoinHandle<F::Output>
150  where
151    F::Output: 'static,
152    F: Future + 'static;
153
154  /// Spawn a future and detach it.
155  fn spawn_local_detach<F>(future: F)
156  where
157    F::Output: 'static,
158    F: Future + 'static,
159  {
160    core::mem::drop(Self::spawn_local(future));
161  }
162}
163
164/// A spawner trait for spawning blocking.
165pub trait AsyncBlockingSpawner: Yielder + Copy + 'static {
166  /// The join handle type for blocking tasks
167  type JoinHandle<R>: JoinHandle<R> + Send + 'static
168  where
169    R: Send + 'static;
170
171  /// Spawn a blocking function onto the runtime
172  fn spawn_blocking<F, R>(f: F) -> Self::JoinHandle<R>
173  where
174    F: FnOnce() -> R + Send + 'static,
175    R: Send + 'static;
176
177  /// Spawn a blocking function onto the runtime and detach it
178  fn spawn_blocking_detach<F, R>(f: F)
179  where
180    F: FnOnce() -> R + Send + 'static,
181    R: Send + 'static,
182  {
183    Self::spawn_blocking(f).detach();
184  }
185}
186
/// Unit error value whose message is `after canceled`.
///
/// NOTE(review): presumably reports that an "after" task was canceled; its
/// users live outside this file section, so confirm against the runtimes.
#[cfg(all(
  feature = "time",
  any(feature = "tokio", feature = "smol", feature = "wasm")
))]
#[derive(Debug, Clone, Copy)]
pub(crate) struct Canceled;

#[cfg(all(
  feature = "time",
  any(feature = "tokio", feature = "smol", feature = "wasm")
))]
impl core::fmt::Display for Canceled {
  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
    f.write_str("after canceled")
  }
}

#[cfg(all(
  feature = "time",
  any(feature = "tokio", feature = "smol", feature = "wasm")
))]
impl core::error::Error for Canceled {}
210
/// The error half of the output of an [`AfterHandle`].
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
#[derive(Debug)]
pub enum AfterHandleError<E> {
  /// The after function was canceled.
  Canceled,
  /// The task failed to execute to completion; carries the join error.
  Join(E),
}

#[cfg(feature = "time")]
impl<E> core::fmt::Display for AfterHandleError<E>
where
  E: core::fmt::Display,
{
  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
    match self {
      // A join error is rendered exactly as the wrapped error renders itself.
      Self::Join(inner) => write!(f, "{inner}"),
      Self::Canceled => f.write_str("after function was canceled"),
    }
  }
}

#[cfg(feature = "time")]
impl<E> core::error::Error for AfterHandleError<E> where E: core::error::Error {}

/// Converts into an `std::io::Error` built with `std::io::Error::other`:
/// cancellation uses a fixed message, a join error is wrapped as the payload.
#[cfg(all(feature = "time", feature = "std"))]
impl<E> From<AfterHandleError<E>> for std::io::Error
where
  E: core::error::Error + Send + Sync + 'static,
{
  fn from(value: AfterHandleError<E>) -> Self {
    match value {
      AfterHandleError::Join(inner) => Self::other(inner),
      AfterHandleError::Canceled => Self::other("task canceled"),
    }
  }
}
244
/// A pair of atomic flags: `finished` and `expired`.
///
/// Both start as `false`; this type only offers setters that store `true`,
/// using `Release` stores paired with `Acquire` loads.
#[cfg(all(
  feature = "time",
  any(feature = "tokio", feature = "smol", feature = "wasm")
))]
pub(crate) struct AfterHandleSignals {
  finished: core::sync::atomic::AtomicBool,
  expired: core::sync::atomic::AtomicBool,
}

#[cfg(all(
  feature = "time",
  any(feature = "tokio", feature = "smol", feature = "wasm")
))]
impl AfterHandleSignals {
  /// Creates the signals with both flags cleared.
  #[inline]
  pub(crate) const fn new() -> Self {
    use core::sync::atomic::AtomicBool;

    Self {
      finished: AtomicBool::new(false),
      expired: AtomicBool::new(false),
    }
  }

  /// Raises the `finished` flag.
  #[inline]
  pub(crate) fn set_finished(&self) {
    use core::sync::atomic::Ordering::Release;

    self.finished.store(true, Release);
  }

  /// Raises the `expired` flag.
  #[inline]
  pub(crate) fn set_expired(&self) {
    use core::sync::atomic::Ordering::Release;

    self.expired.store(true, Release);
  }

  /// Returns `true` once the `finished` flag has been raised.
  #[inline]
  pub(crate) fn is_finished(&self) -> bool {
    use core::sync::atomic::Ordering::Acquire;

    self.finished.load(Acquire)
  }

  /// Returns `true` once the `expired` flag has been raised.
  #[inline]
  pub(crate) fn is_expired(&self) -> bool {
    use core::sync::atomic::Ordering::Acquire;

    self.expired.load(Acquire)
  }
}
291
/// The handle returned by an [`AsyncAfterSpawner`] when a delayed ("after")
/// future is spawned.
///
/// Drop the handle to detach the task.
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
pub trait AfterHandle<F: Send + 'static>:
  Send + Sync + Future<Output = Result<F, Self::JoinError>> + 'static
{
  /// The error yielded when awaiting the handle fails.
  ///
  /// With the `std` feature enabled it must also convert into `std::io::Error`.
  #[cfg(feature = "std")]
  type JoinError: core::error::Error + Into<std::io::Error> + Send + Sync + 'static;

  /// The error yielded when awaiting the handle fails.
  #[cfg(not(feature = "std"))]
  type JoinError: core::error::Error + 'static;

  /// Cancels the task related to this handle.
  ///
  /// Resolves to the task's output if it completed just before it got
  /// canceled, or to `None` if it did not complete.
  fn cancel(self) -> impl Future<Output = Option<Result<F, Self::JoinError>>> + Send;

  /// Resets the delay of the task related to this handle to `duration`.
  fn reset(&self, duration: core::time::Duration);

  /// Aborts the task related to this handle, consuming the handle.
  fn abort(self);

  /// Detaches the task to let it keep running in the background.
  ///
  /// The default implementation only drops the handle.
  fn detach(self)
  where
    Self: Sized,
  {
    core::mem::drop(self);
  }

  /// Returns `true` if the timer has expired.
  fn is_expired(&self) -> bool;

  /// Returns `true` if the task has finished.
  fn is_finished(&self) -> bool;
}
332
/// A spawner trait for running futures after a delay. Go's `time.AfterFunc` equivalent.
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
pub trait AsyncAfterSpawner: Copy + Send + Sync + 'static {
  /// The instant type for the spawner.
  type Instant: crate::time::Instant;

  /// The handle returned by the spawner when a future is spawned.
  type JoinHandle<F>: AfterHandle<F>
  where
    F: Send + 'static;

  /// Spawns a future onto the runtime and runs it after the given duration.
  fn spawn_after<F>(duration: Duration, future: F) -> Self::JoinHandle<F::Output>
  where
    F::Output: Send + 'static,
    F: Future + Send + 'static;

  /// Spawns a future onto the runtime, runs it after the given duration, and
  /// detaches it.
  fn spawn_after_detach<F>(duration: Duration, future: F)
  where
    F::Output: Send + 'static,
    F: Future + Send + 'static,
  {
    // Release the handle through `AfterHandle::detach`, exactly like
    // `spawn_after_at_detach`, so an overridden `detach` is honored.
    Self::spawn_after(duration, future).detach();
  }

  /// Spawns a future onto the runtime and runs it once the given instant is reached.
  fn spawn_after_at<F>(instant: Self::Instant, future: F) -> Self::JoinHandle<F::Output>
  where
    F::Output: Send + 'static,
    F: Future + Send + 'static;

  /// Spawns a future onto the runtime, runs it once the given instant is
  /// reached, and detaches it.
  fn spawn_after_at_detach<F>(instant: Self::Instant, future: F)
  where
    F::Output: Send + 'static,
    F: Future + Send + 'static,
  {
    Self::spawn_after_at(instant, future).detach();
  }
}
374}