tokio_with_wasm/glue/task/
mod.rs

1//! Asynchronous green-threads.
2//!
//! Resembling the familiar `tokio::task` patterns,
4//! this module leverages web workers to execute tasks in parallel,
5//! making it ideal for high-performance web applications.
6
7mod join_set;
8mod pool;
9
10pub use join_set::*;
11use wasm_bindgen::prelude::JsValue;
12
13use crate::{
14  LogError, OnceReceiver, OnceSender, SelectFuture, is_main_thread,
15  once_channel, set_timeout,
16};
17use js_sys::Promise;
18use pool::WorkerPool;
19use std::error::Error;
20use std::fmt::{Debug, Display, Formatter};
21use std::future::Future;
22use std::pin::Pin;
23use std::task::{Context, Poll};
24use wasm_bindgen_futures::{JsFuture, spawn_local};
25
26thread_local! {
27    static WORKER_POOL: WorkerPool = {
28        let worker_pool = WorkerPool::new();
29        spawn_local(manage_pool());
30        worker_pool
31    }
32}
33
34/// Manages the worker pool by periodically checking for
35/// inactive web workers and queued tasks.
36async fn manage_pool() {
37  loop {
38    WORKER_POOL.with(|worker_pool| {
39      worker_pool.remove_inactive_workers();
40      worker_pool.flush_queued_tasks();
41    });
42    let promise = Promise::new(&mut |resolve, _reject| {
43      set_timeout(&resolve, 100.0);
44    });
45    JsFuture::from(promise).await.log_error("MANAGE_POOL");
46  }
47}
48
49/// Spawns a new asynchronous task, returning a
50/// [`JoinHandle`] for it.
51///
52/// The provided future will start running in the JavaScript event loop
53/// when `spawn` is called, even if you don't await the returned
54/// `JoinHandle`.
55///
56/// Spawning a task enables the task to execute concurrently to other tasks. The
57/// spawned task will always execute on the current web worker(thread),
58/// as that's how JavaScript's `Promise` basically works.
59///
60/// # Examples
61///
/// In this example, `spawn` is used to start a new task that runs a job
/// concurrently, and the task's result is awaited through the returned handle.
///
/// ```no_run
/// use std::io;
/// use tokio_with_wasm as tokio;
///
/// async fn process() -> io::Result<()> {
///     // Some process...
///     Ok(())
/// }
///
/// async fn work() -> Result<(), Box<dyn std::error::Error>> {
///     let result = tokio::spawn(async move {
///         // Process this job concurrently.
///         process().await
///     })
///     .await?;
///     result?;
///     Ok(())
/// }
79/// ```
80///
81/// To run multiple tasks in parallel and receive their results, join
82/// handles can be stored in a vector.
83/// ```
84/// use tokio_with_wasm as tokio;
85///
86/// async fn my_background_op(id: i32) -> String {
87///     let s = format!("Starting background task {}.", id);
88///     println!("{}", s);
///     s
/// }
///
/// # async fn example() {
91/// let ops = vec![1, 2, 3];
92/// let mut tasks = Vec::with_capacity(ops.len());
93/// for op in ops {
94///     // This call will make them start running in the background
95///     // immediately.
96///     tasks.push(tokio::spawn(my_background_op(op)));
97/// }
98///
99/// let mut outputs = Vec::with_capacity(tasks.len());
100/// for task in tasks {
101///     match task.await {
102///         Ok(output) => outputs.push(output),
103///         Err(err) => {
104///             println!("An error occurred: {}", err);
105///         }
106///     }
107/// }
108/// println!("{:?}", outputs);
109/// # }
110/// ```
111/// This example pushes the tasks to `outputs` in the order they were
112/// started in.
113///
114/// # Using `!Send` values from a task
115///
116/// The task supplied to `spawn` is not required to implement `Send`.
117/// This is different from multi-threaded native async runtimes,
/// because the JavaScript environment is inherently single-threaded.
119///
120/// For example, this will work:
121///
122/// ```
123/// use std::rc::Rc;
124/// use tokio_with_wasm as tokio;
125///
126/// fn use_rc(rc: Rc<()>) {
127///     // Do stuff w/ rc
128/// # drop(rc);
129/// }
130///
131/// async fn work() {
132///     tokio::spawn(async {
133///         // Force the `Rc` to stay in a scope with no `.await`
134///         {
135///             let rc = Rc::new(());
136///             use_rc(rc.clone());
137///         }
138///
139///         tokio::task::yield_now().await;
140///     }).await;
141/// }
142/// ```
143///
144/// This will work too, unlike multi-threaded native runtimes
145/// where `!Send` values cannot live across `.await`:
146///
147/// ```
148/// use std::rc::Rc;
149/// use tokio_with_wasm as tokio;
150///
151/// fn use_rc(rc: Rc<()>) {
152///     // Do stuff w/ rc
153/// # drop(rc);
154/// }
155///
156/// async fn work() {
157///     tokio::spawn(async {
158///         let rc = Rc::new(());
159///
160///         tokio::task::yield_now().await;
161///
162///         use_rc(rc.clone());
163///     }).await;
164/// }
165/// ```
166pub fn spawn<F, T>(future: F) -> JoinHandle<T>
167where
168  F: std::future::Future<Output = T> + 'static,
169  T: 'static,
170{
171  if !is_main_thread() {
172    JsValue::from_str(concat!(
173      "Calling `spawn` in a blocking thread is not allowed. ",
174      "While this is possible in real `tokio`, ",
175      "it may cause undefined behavior in the JavaScript environment. ",
176      "Instead, use `tokio::sync::mpsc::channel` ",
177      "to listen for messages from the main thread ",
178      "and spawn a task there."
179    ))
180    .log_error("SPAWN");
181    panic!();
182  }
183  let (join_sender, join_receiver) = once_channel();
184  let (cancel_sender, cancel_receiver) = once_channel::<()>();
185  spawn_local(async move {
186    let result = SelectFuture::new(
187      async move {
188        let output = future.await;
189        Ok(output)
190      },
191      async move {
192        cancel_receiver.await;
193        Err(JoinError { cancelled: true })
194      },
195    )
196    .await;
197    join_sender.send(result);
198  });
199  JoinHandle {
200    join_receiver,
201    cancel_sender,
202  }
203}
204
205/// Runs the provided closure on a web worker(thread) where blocking is acceptable.
206///
207/// In general, issuing a blocking call or performing a lot of compute in a
208/// future without yielding is problematic, as it may prevent the JavaScript runtime from
209/// driving other futures forward. This function runs the provided closure on a
210/// web worker dedicated to blocking operations.
211///
212/// More and more web workers will be spawned when they are requested through this
213/// function until the upper limit of 512 is reached.
214/// After reaching the upper limit, the tasks will wait for
215/// any of the web workers to become idle.
216/// When a web worker remains idle for 10 seconds, it will be terminated
/// and get removed from the worker pool, which is similar to the behavior of `tokio`.
218/// The web worker limit is very large by default, because `spawn_blocking` is often
219/// used for various kinds of IO operations that cannot be performed
220/// asynchronously.  When you run CPU-bound code using `spawn_blocking`, you
221/// should keep this large upper limit in mind.
222///
223/// # Examples
224///
225/// Pass an input value and receive result of computation:
226///
227/// ```
228/// use tokio_with_wasm as tokio;
229///
/// # async fn docs() -> Result<(), Box<dyn std::error::Error>> {
/// // Initial input
/// let mut data = "Hello, ".to_string();
/// let output = tokio::task::spawn_blocking(move || {
///     // Stand-in for compute-heavy work or using synchronous APIs
///     data.push_str("world");
///     // Pass ownership of the value back to the asynchronous context
///     data
/// }).await?;
///
/// // `output` is the value returned from the thread
/// assert_eq!(output.as_str(), "Hello, world");
/// # Ok(())
/// # }
242/// ```
243pub fn spawn_blocking<C, T>(callable: C) -> JoinHandle<T>
244where
245  C: FnOnce() -> T + Send + 'static,
246  T: Send + 'static,
247{
248  if !is_main_thread() {
249    JsValue::from_str(concat!(
250      "Calling `spawn_blocking` in a blocking thread is not allowed. ",
251      "While this is possible in real `tokio`, ",
252      "it may cause undefined behavior in the JavaScript environment. ",
253      "Instead, use `tokio::sync::mpsc::channel` ",
254      "to listen for messages from the main thread ",
255      "and spawn a task there."
256    ))
257    .log_error("SPAWN_BLOCKING");
258    panic!();
259  }
260  let (join_sender, join_receiver) = once_channel();
261  let (cancel_sender, cancel_receiver) = once_channel::<()>();
262  WORKER_POOL.with(move |worker_pool| {
263    worker_pool.queue_task(move || {
264      if cancel_receiver.is_done() {
265        join_sender.send(Err(JoinError { cancelled: true }));
266        return;
267      }
268      let returned = callable();
269      join_sender.send(Ok(returned));
270    })
271  });
272  JoinHandle {
273    join_receiver,
274    cancel_sender,
275  }
276}
277
278/// Yields execution back to the JavaScript event loop.
279///
280/// To avoid blocking inside a long-running function,
281/// you have to yield to the async event loop regularly.
282///
283/// The async task may resume when it has its turn back.
284/// Meanwhile, any other pending tasks will be scheduled
285/// by the JavaScript runtime.
286pub async fn yield_now() {
287  let promise = Promise::new(&mut |resolve, _reject| {
288    set_timeout(&resolve, 0.0);
289  });
290  JsFuture::from(promise).await.log_error("YIELD_NOW");
291}
292
293/// An owned permission to join on a task (awaiting its termination).
294///
295/// This can be thought of as the equivalent of
296/// [`std::thread::JoinHandle`] or `tokio::task::JoinHandle` for
297/// a task that is executed concurrently.
298///
299/// A `JoinHandle` *detaches* the associated task when it is dropped, which
300/// means that there is no longer any handle to the task, and no way to `join`
301/// on it.
302///
303/// This struct is created by the [`spawn`] and [`spawn_blocking`]
304/// functions.
305///
306/// # Examples
307///
308/// Creation from [`spawn`]:
309///
310/// ```
311/// use tokio_with_wasm as tokio;
312/// use tokio::spawn;
313///
314/// let join_handle: tokio::task::JoinHandle<_> = spawn(async {
315///     // some work here
316/// });
317/// ```
318///
319/// Creation from [`spawn_blocking`]:
320///
321/// ```
322/// use tokio_with_wasm as tokio;
323/// use tokio::task::spawn_blocking;
324///
325/// let join_handle: tokio::task::JoinHandle<_> = spawn_blocking(|| {
326///     // some blocking work here
327/// });
328/// ```
329///
330/// Child being detached and outliving its parent:
331///
332/// ```no_run
/// use tokio_with_wasm as tokio;
/// use tokio::spawn;
///
/// # async fn example() {
/// let original_task = spawn(async {
///     let _detached_task = spawn(async {
///         // Here we sleep to make sure that the first task returns before.
///         // Assume that code takes a few seconds to execute here.
///         // This will be called, even though the JoinHandle is dropped.
///         println!("♫ Still alive ♫");
///     });
/// });
///
/// original_task.await;
/// println!("Original task is joined.");
/// # }
347/// ```
348pub struct JoinHandle<T> {
349  join_receiver: OnceReceiver<Result<T, JoinError>>,
350  cancel_sender: OnceSender<()>,
351}
352
353impl<T> Future for JoinHandle<T> {
354  type Output = Result<T, JoinError>;
355  fn poll(
356    mut self: Pin<&mut Self>,
357    cx: &mut Context<'_>,
358  ) -> Poll<Self::Output> {
359    let pinned_receiver = Pin::new(&mut self.join_receiver);
360    pinned_receiver.poll(cx)
361  }
362}
363
364impl<T> Debug for JoinHandle<T>
365where
366  T: Debug,
367{
368  fn fmt(&self, fmt: &mut Formatter<'_>) -> std::fmt::Result {
369    fmt.debug_struct("JoinHandle").finish()
370  }
371}
372
373impl<T> JoinHandle<T> {
374  /// Abort the task associated with the handle.
375  ///
376  /// Awaiting a cancelled task might complete as usual if the task was
377  /// already completed at the time it was cancelled, but most likely it
378  /// will fail with a cancelled `JoinError`.
379  ///
380  /// Be aware that tasks spawned using [`spawn_blocking`] cannot be aborted
381  /// because they are not async. If you call `abort` on a `spawn_blocking`
382  /// task, then this *will not have any effect*, and the task will continue
383  /// running normally. The exception is if the task has not started running
384  /// yet; in that case, calling `abort` may prevent the task from starting.
385  ///
386  /// ```rust
387  /// use tokio_with_wasm as tokio;
388  /// use tokio::time;
389  ///
390  /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
391  /// # async fn main() {
392  /// let mut handles = Vec::new();
393  ///
394  /// handles.push(tokio::spawn(async {
395  ///    time::sleep(time::Duration::from_secs(10)).await;
396  ///    true
397  /// }));
398  ///
399  /// handles.push(tokio::spawn(async {
400  ///    time::sleep(time::Duration::from_secs(10)).await;
401  ///    false
402  /// }));
403  ///
404  /// for handle in &handles {
405  ///     handle.abort();
406  /// }
407  ///
408  /// for handle in handles {
409  ///     assert!(handle.await.unwrap_err().is_cancelled());
410  /// }
411  /// # }
412  /// ```
413  pub fn abort(&self) {
414    self.cancel_sender.send(());
415  }
416
417  /// Checks if the task associated with this `JoinHandle` has finished.
418  ///
419  /// Please note that this method can return `false` even if [`abort`] has been
420  /// called on the task. This is because the cancellation process may take
421  /// some time, and this method does not return `true` until it has
422  /// completed.
423  pub fn is_finished(&self) -> bool {
424    self.join_receiver.is_done()
425  }
426
427  /// Returns a new `AbortHandle` that can be used to remotely abort this task.
428  pub fn abort_handle(&self) -> AbortHandle {
429    AbortHandle {
430      cancel_sender: self.cancel_sender.clone(),
431    }
432  }
433}
434
/// Error produced when a task did not run to completion.
#[derive(Debug)]
pub struct JoinError {
  /// `true` when the task ended because it was cancelled.
  cancelled: bool,
}

