use crate::TransferClosure;
use js_sys::Array;
use std::cell::RefCell;
use std::iter::FromIterator;
use std::rc::Rc;
use wasm_bindgen::prelude::*;
use wasm_bindgen::JsCast;
use web_sys::BlobPropertyBag;
use web_sys::ErrorEvent;
use web_sys::MessageEvent;
use web_sys::{Blob, Url};
use web_sys::{Event, Worker};
/// A pool of web workers that execute [`TransferClosure`]s.
///
/// Idle workers are kept in the shared [`PoolState`]; when none is idle a new
/// worker is spawned on demand.
#[wasm_bindgen]
pub struct WorkerPool {
    /// Shared pool state. Reclaim handlers hold only weak references to it.
    state: Rc<PoolState>,
    /// Script URL interpolated into the `importScripts(...)` call of every
    /// worker's bootstrap script.
    script_src: String,
}
/// State shared between the pool and the event handlers installed on its workers.
struct PoolState {
    /// Workers that are currently idle and ready to be handed out.
    workers: RefCell<Vec<Worker>>,
    /// Handler installed as both `onmessage` and `onerror` on every idle worker.
    callback: Closure<dyn FnMut(Event)>,
}
#[wasm_bindgen]
impl WorkerPool {
    /// Creates a pool and eagerly spawns `initial` workers.
    ///
    /// `script_src` is the script URL each worker loads through `importScripts`.
    ///
    /// # Errors
    ///
    /// Returns the JavaScript error of the first worker that fails to spawn.
    #[wasm_bindgen(constructor)]
    pub fn new(initial: usize, script_src: String) -> Result<WorkerPool, JsValue> {
        // Handler attached to idle workers: anything an idle worker emits is
        // unexpected, so it is only reported on the console.
        let callback = Closure::new(|event: Event| {
            if let Some(event) = event.dyn_ref::<MessageEvent>() {
                crate::console_error!("Dropped data:: {:?}", event.data());
            } else if let Some(event) = event.dyn_ref::<ErrorEvent>() {
                crate::console_error!("Failed to initialize: {}", event.message());
            }
        });
        let pool = WorkerPool {
            script_src,
            state: Rc::new(PoolState {
                workers: RefCell::new(Vec::with_capacity(initial)),
                callback,
            }),
        };
        for _ in 0..initial {
            let worker = pool.spawn()?;
            pool.state.push(worker);
        }
        Ok(pool)
    }

    /// Starts a fresh worker and sends it the current wasm module and memory.
    ///
    /// The worker runs a generated bootstrap script (served from a blob URL)
    /// that imports `script_src`, initialises `wasm_bindgen` with the first
    /// message it receives, and afterwards forwards every message to
    /// `wasm_bindgen.receive_transfer_closure`.
    ///
    /// # Errors
    ///
    /// Propagates any JavaScript exception raised while creating the blob, the
    /// object URL or the worker, or while posting the initial message.
    fn spawn(&self) -> Result<Worker, JsValue> {
        // The URL lands inside a single-quoted JS string literal; escape it so
        // quotes, backslashes or line breaks cannot break (or alter) the script.
        let src = escape_js_single_quoted(&self.script_src);
        // NOTE: the continuation lines of the literal are deliberately flush
        // left; their content is the script text itself.
        let script = format!(
            "importScripts('{}');
const FRB_ACTION_PANIC = 3;
onmessage = event => {{
let init = wasm_bindgen(...event.data).catch(err => {{
setTimeout(() => {{ throw err }})
throw err
}})
onmessage = async event => {{
await init
const [payload, ...transfer] = event.data
try {{
wasm_bindgen.receive_transfer_closure(payload, transfer)
}} catch (err) {{
if (transfer[0] && typeof transfer[0].postMessage === 'function') {{
// panic
transfer[0].postMessage([FRB_ACTION_PANIC, err.toString()])
}}
setTimeout(() => {{ throw err }})
postMessage(null)
throw err
}}
}}
}}",
            src
        );
        let blob = Blob::new_with_blob_sequence_and_options(
            &Array::from_iter([JsValue::from(script)]).into(),
            BlobPropertyBag::new().type_("text/javascript"),
        )?;
        let url = Url::create_object_url_with_blob(&blob)?;
        let spawned = Worker::new(&url);
        // The object URL is only needed to construct the worker. Release it on
        // both the success and the failure path so it does not accumulate for
        // the lifetime of the page. Cleanup is best effort: a failure here is
        // reported but must not discard an otherwise healthy worker.
        if let Err(err) = Url::revoke_object_url(&url) {
            crate::console_error!("Failed to revoke worker script URL: {:?}", err);
        }
        let worker: Worker = spawned?;
        let module = wasm_bindgen::module();
        let memory = wasm_bindgen::memory();
        worker.post_message(&Array::from_iter([module, memory]))?;
        Ok(worker)
    }

