wasm_bindgen_spawn/
lib.rs

1#![doc = include_str!("../README.md")]
2
3#[cfg(all(target_arch = "wasm32", not(target_feature = "atomics"), not(doc)))]
4compile_error!(
5    "Some target features are not enabled. Please read the README and set the right rustflags"
6);
7
8use std::panic::UnwindSafe;
9use std::ptr::NonNull;
10use std::sync::atomic::AtomicUsize;
11use std::sync::mpsc;
12
13use js_sys::{Function, Promise};
14use wasm_bindgen::prelude::*;
15#[cfg(feature = "async")]
16use wasm_bindgen_futures::JsFuture;
17
18type BoxClosure = Box<dyn FnOnce() -> BoxValue + Send + UnwindSafe + 'static>;
19type BoxValue = Box<dyn Send + 'static>;
20type ValueSender = oneshot::Sender<Result<BoxValue, JoinError>>;
21type ValueReceiver = oneshot::Receiver<Result<BoxValue, JoinError>>;
22
23type DispatchPayload = (usize, BoxClosure, ValueSender);
24type DispatchSender = mpsc::Sender<DispatchPayload>;
25type DispatchReceiver = mpsc::Receiver<DispatchPayload>;
26
27type SignalSender = oneshot::Sender<()>;
28type SignalReceiver = oneshot::Receiver<()>;
29
30/// Error when joining a thread with a [`JoinHandle`]
31#[derive(Debug, thiserror::Error)]
32pub enum JoinError {
33    #[error("WASM thread {0} panicked")]
34    Panic(usize),
35}
36
37/// Error when spawning a thread with [`ThreadCreator::spawn`]
38#[derive(Debug, thiserror::Error)]
39pub enum SpawnError {
40    #[error("Cannot spawn WASM thread because the dispatcher has disconnected")]
41    Disconnected,
42}
43
44#[wasm_bindgen]
45extern "C" {
46    /// Binding to wasm.memory
47    #[wasm_bindgen(js_name = memory, js_namespace = wasm, thread_local_v2)]
48    static MEMORY: JsValue;
49    #[wasm_bindgen(js_name = __dispatch_poll_worker, js_namespace = wasm_bindgen, thread_local_v2)]
50    static DISPATCH_POLL_WORKER: JsValue;
51}
52
53/// Handle for a dedicated Web Worker for dispatching new threads.
54///
55/// Please see below for example and how to avoid potential deadlocks.
56///
57/// # Example
58/// Suppose your WASM package is built with `wasm-pack` using the following command:
59/// ```sh
60/// wasm-pack build -t no-modules --out-dir ./pkg
61/// ```
62/// which should produce `pkg/your_package_bg.wasm` and `pkg/your_package.js`.
63///
64/// Then you can create a `ThreadCreator` with:
65/// ```no_run
66/// use wasm_bindgen::prelude::*;
67/// use wasm_bindgen_spawm::ThreadCreator;
68///
69/// let thread_creator = ThreadCreator::unready("pkg/your_package_bg.wasm", "pkg/your_package.js");
70/// // on error, this is a JsValue error
71/// assert!(thread_creator.is_ok());
72/// ```
73///
74/// # Dispatcher ready
75/// Note that the function to create the Thread Creator is called `unready` rather than `new`.
76/// This is because the JS runtime may only start the dispatcher thread after the current
77/// execution context is finished. Blocking the thread before the ThreadCreator is ready may
78/// cause deadlocks.
79///
80/// For example, the following code will cause a deadlock, supposed there is a `new` function
81/// ```rust,ignore
82/// use wasm_bindgen::prelude::*;
83/// use wasm_bindgen_spawm::ThreadCreator;
84///
85/// pub fn will_deadlock() -> Result<(), Box<dyn std::error::Error>> {
86///     // the `new` function is hypothetical
87///     let thread_creator = ThreadCreator::new("pkg/your_package_bg.wasm", "pkg/your_package.js")?;
88///     // calling `spawn` is ok here
89///     let thread = thread_creator.spawn(move || {
90///         // do some work
91///     })?;
92///     // this will deadlock because the thread won't be spawned until this synchronous context is
93///     // finished
94///     thread.join()?;
95///
96///     Ok(())   
97/// }
98/// ```
99/// The `unready` factory function exists to ensure user calls
100/// [`ready`](ThreadCreatorUnready::ready)
101/// before start using the `ThreadCreator` to spawn threads. It also has a nice side effect that
102/// `ThreadCreator` is now `Send + Sync` since it doesn't need to hold the `Promise`
103/// ```no_run
104/// use wasm_bindgen::prelude::*;
105/// use wasm_bindgen_spawm::ThreadCreator;
106///
107/// pub async fn will_not_deadlock() -> Result<(), Box<dyn std::error::Error>> {
108///     let thread_creator = ThreadCreator::unready("pkg/your_package_bg.wasm", "pkg/your_package.js")?;
109///     let thread_creator = thread_creator.ready().await?;
110///
111///     let thread = thread_creator.spawn(move || {
112///         return 42;
113///     })?;
114///     let value = thread.join()?;
115///     assert_eq!(value, 42);
116///
117///     Ok(())   
118/// }
119/// ```
120/// Note that only `ready` requires `await`, and not `spawn` or `join`. This is because
121/// once the dispatcher is ready, shared memory is used for sending the spawn payload
122/// to the dispatcher instead of `postMessage`. This also means you only need this async step
123/// once when creating the `ThreadCreator`. You can write the rest of the code without `async`.
124///
125/// You can also
126/// disable the `async` feature and use [`into_promise_and_inner`](ThreadCreatorUnready::into_promise_and_inner)
127/// to avoid depending on `wasm-bindgen-futures`. You need to manually wait for the promise in this
128/// case before using the `ThreadCreator` (for example sending the promise to JS and awaiting it there).
129/// See the example below for more information.
130///
131/// # Joining threads
132/// Joining should feel pretty much like the `std` library. However, there is one caveat -
133/// The main thread in Web cannot be blocked. This means in order to join a thread, you must
134/// run the WASM module in a Web Worker.
135///
136/// See [`JoinHandle`] for more information.
137///
138/// # Terminating
139/// When the `ThreadCreator` is dropped, the dispatcher worker will be terminated.
140/// Threads that are `spawn`-ed but not started may start or not start, but
141/// threads that are already running are not impacted and can still be `join`-ed.
142///
143/// Generally you should avoid dropping the `ThreadCreator` until all spawned threads are joined.
144/// You can create a global thread creator by using `thread_local`:
145/// ```no_run
146/// use wasm_bindgen::prelude::*;
147/// use wasm_bindgen_spawn::ThreadCreator;
148///
149/// thread_local! {
150///     static THREAD_CREATOR: OnceCell<ThreadCreator> = OnceCell::new();
151/// }
152///
153/// #[wasm_bindgen]
154/// pub fn create_thread_creator() -> Result<Promise, JsValue> {
155///     let thread_creator = ThreadCreator::unready("pkg/your_package_bg.wasm", "pkg/your_package.js")?;
156///     let (promise, thread_creator) = thread_creator.into_promise_and_inner();
157///     THREAD_CREATOR.with(move |tc| {
158///         tc.set(thread_creator);
159///     });
160///     Ok(promise)
161///     // the promise can then be awaited in JS (this is useful if the rest
162///     // of your code doesn't need wasm-bindgen-futures)
163/// }
164///
165///
166/// // On JS side, make sure this function is only called after the promise is resolved.
167/// #[wasm_bindgen]
168/// pub fn do_some_work_on_thread() {
169///     let handle = THREAD_CREATOR.with(|tc| {
170///         let tc = tc.get().unwrap();
171///         tc.spawn(move || {
172///             // do some work
173///         }).unwrap()
174///     });
175///
176///     handle.join().unwrap();
177/// }
178/// ```
179pub struct ThreadCreator {
180    /// Id for the next thread
181    next_id: AtomicUsize,
182    /// Sender to send the thread closure to the dispatcher for creating threads
183    send: DispatchSender,
184}
185static_assertions::assert_impl_all!(ThreadCreator: Send, Sync);
186
187/// See [`ThreadCreator::unready`] for more information
188pub struct ThreadCreatorUnready {
189    thread_creator: ThreadCreator,
190    /// Promise for if the dispatcher is ready
191    dispatcher_promise: Promise,
192}
193
194impl ThreadCreatorUnready {
195    /// Returns the promise that resolves when the dispatcher is ready,
196    /// and the inner [`ThreadCreator`]. Note that the inner creator
197    /// can only be used after awaiting on the Promise.
198    ///
199    /// In async context, it might be more convenient to use [`ready`](ThreadCreatorUnready::ready)
200    /// instead
201    ///
202    /// See the struct documentation for more information
203    pub fn into_promise_and_inner(self) -> (Promise, ThreadCreator) {
204        (self.dispatcher_promise, self.thread_creator)
205    }
206
207    /// Await the dispatcher to be ready.
208    ///
209    /// See the struct documentation for more information
210    #[cfg(feature = "async")]
211    pub async fn ready(self) -> Result<ThreadCreator, JsValue> {
212        JsFuture::from(self.dispatcher_promise).await?;
213        Ok(self.thread_creator)
214    }
215}
216
217impl ThreadCreator {
218    /// Create a Web Worker to dispatch threads with the wasm module url and the
219    /// wasm_bindgen JS url. The Worker may not be ready until `ready` is awaited
220    ///
221    /// See the struct documentation for more information
222    pub fn unready(wasm_url: &str, wbg_url: &str) -> Result<ThreadCreatorUnready, JsValue> {
223        // function([wasm_url, wbg_url, memory, recv]) -> Promise<void>;
224        let create_dispatcher =
225            Function::new_with_args("args", include_str!("js/createDispatcher.min.js"));
226        let wasm_url = JsValue::from_str(wasm_url);
227        let wbg_url = JsValue::from_str(wbg_url);
228        let memory = MEMORY.with(|memory| memory.clone());
229        let (send, recv) = mpsc::channel::<DispatchPayload>();
230        let recv_ptr = JsValue::from(NonNull::from(Box::leak(Box::new(recv))));
231        let (start_send, start_recv) = oneshot::channel::<()>();
232        let start_send = Box::into_raw(Box::new(start_send));
233        let start_recv = Box::into_raw(Box::new(start_recv));
234        let start_send_ptr = unsafe { NonNull::new_unchecked(start_send) };
235        let start_recv_ptr = unsafe { NonNull::new_unchecked(start_recv) };
236        // create the dispatcher
237        let promise = create_dispatcher
238            .call1(
239                &JsValue::null(),
240                &JsValue::from(vec![
241                    wasm_url,
242                    wbg_url,
243                    memory,
244                    recv_ptr,
245                    JsValue::from(start_send_ptr),
246                    JsValue::from(start_recv_ptr),
247                    DISPATCH_POLL_WORKER.with(|v| v.clone()),
248                ]),
249            )?
250            .dyn_into::<Promise>()?;
251        Ok(ThreadCreatorUnready {
252            thread_creator: Self {
253                next_id: AtomicUsize::new(1),
254                send,
255            },
256            dispatcher_promise: promise,
257        })
258    }
259
260    /// Spawn a new thread to execute F.
261    ///
262    /// Note that spawning new thread is very slow. Pool them if you can.
263    pub fn spawn<F, T>(&self, f: F) -> Result<JoinHandle<T>, SpawnError>
264    where
265        F: FnOnce() -> T + Send + 'static + UnwindSafe,
266        T: Send + 'static,
267    {
268        let next_id = self
269            .next_id
270            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
271        // make a closure that returns the value boxed
272        let closure: BoxClosure = Box::new(move || Box::new(f()));
273        let (send, recv) = oneshot::channel();
274        let payload = (next_id, closure, send);
275        self.send
276            .send(payload)
277            .map_err(|_| SpawnError::Disconnected)?;
278
279        Ok(JoinHandle {
280            id: next_id,
281            recv,
282            _marker: std::marker::PhantomData,
283        })
284    }
285}
286
287/// Handle for joining a thread
288pub struct JoinHandle<T: Send + 'static> {
289    id: usize,
290    recv: ValueReceiver,
291    _marker: std::marker::PhantomData<T>,
292}
293
294impl<T: Send + 'static> JoinHandle<T> {
295    /// Join the thread. Block the current thread until the thread is finished.
296    ///
297    /// Returns the value returned by the thread closure. If the thread panicked,
298    /// this returns a [`JoinError`].
299    ///
300    /// # Unwind and Poisoning
301    /// Note that `wasm32-unknown-unknown` target does not support unwinding yet.
302    /// This means safety mechanisms like poisoning are not available. Panicking
303    /// while holding a lock will not release the lock and will likely produce a dead lock.
304    pub fn join(self) -> Result<T, JoinError> {
305        // recv() will only error if somehow the thread terminated without sending a value
306        let value = self.recv.recv().map_err(|_| JoinError::Panic(self.id))?;
307        // will error if panicked
308        let value = value?;
309        // cast the raw pointer back
310        let value_raw = Box::into_raw(value) as *mut T;
311        let value = unsafe { Box::from_raw(value_raw) };
312        Ok(*value)
313    }
314}
315
316#[inline]
317fn make_closure<F: FnOnce() -> BoxValue + Send + 'static + UnwindSafe>(
318    f: F,
319) -> NonNull<BoxClosure> {
320    let boxed: BoxClosure = Box::new(f);
321    NonNull::from(Box::leak(Box::new(boxed)))
322}
323
324#[doc(hidden)]
325#[wasm_bindgen]
326pub fn __worker_main(f: NonNull<BoxClosure>, start: NonNull<SignalSender>) -> NonNull<BoxValue> {
327    // signal the dispatcher that the worker is now started, and is safe to block
328    __dispatch_start(start);
329    let f = unsafe { Box::from_raw(f.as_ptr()) };
330    let value = f();
331    let value_ptr = Box::into_raw(Box::new(value));
332    unsafe { NonNull::new_unchecked(value_ptr) }
333}
334
335#[doc(hidden)]
336#[wasm_bindgen]
337pub fn __worker_send(id: usize, send: NonNull<ValueSender>, value: Option<NonNull<BoxValue>>) {
338    let send_ptr = send.as_ptr();
339    let send = unsafe { Box::from_raw(send_ptr) };
340    match value {
341        None => {
342            let _ = send.send(Err(JoinError::Panic(id)));
343        }
344        Some(value) => {
345            let value = unsafe { Box::from_raw(value.as_ptr()) };
346            let _ = send.send(Ok(*value));
347        }
348    }
349}
350
351/// Send a start signal to indicate the dispatcher is ready
352#[doc(hidden)]
353#[wasm_bindgen]
354pub fn __dispatch_start(start: NonNull<SignalSender>) {
355    let start_ptr = start.as_ptr();
356    let start = unsafe { Box::from_raw(start_ptr) };
357    let _ = start.send(());
358}
359
360/// Receive a request to spawn a thread with the dispatcher.
361#[doc(hidden)]
362#[wasm_bindgen]
363pub fn __dispatch_recv(recv: NonNull<DispatchReceiver>) -> Option<Vec<JsValue>> {
364    // cast as reference so we don't drop it
365    let recv: &DispatchReceiver = unsafe { recv.as_ref() };
366    let (id, closure, sender) = match recv.recv() {
367        Ok(v) => v,
368        Err(_) => return None,
369    };
370    let sender_ptr = NonNull::from(Box::leak(Box::new(sender)));
371    let (start_send, start_recv) = oneshot::channel::<()>();
372    let start_send_ptr = NonNull::from(Box::leak(Box::new(start_send)));
373    let start_recv_ptr = NonNull::from(Box::leak(Box::new(start_recv)));
374    let value_vec = vec![
375        id.into(),
376        make_closure(closure).into(),
377        sender_ptr.into(),
378        start_send_ptr.into(),
379        start_recv_ptr.into(),
380    ];
381    Some(value_vec)
382}
383
384/// Return true if the spawned thread has started and the dispatcher
385/// could start blocking for waiting for new spawn requests
386#[doc(hidden)]
387#[wasm_bindgen]
388pub fn __dispatch_poll_worker(start_recv: NonNull<SignalReceiver>) -> bool {
389    if unsafe { start_recv.as_ref() }.try_recv().is_ok() {
390        let start_recv = unsafe { Box::from_raw(start_recv.as_ptr()) };
391        drop(start_recv);
392        true
393    } else {
394        false
395    }
396}
397
398/// Drop the receiver
399#[doc(hidden)]
400#[wasm_bindgen]
401pub fn __dispatch_drop(recv: NonNull<mpsc::Receiver<BoxClosure>>) {
402    let recv: Box<mpsc::Receiver<BoxClosure>> = unsafe { Box::from_raw(recv.as_ptr()) };
403    drop(recv);
404}