Skip to main content

async_rt/
task.rs

1use crate::global::GlobalExecutor;
2use crate::{AbortableJoinHandle, CommunicationTask, Executor, ExecutorBlocking, JoinHandle, UnboundedCommunicationTask};
3use futures::channel::mpsc::{Receiver, UnboundedReceiver};
4use std::future::Future;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8const EXECUTOR: GlobalExecutor = GlobalExecutor;
9
10/// Spawns a new asynchronous task in the background, returning a Future [`JoinHandle`] for it.
11pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
12where
13    F: Future + Send + 'static,
14    F::Output: Send + 'static,
15{
16    EXECUTOR.spawn(future)
17}
18
19pub fn spawn_blocking<F, T>(future: F) -> JoinHandle<T>
20where
21    F: FnOnce() -> T + Send + 'static,
22    T: Send + 'static,
23{
24    EXECUTOR.spawn_blocking(future)
25}
26
27/// Spawns a new asynchronous task in the background, returning an abortable handle that will cancel the task
28/// once the handle is dropped.
29///
30/// Note: This function is used if the task is expected to run until the handle is dropped. It is recommended to use
31/// [`spawn`] or [`dispatch`] otherwise.
32pub fn spawn_abortable<F>(future: F) -> AbortableJoinHandle<F::Output>
33where
34    F: Future + Send + 'static,
35    F::Output: Send + 'static,
36{
37    EXECUTOR.spawn_abortable(future)
38}
39
40/// Spawns a new asynchronous task in the background without a handle.
41/// Basically the same as [`spawn`].
42pub fn dispatch<F>(future: F)
43where
44    F: Future + Send + 'static,
45    F::Output: Send + 'static,
46{
47    EXECUTOR.dispatch(future);
48}
49
50/// Spawns a new asynchronous task that accepts messages to the task using [`channels`](futures::channel::mpsc).
51/// This function returns a handle that allows sending a message, or if there is no reference to the handle at all
52/// (in other words, all handles are dropped), the task would be aborted.
53pub fn spawn_coroutine<T, F, Fut>(f: F) -> CommunicationTask<T>
54where
55    F: FnMut(Receiver<T>) -> Fut,
56    Fut: Future<Output = ()> + Send + 'static,
57{
58    EXECUTOR.spawn_coroutine(f)
59}
60
61/// Spawns a new asynchronous task with a set channel buffer that accepts messages to the task using [`channels`](futures::channel::mpsc).
62/// This function returns a handle that allows sending a message, or if there is no reference to the handle at all
63/// (in other words, all handles are dropped), the task would be aborted.
64pub fn spawn_coroutine_with_buffer<T, F, Fut>(buffer: usize, f: F) -> CommunicationTask<T>
65where
66    F: FnMut(Receiver<T>) -> Fut,
67    Fut: Future<Output = ()> + Send + 'static,
68{
69    EXECUTOR.spawn_coroutine_with_buffer(buffer, f)
70}
71
72/// Spawns a new asynchronous task with provided context that accepts messages to the task using [`channels`](futures::channel::mpsc).
73/// This function returns a handle that allows sending a message, or if there is no reference to the handle at all
74/// (in other words, all handles are dropped), the task would be aborted.
75pub fn spawn_coroutine_with_context<T, F, C, Fut>(context: C, f: F) -> CommunicationTask<T>
76where
77    F: FnMut(C, Receiver<T>) -> Fut,
78    Fut: Future<Output = ()> + Send + 'static,
79{
80    EXECUTOR.spawn_coroutine_with_context(context, f)
81}
82
83/// Spawns a new asynchronous task with a set channel buffer and provided context that accepts messages to the task using [`channels`](futures::channel::mpsc).
84/// This function returns a handle that allows sending a message, or if there is no reference to the handle at all
85/// (in other words, all handles are dropped), the task would be aborted.
86pub fn spawn_coroutine_with_buffer_and_context<T, F, C, Fut>(context: C, buffer: usize, f: F) -> CommunicationTask<T>
87where
88    F: FnMut(C, Receiver<T>) -> Fut,
89    Fut: Future<Output = ()> + Send + 'static,
90{
91    EXECUTOR.spawn_coroutine_with_buffer_and_context(context, buffer, f)
92}
93
94/// Spawns a new asynchronous task that accepts messages to the task using [`channels`](futures::channel::mpsc).
95/// This function returns a handle that allows sending a message, or if there is no reference to the handle at all
96/// (in other words, all handles are dropped), the task would be aborted.
97pub fn spawn_unbounded_coroutine<T, F, Fut>(f: F) -> UnboundedCommunicationTask<T>
98where
99    F: FnMut(UnboundedReceiver<T>) -> Fut,
100    Fut: Future<Output = ()> + Send + 'static,
101{
102    EXECUTOR.spawn_unbounded_coroutine(f)
103}
104
105/// Spawns a new asynchronous task with provided context that accepts messages to the task using [`channels`](futures::channel::mpsc).
106/// This function returns a handle that allows sending a message, or if there is no reference to the handle at all
107/// (in other words, all handles are dropped), the task would be aborted.
108pub fn spawn_unbounded_coroutine_with_context<T, F, C, Fut>(
109    context: C,
110    f: F,
111) -> UnboundedCommunicationTask<T>
112where
113    F: FnMut(C, UnboundedReceiver<T>) -> Fut,
114    Fut: Future<Output = ()> + Send + 'static,
115{
116    EXECUTOR.spawn_unbounded_coroutine_with_context(context, f)
117}
118
119#[derive(Default)]
120struct Yield {
121    yielded: bool,
122}
123
124impl Future for Yield {
125    type Output = ();
126
127    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
128        if self.yielded {
129            return Poll::Ready(());
130        }
131        self.yielded = true;
132        cx.waker().wake_by_ref();
133        Poll::Pending
134    }
135}
136
137/// Yields execution back to the runtime
138pub fn yield_now() -> impl Future<Output = ()> {
139    Yield::default()
140}