    /// Takes an idle worker from the pool, spawning a new one when none is idle.
    ///
    /// # Errors
    ///
    /// Propagates the error from [`WorkerPool::spawn`].
    fn worker(&self) -> Result<Worker, JsValue> {
        // Finish the `RefCell` borrow before possibly spawning.
        let idle = self.state.workers.borrow_mut().pop();
        match idle {
            Some(worker) => Ok(worker),
            None => self.spawn(),
        }
    }

    /// Acquires a worker, applies `closure` to it and returns that worker.
    ///
    /// # Errors
    ///
    /// Propagates the error from acquiring the worker or from `closure.apply`.
    /// NOTE(review): when `apply` fails the acquired worker is dropped rather
    /// than returned to the pool; whether re-pooling is safe depends on
    /// `TransferClosure::apply`, which is defined elsewhere — confirm.
    fn execute(&self, closure: TransferClosure<JsValue>) -> Result<Worker, JsValue> {
        let worker = self.worker()?;
        closure.apply(&worker).map(|_| worker)
    }

    /// Installs a one-shot `onmessage` handler that returns `worker` to the pool.
    ///
    /// When the worker posts its next message the handler pushes the worker
    /// back into the pool state (if that state is still alive) and then
    /// releases itself.
    fn reclaim_on_message(&self, worker: &Worker) {
        // Weak reference: a pending handler must not keep the pool alive.
        let state = Rc::downgrade(&self.state);
        let worker2 = worker.clone();
        // The closure has to outlive this function, so it is parked in a
        // shared slot that the closure itself clears once it has fired.
        let reclaim_slot = Rc::new(RefCell::new(None));
        let slot2 = reclaim_slot.clone();
        let reclaim = Closure::<dyn FnMut(_)>::new(move |_: MessageEvent| {
            if let Some(state) = state.upgrade() {
                state.push(worker2.clone());
            }
            *slot2.borrow_mut() = None;
        });
        worker.set_onmessage(Some(reclaim.as_ref().unchecked_ref()));
        *reclaim_slot.borrow_mut() = Some(reclaim);
    }
}

/// Escapes `raw` for embedding between single quotes in a JavaScript string literal.
fn escape_js_single_quoted(raw: &str) -> String {
    let mut escaped = String::with_capacity(raw.len());
    for ch in raw.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '\'' => escaped.push_str("\\'"),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            // Line separators that older engines reject inside string literals.
            '\u{2028}' => escaped.push_str("\\u2028"),
            '\u{2029}' => escaped.push_str("\\u2029"),
            other => escaped.push(other),
        }
    }
    escaped
}
impl WorkerPool {
    /// Runs `closure` on a pooled worker and arranges for that worker to be
    /// handed back to the pool once it posts its next message.
    ///
    /// # Errors
    ///
    /// Returns the JavaScript error produced while acquiring the worker or
    /// applying the closure; in that case no reclaim handler is installed.
    pub fn run(&self, closure: TransferClosure<JsValue>) -> Result<(), JsValue> {
        self.execute(closure)
            .map(|busy| self.reclaim_on_message(&busy))
    }
}
impl PoolState {
fn push(&self, worker: Worker) {
worker.set_onmessage(Some(self.callback.as_ref().unchecked_ref()));
worker.set_onerror(Some(self.callback.as_ref().unchecked_ref()));
let mut workers = self.workers.borrow_mut();
for prev in workers.iter() {
let prev: &JsValue = prev;
let worker: &JsValue = &worker;
assert!(prev != worker);
}
workers.push(worker);
}
}
/// Module start hook, compiled only with the `wasm-start` feature.
///
/// Calls `console_error_panic_hook::set_once()` when the wasm module starts.
#[cfg(feature = "wasm-start")]
#[wasm_bindgen(start)]
pub fn run_hooks() {
    console_error_panic_hook::set_once();
}