impl Display for JoinError {
  /// Writes a fixed, human-readable description of the failure.
  fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
    write!(f, "task failed to execute to completion")
  }
}

impl Error for JoinError {}

impl JoinError {
  /// Returns `true` if the error was caused by the task being cancelled.
  pub fn is_cancelled(&self) -> bool {
    matches!(self, Self { cancelled: true })
  }
}
454
455/// An owned permission to abort a spawned task, without awaiting its completion.
456///
457/// Unlike a [`JoinHandle`], an `AbortHandle` does *not* represent the
458/// permission to await the task's completion, only to terminate it.
459///
460/// The task may be aborted by calling the [`AbortHandle::abort`] method.
461/// Dropping an `AbortHandle` releases the permission to terminate the task
462/// --- it does *not* abort the task.
463///
464/// Be aware that tasks spawned using [`spawn_blocking`] cannot be aborted
465/// because they are not async. If you call `abort` on a `spawn_blocking` task,
466/// then this *will not have any effect*, and the task will continue running
467/// normally. The exception is if the task has not started running yet; in that
468/// case, calling `abort` may prevent the task from starting.
469///
470/// [`JoinHandle`]: crate::task::JoinHandle
471/// [`spawn_blocking`]: crate::task::spawn_blocking
472#[derive(Clone)]
473pub struct AbortHandle {
474  cancel_sender: OnceSender<()>,
475}
476
477impl AbortHandle {
478  /// Abort the task associated with the handle.
479  ///
480  /// Awaiting a cancelled task might complete as usual if the task was
481  /// already completed at the time it was cancelled, but most likely it
482  /// will fail with a [cancelled] `JoinError`.
483  ///
484  /// If the task was already cancelled, such as by [`JoinHandle::abort`],
485  /// this method will do nothing.
486  ///
487  /// Be aware that tasks spawned using [`spawn_blocking`] cannot be aborted
488  /// because they are not async. If you call `abort` on a `spawn_blocking`
489  /// task, then this *will not have any effect*, and the task will continue
490  /// running normally. The exception is if the task has not started running
491  /// yet; in that case, calling `abort` may prevent the task from starting.
492  pub fn abort(&self) {
493    self.cancel_sender.send(());
494  }
495}
496
497impl Debug for AbortHandle {
498  fn fmt(&self, fmt: &mut Formatter<'_>) -> std::fmt::Result {
499    fmt.debug_struct("AbortHandle").finish()
500  }
501}