//! `async_rt/task.rs`: free functions for spawning tasks on the crate's global executor.

1use crate::global::GlobalExecutor;
2use crate::{
3    AbortableJoinHandle, CommunicationTask, Executor, ExecutorBlocking, JoinHandle,
4    UnboundedCommunicationTask,
5};
6use futures::channel::mpsc::{Receiver, UnboundedReceiver};
7use std::future::Future;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10
11const EXECUTOR: GlobalExecutor = GlobalExecutor;
12
13/// Spawns a new asynchronous task in the background, returning a Future [`JoinHandle`] for it.
14pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
15where
16    F: Future + Send + 'static,
17    F::Output: Send + 'static,
18{
19    EXECUTOR.spawn(future)
20}
21
22pub fn spawn_blocking<F, T>(future: F) -> JoinHandle<T>
23where
24    F: FnOnce() -> T + Send + 'static,
25    T: Send + 'static,
26{
27    EXECUTOR.spawn_blocking(future)
28}
29
30/// Spawns a new asynchronous task in the background, returning an abortable handle that will cancel the task
31/// once the handle is dropped.
32///
33/// Note: This function is used if the task is expected to run until the handle is dropped. It is recommended to use
34/// [`spawn`] or [`dispatch`] otherwise.
35pub fn spawn_abortable<F>(future: F) -> AbortableJoinHandle<F::Output>
36where
37    F: Future + Send + 'static,
38    F::Output: Send + 'static,
39{
40    EXECUTOR.spawn_abortable(future)
41}
42
43/// Spawns a new asynchronous task in the background without a handle.
44/// Basically the same as [`spawn`].
45pub fn dispatch<F>(future: F)
46where
47    F: Future + Send + 'static,
48    F::Output: Send + 'static,
49{
50    EXECUTOR.dispatch(future);
51}
52
53/// Spawns a new asynchronous task that accepts messages to the task using [`channels`](futures::channel::mpsc).
54/// This function returns a handle that allows sending a message, or if there is no reference to the handle at all
55/// (in other words, all handles are dropped), the task would be aborted.
56pub fn spawn_coroutine<T, F, Fut>(f: F) -> CommunicationTask<T>
57where
58    F: FnMut(Receiver<T>) -> Fut,
59    Fut: Future<Output = ()> + Send + 'static,
60{
61    EXECUTOR.spawn_coroutine(f)
62}
63
64/// Spawns a new asynchronous task with a set channel buffer that accepts messages to the task using [`channels`](futures::channel::mpsc).
65/// This function returns a handle that allows sending a message, or if there is no reference to the handle at all
66/// (in other words, all handles are dropped), the task would be aborted.
67pub fn spawn_coroutine_with_buffer<T, F, Fut>(buffer: usize, f: F) -> CommunicationTask<T>
68where
69    F: FnMut(Receiver<T>) -> Fut,
70    Fut: Future<Output = ()> + Send + 'static,
71{
72    EXECUTOR.spawn_coroutine_with_buffer(buffer, f)
73}
74
75/// Spawns a new asynchronous task with provided context that accepts messages to the task using [`channels`](futures::channel::mpsc).
76/// This function returns a handle that allows sending a message, or if there is no reference to the handle at all
77/// (in other words, all handles are dropped), the task would be aborted.
78pub fn spawn_coroutine_with_context<T, F, C, Fut>(context: C, f: F) -> CommunicationTask<T>
79where
80    F: FnMut(C, Receiver<T>) -> Fut,
81    Fut: Future<Output = ()> + Send + 'static,
82{
83    EXECUTOR.spawn_coroutine_with_context(context, f)
84}
85
86/// Spawns a new asynchronous task with a set channel buffer and provided context that accepts messages to the task using [`channels`](futures::channel::mpsc).
87/// This function returns a handle that allows sending a message, or if there is no reference to the handle at all
88/// (in other words, all handles are dropped), the task would be aborted.
89pub fn spawn_coroutine_with_buffer_and_context<T, F, C, Fut>(
90    context: C,
91    buffer: usize,
92    f: F,
93) -> CommunicationTask<T>
94where
95    F: FnMut(C, Receiver<T>) -> Fut,
96    Fut: Future<Output = ()> + Send + 'static,
97{
98    EXECUTOR.spawn_coroutine_with_buffer_and_context(context, buffer, f)
99}
100
101/// Spawns a new asynchronous task that accepts messages to the task using [`channels`](futures::channel::mpsc).
102/// This function returns a handle that allows sending a message, or if there is no reference to the handle at all
103/// (in other words, all handles are dropped), the task would be aborted.
104pub fn spawn_unbounded_coroutine<T, F, Fut>(f: F) -> UnboundedCommunicationTask<T>
105where
106    F: FnMut(UnboundedReceiver<T>) -> Fut,
107    Fut: Future<Output = ()> + Send + 'static,
108{
109    EXECUTOR.spawn_unbounded_coroutine(f)
110}
111
112/// Spawns a new asynchronous task with provided context that accepts messages to the task using [`channels`](futures::channel::mpsc).
113/// This function returns a handle that allows sending a message, or if there is no reference to the handle at all
114/// (in other words, all handles are dropped), the task would be aborted.
115pub fn spawn_unbounded_coroutine_with_context<T, F, C, Fut>(
116    context: C,
117    f: F,
118) -> UnboundedCommunicationTask<T>
119where
120    F: FnMut(C, UnboundedReceiver<T>) -> Fut,
121    Fut: Future<Output = ()> + Send + 'static,
122{
123    EXECUTOR.spawn_unbounded_coroutine_with_context(context, f)
124}
125
/// Future that is pending on its first poll and ready on every poll after that.
#[derive(Default)]
struct Yield {
    /// Set once the first poll has returned [`Poll::Pending`].
    yielded: bool,
}

impl Future for Yield {
    type Output = ();

    /// Returns [`Poll::Pending`] once, waking the task's waker first, then [`Poll::Ready`].
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            return Poll::Ready(());
        }
        self.yielded = true;
        // Wake before returning `Pending`: nothing else will ever wake this future, so
        // without the self-wake the task would never be polled again.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
143
144/// Yields execution back to the runtime
145pub fn yield_now() -> impl Future<Output = ()> {
146    Yield::default()
147}