use std::cell::RefCell;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use serde::{de::DeserializeOwned, Serialize};
use wasm_bindgen::closure::Closure;
use wasm_bindgen::prelude::wasm_bindgen;
use wasm_bindgen::{JsCast, JsValue};
use wasm_bindgen_futures::spawn_local;
use web_sys::js_sys::{Object, Reflect};
use web_sys::{DedicatedWorkerGlobalScope, MessageEvent};
use crate::workers::pool::WorkerJob;
type HandlerFn = Box<
dyn Fn(
JsValue,
)
-> Pin<Box<dyn Future<Output = Result<(JsValue, web_sys::js_sys::Array), String>>>>
+ 'static,
>;
thread_local! {
static REGISTRY: RefCell<HashMap<&'static str, HandlerFn>> = RefCell::new(HashMap::new());
static ONMESSAGE_HOLDER: RefCell<Option<Closure<dyn FnMut(MessageEvent)>>> = RefCell::new(None);
}
pub fn register_job<J: WorkerJob>() {
register::<J>()
}
pub(crate) fn register<J: WorkerJob>() {
REGISTRY.with(|r| {
let mut r = r.borrow_mut();
if r.contains_key(J::NAME) {
tracing::debug!("workers: job {} already registered; ignoring", J::NAME);
return;
}
let handler: HandlerFn = Box::new(|input_js: JsValue| {
Box::pin(async move {
let input: J::Input = serde_wasm_bindgen::from_value(input_js)
.map_err(|e| format!("deserialize input: {e}"))?;
let output = J::execute(input)
.await
.map_err(|e| format!("execute: {e}"))?;
J::into_response_message(output)
})
});
r.insert(J::NAME, handler);
});
}
pub(crate) fn is_registered(name: &str) -> bool {
REGISTRY.with(|r| r.borrow().contains_key(name))
}
#[wasm_bindgen]
pub fn awsm_worker_entry() {
let global = web_sys::js_sys::global();
let worker_scope = match global.dyn_into::<DedicatedWorkerGlobalScope>() {
Ok(s) => s,
Err(_) => {
tracing::warn!("awsm_worker_entry called outside a worker; no-op");
return;
}
};
let worker_scope_clone = worker_scope.clone();
let onmessage = Closure::<dyn FnMut(MessageEvent)>::new(move |e: MessageEvent| {
let data = e.data();
let kind = Reflect::get(&data, &JsValue::from_str("kind"))
.ok()
.and_then(|v| v.as_string())
.unwrap_or_default();
if kind != "awsm-job" {
return;
}
let id = crate::workers::pool::parse_job_id(&data).unwrap_or(0);
let name = Reflect::get(&data, &JsValue::from_str("name"))
.ok()
.and_then(|v| v.as_string())
.unwrap_or_default();
let input = Reflect::get(&data, &JsValue::from_str("input")).unwrap_or(JsValue::UNDEFINED);
let handler_future = REGISTRY.with(|r| {
let r = r.borrow();
r.get(name.as_str()).map(|handler| (handler)(input))
});
let scope = worker_scope_clone.clone();
match handler_future {
Some(fut) => {
spawn_local(async move {
let outcome = fut.await;
let response = Object::new();
let _ = Reflect::set(
&response,
&JsValue::from_str("id"),
&JsValue::from_str(&id.to_string()),
);
let mut transfer_list: Option<web_sys::js_sys::Array> = None;
match outcome {
Ok((payload, transfer)) => {
let _ = Reflect::set(
&response,
&JsValue::from_str("kind"),
&JsValue::from_str("awsm-result"),
);
let _ =
Reflect::set(&response, &JsValue::from_str("payload"), &payload);
if transfer.length() > 0 {
transfer_list = Some(transfer);
}
}
Err(msg) => {
let _ = Reflect::set(
&response,
&JsValue::from_str("kind"),
&JsValue::from_str("awsm-error"),
);
let _ = Reflect::set(
&response,
&JsValue::from_str("message"),
&JsValue::from_str(&msg),
);
}
}
let post_result = match transfer_list {
Some(arr) => scope.post_message_with_transfer(&response, &arr),
None => scope.post_message(&response),
};
if let Err(err) = post_result {
tracing::warn!("worker post_message failed: {err:?}");
}
});
}
None => {
let response = Object::new();
let _ = Reflect::set(
&response,
&JsValue::from_str("kind"),
&JsValue::from_str("awsm-error"),
);
let _ = Reflect::set(
&response,
&JsValue::from_str("id"),
&JsValue::from_str(&id.to_string()),
);
let _ = Reflect::set(
&response,
&JsValue::from_str("message"),
&JsValue::from_str(&format!("unknown job: {name}")),
);
if let Err(err) = scope.post_message(&response) {
tracing::warn!("worker post_message failed: {err:?}");
}
}
}
});
worker_scope.set_onmessage(Some(
onmessage
.as_ref()
.unchecked_ref::<web_sys::js_sys::Function>(),
));
ONMESSAGE_HOLDER.with(|h| {
*h.borrow_mut() = Some(onmessage);
});
}
pub struct EchoJob;
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub struct EchoInput {
pub message: String,
}
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub struct EchoOutput {
pub message: String,
}
impl WorkerJob for EchoJob {
const NAME: &'static str = "echo";
type Input = EchoInput;
type Output = EchoOutput;
fn execute(input: Self::Input) -> Pin<Box<dyn Future<Output = anyhow::Result<Self::Output>>>> {
Box::pin(async move {
Ok(EchoOutput {
message: format!("echo: {}", input.message),
})
})
}
}
#[allow(dead_code)]
fn _enforce_bounds<T: Serialize + DeserializeOwned>() {}