use std::cell::RefCell;
use std::future::Future;
use std::rc::Rc;
use js_sys::Promise;
use wasm_bindgen::prelude::*;
use wasm_bindgen::JsCast;
use wasm_bindgen_futures::JsFuture;
use web_sys::{DedicatedWorkerGlobalScope, MessageEvent};
use web_sys::{ErrorEvent, Event, Worker};
// Binding to a function supplied by the JavaScript side.
#[wasm_bindgen()]
extern "C" {
// Imported under the JavaScript name `newWorker`; the result is left as an
// untyped `JsValue`. Nothing in the visible part of this file calls it.
#[wasm_bindgen(js_name = newWorker)]
fn new_worker() -> JsValue;
}
/// A pool of web `Worker`s onto which closures can be dispatched via
/// `execute` / `run`.
pub struct WorkerPool {
/// Factory invoked by `spawn` every time a brand-new `Worker` is needed.
new_worker: Box<dyn Fn() -> Worker>,
/// State behind an `Rc` so that `reclaim_on_message` can hold a weak handle
/// to it from inside an event-handler closure.
state: Rc<PoolState>,
}
/// The part of the pool that event-handler closures need access to.
struct PoolState {
/// Idle workers: `WorkerPool::worker` pops from here and `PoolState::push`
/// puts workers back.
workers: RefCell<Vec<Worker>>,
/// Handler that `PoolState::push` installs as both `onmessage` and `onerror`
/// on every idle worker. As constructed in `WorkerPool::new` it only logs the
/// event type as "unhandled".
callback: Closure<dyn FnMut(Event)>,
}
/// A unit of work. `WorkerPool::execute` boxes one of these and posts its raw
/// address to a worker; `child_entry_point` turns that address back into a box.
struct Work {
/// One-shot closure; the `Promise` it returns is awaited in
/// `child_entry_point`.
func: Box<dyn (FnOnce() -> Promise) + Send>,
}
impl WorkerPool {
pub fn new(initial: usize, new_worker: Box<dyn Fn() -> Worker>) -> Result<WorkerPool, JsValue> {
let pool = WorkerPool {
new_worker,
state: Rc::new(PoolState {
workers: RefCell::new(Vec::with_capacity(initial)),
callback: Closure::wrap(Box::new(|event: Event| {
log::error!("unhandled event: {}", event.type_());
}) as Box<dyn FnMut(Event)>),
}),
};
for _ in 0..initial {
let worker = pool.spawn()?;
pool.state.push(worker);
}
Ok(pool)
}
fn spawn(&self) -> Result<Worker, JsValue> {
log::info!("spawning new worker");
let worker = (self.new_worker)();
let array = js_sys::Array::new();
array.push(&wasm_bindgen::module());
array.push(&wasm_bindgen::memory());
worker.post_message(&array)?;
Ok(worker)
}
fn worker(&self) -> Result<Worker, JsValue> {
match self.state.workers.borrow_mut().pop() {
Some(worker) => Ok(worker),
None => self.spawn(),
}
}
pub fn execute(
&self,
f: impl (FnOnce() -> Promise) + Send + 'static,
) -> Result<Worker, JsValue> {
let worker = self.worker()?;
let work = Box::new(Work { func: Box::new(f) });
let ptr = Box::into_raw(work);
match worker.post_message(&JsValue::from(ptr as u32)) {
Ok(()) => Ok(worker),
Err(e) => {
unsafe {
drop(Box::from_raw(ptr));
}
Err(e)
}
}
}
fn reclaim_on_message(&self, worker: Worker) {
let state = Rc::downgrade(&self.state);
let worker2 = worker.clone();
let reclaim_slot = Rc::new(RefCell::new(None));
let slot2 = reclaim_slot.clone();
let reclaim = Closure::wrap(Box::new(move |event: Event| {
if let Some(error) = event.dyn_ref::<ErrorEvent>() {
log::error!("error in worker: {}", error.message());
return;
}
if let Some(_msg) = event.dyn_ref::<MessageEvent>() {
if let Some(state) = state.upgrade() {
state.push(worker2.clone());
}
*slot2.borrow_mut() = None;
return;
}
log::error!("unhandled event: {}", event.type_());
}) as Box<dyn FnMut(Event)>);
worker.set_onmessage(Some(reclaim.as_ref().unchecked_ref()));
*reclaim_slot.borrow_mut() = Some(reclaim);
}
pub fn run(&self, f: impl (FnOnce() -> Promise) + Send + 'static) -> Result<(), JsValue> {
let worker = self.execute(f)?;
self.reclaim_on_message(worker);
Ok(())
}
}
impl PoolState {
fn push(&self, worker: Worker) {
worker.set_onmessage(Some(self.callback.as_ref().unchecked_ref()));
worker.set_onerror(Some(self.callback.as_ref().unchecked_ref()));
let mut workers = self.workers.borrow_mut();
for prev in workers.iter() {
let prev: &JsValue = prev;
let worker: &JsValue = &worker;
assert!(prev != worker);
}
workers.push(worker);
}
}
#[wasm_bindgen]
pub async fn child_entry_point(ptr: u32) -> Result<(), JsValue> {
let ptr = unsafe { Box::from_raw(ptr as *mut Work) };
let global = js_sys::global().unchecked_into::<DedicatedWorkerGlobalScope>();
JsFuture::from((ptr.func)()).await?;
global.post_message(&JsValue::undefined())?;
Ok(())
}