wasm_bindgen_spawn/lib.rs
1#![doc = include_str!("../README.md")]
2
3#[cfg(all(target_arch = "wasm32", not(target_feature = "atomics"), not(doc)))]
4compile_error!(
5 "Some target features are not enabled. Please read the README and set the right rustflags"
6);
7
8use std::panic::UnwindSafe;
9use std::ptr::NonNull;
10use std::sync::atomic::AtomicUsize;
11use std::sync::mpsc;
12
13use js_sys::{Function, Promise};
14use wasm_bindgen::prelude::*;
15#[cfg(feature = "async")]
16use wasm_bindgen_futures::JsFuture;
17
18type BoxClosure = Box<dyn FnOnce() -> BoxValue + Send + UnwindSafe + 'static>;
19type BoxValue = Box<dyn Send + 'static>;
20type ValueSender = oneshot::Sender<Result<BoxValue, JoinError>>;
21type ValueReceiver = oneshot::Receiver<Result<BoxValue, JoinError>>;
22
23type DispatchPayload = (usize, BoxClosure, ValueSender);
24type DispatchSender = mpsc::Sender<DispatchPayload>;
25type DispatchReceiver = mpsc::Receiver<DispatchPayload>;
26
27type SignalSender = oneshot::Sender<()>;
28type SignalReceiver = oneshot::Receiver<()>;
29
30/// Error when joining a thread with a [`JoinHandle`]
31#[derive(Debug, thiserror::Error)]
32pub enum JoinError {
33 #[error("WASM thread {0} panicked")]
34 Panic(usize),
35}
36
37/// Error when spawning a thread with [`ThreadCreator::spawn`]
38#[derive(Debug, thiserror::Error)]
39pub enum SpawnError {
40 #[error("Cannot spawn WASM thread because the dispatcher has disconnected")]
41 Disconnected,
42}
43
44#[wasm_bindgen]
45extern "C" {
46 /// Binding to wasm.memory
47 #[wasm_bindgen(js_name = memory, js_namespace = wasm, thread_local_v2)]
48 static MEMORY: JsValue;
49 #[wasm_bindgen(js_name = __dispatch_poll_worker, js_namespace = wasm_bindgen, thread_local_v2)]
50 static DISPATCH_POLL_WORKER: JsValue;
51}
52
53/// Handle for a dedicated Web Worker for dispatching new threads.
54///
55/// Please see below for example and how to avoid potential deadlocks.
56///
57/// # Example
58/// Suppose your WASM package is built with `wasm-pack` using the following command:
59/// ```sh
60/// wasm-pack build -t no-modules --out-dir ./pkg
61/// ```
62/// which should produce `pkg/your_package_bg.wasm` and `pkg/your_package.js`.
63///
64/// Then you can create a `ThreadCreator` with:
65/// ```no_run
66/// use wasm_bindgen::prelude::*;
67/// use wasm_bindgen_spawm::ThreadCreator;
68///
69/// let thread_creator = ThreadCreator::unready("pkg/your_package_bg.wasm", "pkg/your_package.js");
70/// // on error, this is a JsValue error
71/// assert!(thread_creator.is_ok());
72/// ```
73///
74/// # Dispatcher ready
75/// Note that the function to create the Thread Creator is called `unready` rather than `new`.
76/// This is because the JS runtime may only start the dispatcher thread after the current
77/// execution context is finished. Blocking the thread before the ThreadCreator is ready may
78/// cause deadlocks.
79///
80/// For example, the following code will cause a deadlock, supposed there is a `new` function
81/// ```rust,ignore
82/// use wasm_bindgen::prelude::*;
83/// use wasm_bindgen_spawm::ThreadCreator;
84///
85/// pub fn will_deadlock() -> Result<(), Box<dyn std::error::Error>> {
86/// // the `new` function is hypothetical
87/// let thread_creator = ThreadCreator::new("pkg/your_package_bg.wasm", "pkg/your_package.js")?;
88/// // calling `spawn` is ok here
89/// let thread = thread_creator.spawn(move || {
90/// // do some work
91/// })?;
92/// // this will deadlock because the thread won't be spawned until this synchronous context is
93/// // finished
94/// thread.join()?;
95///
96/// Ok(())
97/// }
98/// ```
99/// The `unready` factory function exists to ensure user calls
100/// [`ready`](ThreadCreatorUnready::ready)
101/// before start using the `ThreadCreator` to spawn threads. It also has a nice side effect that
102/// `ThreadCreator` is now `Send + Sync` since it doesn't need to hold the `Promise`
103/// ```no_run
104/// use wasm_bindgen::prelude::*;
105/// use wasm_bindgen_spawm::ThreadCreator;
106///
107/// pub async fn will_not_deadlock() -> Result<(), Box<dyn std::error::Error>> {
108/// let thread_creator = ThreadCreator::unready("pkg/your_package_bg.wasm", "pkg/your_package.js")?;
109/// let thread_creator = thread_creator.ready().await?;
110///
111/// let thread = thread_creator.spawn(move || {
112/// return 42;
113/// })?;
114/// let value = thread.join()?;
115/// assert_eq!(value, 42);
116///
117/// Ok(())
118/// }
119/// ```
120/// Note that only `ready` requires `await`, and not `spawn` or `join`. This is because
121/// once the dispatcher is ready, shared memory is used for sending the spawn payload
122/// to the dispatcher instead of `postMessage`. This also means you only need this async step
123/// once when creating the `ThreadCreator`. You can write the rest of the code without `async`.
124///
125/// You can also
126/// disable the `async` feature and use [`into_promise_and_inner`](ThreadCreatorUnready::into_promise_and_inner)
127/// to avoid depending on `wasm-bindgen-futures`. You need to manually wait for the promise in this
128/// case before using the `ThreadCreator` (for example sending the promise to JS and awaiting it there).
129/// See the example below for more information.
130///
131/// # Joining threads
132/// Joining should feel pretty much like the `std` library. However, there is one caveat -
133/// The main thread in Web cannot be blocked. This means in order to join a thread, you must
134/// run the WASM module in a Web Worker.
135///
136/// See [`JoinHandle`] for more information.
137///
138/// # Terminating
139/// When the `ThreadCreator` is dropped, the dispatcher worker will be terminated.
140/// Threads that are `spawn`-ed but not started may start or not start, but
141/// threads that are already running are not impacted and can still be `join`-ed.
142///
143/// Generally you should avoid dropping the `ThreadCreator` until all spawned threads are joined.
144/// You can create a global thread creator by using `thread_local`:
145/// ```no_run
146/// use wasm_bindgen::prelude::*;
147/// use wasm_bindgen_spawn::ThreadCreator;
148///
149/// thread_local! {
150/// static THREAD_CREATOR: OnceCell<ThreadCreator> = OnceCell::new();
151/// }
152///
153/// #[wasm_bindgen]
154/// pub fn create_thread_creator() -> Result<Promise, JsValue> {
155/// let thread_creator = ThreadCreator::unready("pkg/your_package_bg.wasm", "pkg/your_package.js")?;
156/// let (promise, thread_creator) = thread_creator.into_promise_and_inner();
157/// THREAD_CREATOR.with(move |tc| {
158/// tc.set(thread_creator);
159/// });
160/// Ok(promise)
161/// // the promise can then be awaited in JS (this is useful if the rest
162/// // of your code doesn't need wasm-bindgen-futures)
163/// }
164///
165///
166/// // On JS side, make sure this function is only called after the promise is resolved.
167/// #[wasm_bindgen]
168/// pub fn do_some_work_on_thread() {
169/// let handle = THREAD_CREATOR.with(|tc| {
170/// let tc = tc.get().unwrap();
171/// tc.spawn(move || {
172/// // do some work
173/// }).unwrap()
174/// });
175///
176/// handle.join().unwrap();
177/// }
178/// ```
179pub struct ThreadCreator {
180 /// Id for the next thread
181 next_id: AtomicUsize,
182 /// Sender to send the thread closure to the dispatcher for creating threads
183 send: DispatchSender,
184}
185static_assertions::assert_impl_all!(ThreadCreator: Send, Sync);
186
187/// See [`ThreadCreator::unready`] for more information
188pub struct ThreadCreatorUnready {
189 thread_creator: ThreadCreator,
190 /// Promise for if the dispatcher is ready
191 dispatcher_promise: Promise,
192}
193
194impl ThreadCreatorUnready {
195 /// Returns the promise that resolves when the dispatcher is ready,
196 /// and the inner [`ThreadCreator`]. Note that the inner creator
197 /// can only be used after awaiting on the Promise.
198 ///
199 /// In async context, it might be more convenient to use [`ready`](ThreadCreatorUnready::ready)
200 /// instead
201 ///
202 /// See the struct documentation for more information
203 pub fn into_promise_and_inner(self) -> (Promise, ThreadCreator) {
204 (self.dispatcher_promise, self.thread_creator)
205 }
206
207 /// Await the dispatcher to be ready.
208 ///
209 /// See the struct documentation for more information
210 #[cfg(feature = "async")]
211 pub async fn ready(self) -> Result<ThreadCreator, JsValue> {
212 JsFuture::from(self.dispatcher_promise).await?;
213 Ok(self.thread_creator)
214 }
215}
216
217impl ThreadCreator {
218 /// Create a Web Worker to dispatch threads with the wasm module url and the
219 /// wasm_bindgen JS url. The Worker may not be ready until `ready` is awaited
220 ///
221 /// See the struct documentation for more information
222 pub fn unready(wasm_url: &str, wbg_url: &str) -> Result<ThreadCreatorUnready, JsValue> {
223 // function([wasm_url, wbg_url, memory, recv]) -> Promise<void>;
224 let create_dispatcher =
225 Function::new_with_args("args", include_str!("js/createDispatcher.min.js"));
226 let wasm_url = JsValue::from_str(wasm_url);
227 let wbg_url = JsValue::from_str(wbg_url);
228 let memory = MEMORY.with(|memory| memory.clone());
229 let (send, recv) = mpsc::channel::<DispatchPayload>();
230 let recv_ptr = JsValue::from(NonNull::from(Box::leak(Box::new(recv))));
231 let (start_send, start_recv) = oneshot::channel::<()>();
232 let start_send = Box::into_raw(Box::new(start_send));
233 let start_recv = Box::into_raw(Box::new(start_recv));
234 let start_send_ptr = unsafe { NonNull::new_unchecked(start_send) };
235 let start_recv_ptr = unsafe { NonNull::new_unchecked(start_recv) };
236 // create the dispatcher
237 let promise = create_dispatcher
238 .call1(
239 &JsValue::null(),
240 &JsValue::from(vec![
241 wasm_url,
242 wbg_url,
243 memory,
244 recv_ptr,
245 JsValue::from(start_send_ptr),
246 JsValue::from(start_recv_ptr),
247 DISPATCH_POLL_WORKER.with(|v| v.clone()),
248 ]),
249 )?
250 .dyn_into::<Promise>()?;
251 Ok(ThreadCreatorUnready {
252 thread_creator: Self {
253 next_id: AtomicUsize::new(1),
254 send,
255 },
256 dispatcher_promise: promise,
257 })
258 }
259
260 /// Spawn a new thread to execute F.
261 ///
262 /// Note that spawning new thread is very slow. Pool them if you can.
263 pub fn spawn<F, T>(&self, f: F) -> Result<JoinHandle<T>, SpawnError>
264 where
265 F: FnOnce() -> T + Send + 'static + UnwindSafe,
266 T: Send + 'static,
267 {
268 let next_id = self
269 .next_id
270 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
271 // make a closure that returns the value boxed
272 let closure: BoxClosure = Box::new(move || Box::new(f()));
273 let (send, recv) = oneshot::channel();
274 let payload = (next_id, closure, send);
275 self.send
276 .send(payload)
277 .map_err(|_| SpawnError::Disconnected)?;
278
279 Ok(JoinHandle {
280 id: next_id,
281 recv,
282 _marker: std::marker::PhantomData,
283 })
284 }
285}
286
287/// Handle for joining a thread
288pub struct JoinHandle<T: Send + 'static> {
289 id: usize,
290 recv: ValueReceiver,
291 _marker: std::marker::PhantomData<T>,
292}
293
294impl<T: Send + 'static> JoinHandle<T> {
295 /// Join the thread. Block the current thread until the thread is finished.
296 ///
297 /// Returns the value returned by the thread closure. If the thread panicked,
298 /// this returns a [`JoinError`].
299 ///
300 /// # Unwind and Poisoning
301 /// Note that `wasm32-unknown-unknown` target does not support unwinding yet.
302 /// This means safety mechanisms like poisoning are not available. Panicking
303 /// while holding a lock will not release the lock and will likely produce a dead lock.
304 pub fn join(self) -> Result<T, JoinError> {
305 // recv() will only error if somehow the thread terminated without sending a value
306 let value = self.recv.recv().map_err(|_| JoinError::Panic(self.id))?;
307 // will error if panicked
308 let value = value?;
309 // cast the raw pointer back
310 let value_raw = Box::into_raw(value) as *mut T;
311 let value = unsafe { Box::from_raw(value_raw) };
312 Ok(*value)
313 }
314}
315
316#[inline]
317fn make_closure<F: FnOnce() -> BoxValue + Send + 'static + UnwindSafe>(
318 f: F,
319) -> NonNull<BoxClosure> {
320 let boxed: BoxClosure = Box::new(f);
321 NonNull::from(Box::leak(Box::new(boxed)))
322}
323
324#[doc(hidden)]
325#[wasm_bindgen]
326pub fn __worker_main(f: NonNull<BoxClosure>, start: NonNull<SignalSender>) -> NonNull<BoxValue> {
327 // signal the dispatcher that the worker is now started, and is safe to block
328 __dispatch_start(start);
329 let f = unsafe { Box::from_raw(f.as_ptr()) };
330 let value = f();
331 let value_ptr = Box::into_raw(Box::new(value));
332 unsafe { NonNull::new_unchecked(value_ptr) }
333}
334
335#[doc(hidden)]
336#[wasm_bindgen]
337pub fn __worker_send(id: usize, send: NonNull<ValueSender>, value: Option<NonNull<BoxValue>>) {
338 let send_ptr = send.as_ptr();
339 let send = unsafe { Box::from_raw(send_ptr) };
340 match value {
341 None => {
342 let _ = send.send(Err(JoinError::Panic(id)));
343 }
344 Some(value) => {
345 let value = unsafe { Box::from_raw(value.as_ptr()) };
346 let _ = send.send(Ok(*value));
347 }
348 }
349}
350
351/// Send a start signal to indicate the dispatcher is ready
352#[doc(hidden)]
353#[wasm_bindgen]
354pub fn __dispatch_start(start: NonNull<SignalSender>) {
355 let start_ptr = start.as_ptr();
356 let start = unsafe { Box::from_raw(start_ptr) };
357 let _ = start.send(());
358}
359
360/// Receive a request to spawn a thread with the dispatcher.
361#[doc(hidden)]
362#[wasm_bindgen]
363pub fn __dispatch_recv(recv: NonNull<DispatchReceiver>) -> Option<Vec<JsValue>> {
364 // cast as reference so we don't drop it
365 let recv: &DispatchReceiver = unsafe { recv.as_ref() };
366 let (id, closure, sender) = match recv.recv() {
367 Ok(v) => v,
368 Err(_) => return None,
369 };
370 let sender_ptr = NonNull::from(Box::leak(Box::new(sender)));
371 let (start_send, start_recv) = oneshot::channel::<()>();
372 let start_send_ptr = NonNull::from(Box::leak(Box::new(start_send)));
373 let start_recv_ptr = NonNull::from(Box::leak(Box::new(start_recv)));
374 let value_vec = vec![
375 id.into(),
376 make_closure(closure).into(),
377 sender_ptr.into(),
378 start_send_ptr.into(),
379 start_recv_ptr.into(),
380 ];
381 Some(value_vec)
382}
383
384/// Return true if the spawned thread has started and the dispatcher
385/// could start blocking for waiting for new spawn requests
386#[doc(hidden)]
387#[wasm_bindgen]
388pub fn __dispatch_poll_worker(start_recv: NonNull<SignalReceiver>) -> bool {
389 if unsafe { start_recv.as_ref() }.try_recv().is_ok() {
390 let start_recv = unsafe { Box::from_raw(start_recv.as_ptr()) };
391 drop(start_recv);
392 true
393 } else {
394 false
395 }
396}
397
398/// Drop the receiver
399#[doc(hidden)]
400#[wasm_bindgen]
401pub fn __dispatch_drop(recv: NonNull<mpsc::Receiver<BoxClosure>>) {
402 let recv: Box<mpsc::Receiver<BoxClosure>> = unsafe { Box::from_raw(recv.as_ptr()) };
403 drop(recv);
404}