tokio_with_wasm/glue/task/mod.rs
1//! Asynchronous green-threads.
2//!
3//! Resembling the familiar `tokio::task` patterns.
4//! this module leverages web workers to execute tasks in parallel,
5//! making it ideal for high-performance web applications.
6
7mod join_set;
8mod pool;
9
10pub use join_set::*;
11use wasm_bindgen::prelude::JsValue;
12
13use crate::{
14 LogError, OnceReceiver, OnceSender, SelectFuture, is_main_thread,
15 once_channel, set_timeout,
16};
17use js_sys::Promise;
18use pool::WorkerPool;
19use std::error::Error;
20use std::fmt::{Debug, Display, Formatter};
21use std::future::Future;
22use std::pin::Pin;
23use std::task::{Context, Poll};
24use wasm_bindgen_futures::{JsFuture, spawn_local};
25
26thread_local! {
27 static WORKER_POOL: WorkerPool = {
28 let worker_pool = WorkerPool::new();
29 spawn_local(manage_pool());
30 worker_pool
31 }
32}
33
34/// Manages the worker pool by periodically checking for
35/// inactive web workers and queued tasks.
36async fn manage_pool() {
37 loop {
38 WORKER_POOL.with(|worker_pool| {
39 worker_pool.remove_inactive_workers();
40 worker_pool.flush_queued_tasks();
41 });
42 let promise = Promise::new(&mut |resolve, _reject| {
43 set_timeout(&resolve, 100.0);
44 });
45 JsFuture::from(promise).await.log_error("MANAGE_POOL");
46 }
47}
48
49/// Spawns a new asynchronous task, returning a
50/// [`JoinHandle`] for it.
51///
52/// The provided future will start running in the JavaScript event loop
53/// when `spawn` is called, even if you don't await the returned
54/// `JoinHandle`.
55///
56/// Spawning a task enables the task to execute concurrently to other tasks. The
57/// spawned task will always execute on the current web worker(thread),
58/// as that's how JavaScript's `Promise` basically works.
59///
60/// # Examples
61///
62/// In this example, a server is started and `spawn` is used to start a new task
63/// that processes each received connection.
64///
65/// ```no_run
66/// use std::io;
67/// use tokio_with_wasm as tokio;
68///
69/// async fn process() -> io::Result<()> {
70/// // Some process...
71/// }
72///
73/// async fn work() -> io::Result<()> {
74/// let result = tokio::spawn(async move {
75/// // Process this job concurrently.
76/// process(socket).await
77/// }).await?;;
78/// }
79/// ```
80///
81/// To run multiple tasks in parallel and receive their results, join
82/// handles can be stored in a vector.
83/// ```
84/// use tokio_with_wasm as tokio;
85///
86/// async fn my_background_op(id: i32) -> String {
87/// let s = format!("Starting background task {}.", id);
88/// println!("{}", s);
89/// s
90///
91/// let ops = vec![1, 2, 3];
92/// let mut tasks = Vec::with_capacity(ops.len());
93/// for op in ops {
94/// // This call will make them start running in the background
95/// // immediately.
96/// tasks.push(tokio::spawn(my_background_op(op)));
97/// }
98///
99/// let mut outputs = Vec::with_capacity(tasks.len());
100/// for task in tasks {
101/// match task.await {
102/// Ok(output) => outputs.push(output),
103/// Err(err) => {
104/// println!("An error occurred: {}", err);
105/// }
106/// }
107/// }
108/// println!("{:?}", outputs);
109/// # }
110/// ```
111/// This example pushes the tasks to `outputs` in the order they were
112/// started in.
113///
114/// # Using `!Send` values from a task
115///
116/// The task supplied to `spawn` is not required to implement `Send`.
117/// This is different from multi-threaded native async runtimes,
118/// because JavaScript environment is inherently single-threaded.
119///
120/// For example, this will work:
121///
122/// ```
123/// use std::rc::Rc;
124/// use tokio_with_wasm as tokio;
125///
126/// fn use_rc(rc: Rc<()>) {
127/// // Do stuff w/ rc
128/// # drop(rc);
129/// }
130///
131/// async fn work() {
132/// tokio::spawn(async {
133/// // Force the `Rc` to stay in a scope with no `.await`
134/// {
135/// let rc = Rc::new(());
136/// use_rc(rc.clone());
137/// }
138///
139/// tokio::task::yield_now().await;
140/// }).await;
141/// }
142/// ```
143///
144/// This will work too, unlike multi-threaded native runtimes
145/// where `!Send` values cannot live across `.await`:
146///
147/// ```
148/// use std::rc::Rc;
149/// use tokio_with_wasm as tokio;
150///
151/// fn use_rc(rc: Rc<()>) {
152/// // Do stuff w/ rc
153/// # drop(rc);
154/// }
155///
156/// async fn work() {
157/// tokio::spawn(async {
158/// let rc = Rc::new(());
159///
160/// tokio::task::yield_now().await;
161///
162/// use_rc(rc.clone());
163/// }).await;
164/// }
165/// ```
166pub fn spawn<F, T>(future: F) -> JoinHandle<T>
167where
168 F: std::future::Future<Output = T> + 'static,
169 T: 'static,
170{
171 if !is_main_thread() {
172 JsValue::from_str(concat!(
173 "Calling `spawn` in a blocking thread is not allowed. ",
174 "While this is possible in real `tokio`, ",
175 "it may cause undefined behavior in the JavaScript environment. ",
176 "Instead, use `tokio::sync::mpsc::channel` ",
177 "to listen for messages from the main thread ",
178 "and spawn a task there."
179 ))
180 .log_error("SPAWN");
181 panic!();
182 }
183 let (join_sender, join_receiver) = once_channel();
184 let (cancel_sender, cancel_receiver) = once_channel::<()>();
185 spawn_local(async move {
186 let result = SelectFuture::new(
187 async move {
188 let output = future.await;
189 Ok(output)
190 },
191 async move {
192 cancel_receiver.await;
193 Err(JoinError { cancelled: true })
194 },
195 )
196 .await;
197 join_sender.send(result);
198 });
199 JoinHandle {
200 join_receiver,
201 cancel_sender,
202 }
203}
204
205/// Runs the provided closure on a web worker(thread) where blocking is acceptable.
206///
207/// In general, issuing a blocking call or performing a lot of compute in a
208/// future without yielding is problematic, as it may prevent the JavaScript runtime from
209/// driving other futures forward. This function runs the provided closure on a
210/// web worker dedicated to blocking operations.
211///
212/// More and more web workers will be spawned when they are requested through this
213/// function until the upper limit of 512 is reached.
214/// After reaching the upper limit, the tasks will wait for
215/// any of the web workers to become idle.
216/// When a web worker remains idle for 10 seconds, it will be terminated
217/// and get removed from the worker pool, which is a similiar behavior to that of `tokio`.
218/// The web worker limit is very large by default, because `spawn_blocking` is often
219/// used for various kinds of IO operations that cannot be performed
220/// asynchronously. When you run CPU-bound code using `spawn_blocking`, you
221/// should keep this large upper limit in mind.
222///
223/// # Examples
224///
225/// Pass an input value and receive result of computation:
226///
227/// ```
228/// use tokio_with_wasm as tokio;
229///
230/// // Initial input
231/// let mut data = "Hello, ".to_string();
232/// let output = tokio::task::spawn_blocking(move || {
233/// // Stand-in for compute-heavy work or using synchronous APIs
234/// data.push_str("world");
235/// // Pass ownership of the value back to the asynchronous context
236/// data
237/// }).await?;
238///
239/// // `output` is the value returned from the thread
240/// assert_eq!(output.as_str(), "Hello, world");
241/// Ok(())
242/// ```
243pub fn spawn_blocking<C, T>(callable: C) -> JoinHandle<T>
244where
245 C: FnOnce() -> T + Send + 'static,
246 T: Send + 'static,
247{
248 if !is_main_thread() {
249 JsValue::from_str(concat!(
250 "Calling `spawn_blocking` in a blocking thread is not allowed. ",
251 "While this is possible in real `tokio`, ",
252 "it may cause undefined behavior in the JavaScript environment. ",
253 "Instead, use `tokio::sync::mpsc::channel` ",
254 "to listen for messages from the main thread ",
255 "and spawn a task there."
256 ))
257 .log_error("SPAWN_BLOCKING");
258 panic!();
259 }
260 let (join_sender, join_receiver) = once_channel();
261 let (cancel_sender, cancel_receiver) = once_channel::<()>();
262 WORKER_POOL.with(move |worker_pool| {
263 worker_pool.queue_task(move || {
264 if cancel_receiver.is_done() {
265 join_sender.send(Err(JoinError { cancelled: true }));
266 return;
267 }
268 let returned = callable();
269 join_sender.send(Ok(returned));
270 })
271 });
272 JoinHandle {
273 join_receiver,
274 cancel_sender,
275 }
276}
277
278/// Yields execution back to the JavaScript event loop.
279///
280/// To avoid blocking inside a long-running function,
281/// you have to yield to the async event loop regularly.
282///
283/// The async task may resume when it has its turn back.
284/// Meanwhile, any other pending tasks will be scheduled
285/// by the JavaScript runtime.
286pub async fn yield_now() {
287 let promise = Promise::new(&mut |resolve, _reject| {
288 set_timeout(&resolve, 0.0);
289 });
290 JsFuture::from(promise).await.log_error("YIELD_NOW");
291}
292
293/// An owned permission to join on a task (awaiting its termination).
294///
295/// This can be thought of as the equivalent of
296/// [`std::thread::JoinHandle`] or `tokio::task::JoinHandle` for
297/// a task that is executed concurrently.
298///
299/// A `JoinHandle` *detaches* the associated task when it is dropped, which
300/// means that there is no longer any handle to the task, and no way to `join`
301/// on it.
302///
303/// This struct is created by the [`spawn`] and [`spawn_blocking`]
304/// functions.
305///
306/// # Examples
307///
308/// Creation from [`spawn`]:
309///
310/// ```
311/// use tokio_with_wasm as tokio;
312/// use tokio::spawn;
313///
314/// let join_handle: tokio::task::JoinHandle<_> = spawn(async {
315/// // some work here
316/// });
317/// ```
318///
319/// Creation from [`spawn_blocking`]:
320///
321/// ```
322/// use tokio_with_wasm as tokio;
323/// use tokio::task::spawn_blocking;
324///
325/// let join_handle: tokio::task::JoinHandle<_> = spawn_blocking(|| {
326/// // some blocking work here
327/// });
328/// ```
329///
330/// Child being detached and outliving its parent:
331///
332/// ```no_run
333/// use tokio_with_wasm as tokio;
334/// use tokio::spawn;
335///
336/// let original_task = spawn(async {
337/// let _detached_task = spawn(async {
338/// // Here we sleep to make sure that the first task returns before.
339/// // Assume that code takes a few seconds to execute here.
340/// // This will be called, even though the JoinHandle is dropped.
341/// println!("♫ Still alive ♫");
342/// });
343/// });
344///
345/// original_task.await;
346/// println!("Original task is joined.");
347/// ```
348pub struct JoinHandle<T> {
349 join_receiver: OnceReceiver<Result<T, JoinError>>,
350 cancel_sender: OnceSender<()>,
351}
352
353impl<T> Future for JoinHandle<T> {
354 type Output = Result<T, JoinError>;
355 fn poll(
356 mut self: Pin<&mut Self>,
357 cx: &mut Context<'_>,
358 ) -> Poll<Self::Output> {
359 let pinned_receiver = Pin::new(&mut self.join_receiver);
360 pinned_receiver.poll(cx)
361 }
362}
363
364impl<T> Debug for JoinHandle<T>
365where
366 T: Debug,
367{
368 fn fmt(&self, fmt: &mut Formatter<'_>) -> std::fmt::Result {
369 fmt.debug_struct("JoinHandle").finish()
370 }
371}
372
373impl<T> JoinHandle<T> {
374 /// Abort the task associated with the handle.
375 ///
376 /// Awaiting a cancelled task might complete as usual if the task was
377 /// already completed at the time it was cancelled, but most likely it
378 /// will fail with a cancelled `JoinError`.
379 ///
380 /// Be aware that tasks spawned using [`spawn_blocking`] cannot be aborted
381 /// because they are not async. If you call `abort` on a `spawn_blocking`
382 /// task, then this *will not have any effect*, and the task will continue
383 /// running normally. The exception is if the task has not started running
384 /// yet; in that case, calling `abort` may prevent the task from starting.
385 ///
386 /// ```rust
387 /// use tokio_with_wasm as tokio;
388 /// use tokio::time;
389 ///
390 /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
391 /// # async fn main() {
392 /// let mut handles = Vec::new();
393 ///
394 /// handles.push(tokio::spawn(async {
395 /// time::sleep(time::Duration::from_secs(10)).await;
396 /// true
397 /// }));
398 ///
399 /// handles.push(tokio::spawn(async {
400 /// time::sleep(time::Duration::from_secs(10)).await;
401 /// false
402 /// }));
403 ///
404 /// for handle in &handles {
405 /// handle.abort();
406 /// }
407 ///
408 /// for handle in handles {
409 /// assert!(handle.await.unwrap_err().is_cancelled());
410 /// }
411 /// # }
412 /// ```
413 pub fn abort(&self) {
414 self.cancel_sender.send(());
415 }
416
417 /// Checks if the task associated with this `JoinHandle` has finished.
418 ///
419 /// Please note that this method can return `false` even if [`abort`] has been
420 /// called on the task. This is because the cancellation process may take
421 /// some time, and this method does not return `true` until it has
422 /// completed.
423 pub fn is_finished(&self) -> bool {
424 self.join_receiver.is_done()
425 }
426
427 /// Returns a new `AbortHandle` that can be used to remotely abort this task.
428 pub fn abort_handle(&self) -> AbortHandle {
429 AbortHandle {
430 cancel_sender: self.cancel_sender.clone(),
431 }
432 }
433}
434
435/// Returned when a task failed to execute to completion.
436#[derive(Debug)]
437pub struct JoinError {
438 cancelled: bool,
439}
440
441impl Display for JoinError {
442 fn fmt(&self, fmt: &mut Formatter<'_>) -> std::fmt::Result {
443 fmt.write_str("task failed to execute to completion")
444 }
445}
446
447impl Error for JoinError {}
448
449impl JoinError {
450 pub fn is_cancelled(&self) -> bool {
451 self.cancelled
452 }
453}
454
455/// An owned permission to abort a spawned task, without awaiting its completion.
456///
457/// Unlike a [`JoinHandle`], an `AbortHandle` does *not* represent the
458/// permission to await the task's completion, only to terminate it.
459///
460/// The task may be aborted by calling the [`AbortHandle::abort`] method.
461/// Dropping an `AbortHandle` releases the permission to terminate the task
462/// --- it does *not* abort the task.
463///
464/// Be aware that tasks spawned using [`spawn_blocking`] cannot be aborted
465/// because they are not async. If you call `abort` on a `spawn_blocking` task,
466/// then this *will not have any effect*, and the task will continue running
467/// normally. The exception is if the task has not started running yet; in that
468/// case, calling `abort` may prevent the task from starting.
469///
470/// [`JoinHandle`]: crate::task::JoinHandle
471/// [`spawn_blocking`]: crate::task::spawn_blocking
472#[derive(Clone)]
473pub struct AbortHandle {
474 cancel_sender: OnceSender<()>,
475}
476
477impl AbortHandle {
478 /// Abort the task associated with the handle.
479 ///
480 /// Awaiting a cancelled task might complete as usual if the task was
481 /// already completed at the time it was cancelled, but most likely it
482 /// will fail with a [cancelled] `JoinError`.
483 ///
484 /// If the task was already cancelled, such as by [`JoinHandle::abort`],
485 /// this method will do nothing.
486 ///
487 /// Be aware that tasks spawned using [`spawn_blocking`] cannot be aborted
488 /// because they are not async. If you call `abort` on a `spawn_blocking`
489 /// task, then this *will not have any effect*, and the task will continue
490 /// running normally. The exception is if the task has not started running
491 /// yet; in that case, calling `abort` may prevent the task from starting.
492 pub fn abort(&self) {
493 self.cancel_sender.send(());
494 }
495}
496
497impl Debug for AbortHandle {
498 fn fmt(&self, fmt: &mut Formatter<'_>) -> std::fmt::Result {
499 fmt.debug_struct("AbortHandle").finish()
500 }
501}