1#![deny(unsafe_code)]
12#![allow(clippy::missing_errors_doc)]
13#![allow(clippy::needless_pass_by_value)]
15
16pub mod error;
17mod exports;
18pub mod types;
19
20pub use exports::{
21 abi_fingerprint, abi_version, fetch_request, runtime_close, runtime_create, scope_close,
22 scope_enter, task_cancel, task_join, task_spawn, websocket_cancel, websocket_close,
23 websocket_open, websocket_recv, websocket_send,
24};
25
26use crate::error::dispatch_error_json;
27use crate::types::{decode_json_payload, decode_optional_consumer_version, encode_json_payload};
28#[cfg(not(target_arch = "wasm32"))]
29use asupersync::types::WasmDispatcherDiagnostics;
30use asupersync::types::{
31 WASM_ABI_MAJOR_VERSION, WASM_ABI_MINOR_VERSION, WASM_ABI_SIGNATURE_FINGERPRINT_V1,
32 WasmAbiCancellation, WasmAbiErrorCode, WasmAbiFailure, WasmAbiOutcomeEnvelope,
33 WasmAbiRecoverability, WasmAbiValue, WasmAbiVersion, WasmDispatchError, WasmExportDispatcher,
34 WasmFetchRequest, WasmHandleRef, WasmScopeEnterRequest, WasmTaskCancelRequest,
35 WasmTaskSpawnRequest,
36};
37use std::cell::RefCell;
38use std::collections::{HashMap, VecDeque};
39#[cfg(target_arch = "wasm32")]
40use std::rc::Rc;
41#[cfg(target_arch = "wasm32")]
42use wasm_bindgen::closure::Closure;
43#[cfg(target_arch = "wasm32")]
44use wasm_bindgen::{JsCast, JsValue};
45#[cfg(target_arch = "wasm32")]
46use wasm_bindgen_futures::{JsFuture, spawn_local};
47#[cfg(target_arch = "wasm32")]
48use web_sys::{
49 AbortController, BinaryType, CloseEvent, Event, MessageEvent, RequestInit, Response, WebSocket,
50 WorkerGlobalScope,
51};
52
// Per-thread ABI dispatcher. Every exported call is routed through this single
// instance, which also owns the handle table queried by `dispatcher_handle_is_live`.
thread_local! {
    static DISPATCHER: RefCell<WasmExportDispatcher> = RefCell::new(WasmExportDispatcher::new());
}
// Abort controllers for browser fetches that were started and not yet finalized,
// keyed by the fetch handle. Only compiled for wasm32.
#[cfg(target_arch = "wasm32")]
thread_local! {
    static INFLIGHT_FETCHES: RefCell<HashMap<WasmHandleRef, AbortController>> = RefCell::new(HashMap::new());
}
// Host-side websocket state keyed by the websocket handle. The value type has a
// wasm32 definition and a non-wasm32 definition with the same name.
thread_local! {
    static INFLIGHT_WEBSOCKETS: RefCell<HashMap<WasmHandleRef, BrowserWebSocketHostState>> = RefCell::new(HashMap::new());
}
63
/// JSON request payload for `websocket_open`.
#[derive(Debug, Clone, serde::Deserialize)]
struct BrowserWebSocketOpenRequest {
    /// Scope handle under which the websocket task handle is spawned.
    scope: WasmHandleRef,
    /// Target URL; normalized with `normalize_websocket_url` before use.
    url: String,
    /// Optional sub-protocols; an absent or empty list opens the socket without protocols.
    protocols: Option<Vec<String>>,
}
70
/// JSON request payload for `websocket_send`.
#[derive(Debug, Clone, serde::Deserialize)]
struct BrowserWebSocketSendRequest {
    /// Handle of the websocket to send on.
    socket: WasmHandleRef,
    /// Payload to send; only string and bytes values are accepted by the send helpers.
    value: WasmAbiValue,
}
76
/// JSON request payload for `websocket_recv`.
#[derive(Debug, Clone, serde::Deserialize)]
struct BrowserWebSocketRecvRequest {
    /// Handle of the websocket whose inbox is polled.
    socket: WasmHandleRef,
}
81
/// JSON request payload for `websocket_close`.
#[derive(Debug, Clone, serde::Deserialize)]
struct BrowserWebSocketCloseRequest {
    /// Handle of the websocket to close.
    socket: WasmHandleRef,
    /// Optional close reason; checked by `validate_websocket_close_reason` when present.
    reason: Option<String>,
}
87
/// JSON request payload for `websocket_cancel`.
#[derive(Debug, Clone, serde::Deserialize)]
struct BrowserWebSocketCancelRequest {
    /// Handle of the websocket to cancel.
    socket: WasmHandleRef,
    /// Cancellation kind, forwarded to the dispatcher and echoed in the cancelled outcome.
    kind: String,
    /// Optional message; also used as the close reason for the underlying socket.
    message: Option<String>,
}
94
/// Host state for one open browser websocket (wasm32 build).
#[cfg(target_arch = "wasm32")]
struct BrowserWebSocketHostState {
    /// The browser `WebSocket` object.
    socket: WebSocket,
    /// Outcomes queued by the event callbacks and drained by `websocket_recv`.
    inbox: Rc<RefCell<VecDeque<WasmAbiOutcomeEnvelope>>>,
    // The closures below are installed as the socket's event handlers and are
    // stored here so they are dropped together with this state.
    _on_message: Closure<dyn FnMut(MessageEvent)>,
    _on_close: Closure<dyn FnMut(CloseEvent)>,
    _on_error: Closure<dyn FnMut(Event)>,
}
103
/// Detaches all event handlers from the socket and then closes it.
///
/// Any error returned by the close call is ignored because `drop` cannot report it.
#[cfg(target_arch = "wasm32")]
impl Drop for BrowserWebSocketHostState {
    fn drop(&mut self) {
        let socket = &self.socket;
        // Unhook the callbacks before closing the socket.
        socket.set_onmessage(None);
        socket.set_onclose(None);
        socket.set_onerror(None);
        drop(socket.close());
    }
}
113
/// Host state for one websocket on non-wasm32 builds, where no browser socket exists.
#[cfg(not(target_arch = "wasm32"))]
struct BrowserWebSocketHostState {
    /// Queued outcomes; the non-wasm send helper pushes sent payloads here.
    inbox: VecDeque<WasmAbiOutcomeEnvelope>,
    /// Set once the socket has been closed; further sends are rejected.
    closed: bool,
}
119
/// Decodes `raw` into `T` by delegating to `decode_json_payload`.
///
/// `field` is passed through unchanged; presumably it labels the payload in
/// error messages (confirm in `types`).
fn parse_json<T: serde::de::DeserializeOwned>(raw: &str, field: &str) -> Result<T, String> {
    decode_json_payload(raw, field)
}
123
/// Serializes `value` to a JSON string by delegating to `encode_json_payload`.
///
/// `field` is passed through unchanged; presumably it labels the payload in
/// error messages (confirm in `types`).
fn encode_json<T: serde::Serialize>(value: &T, field: &str) -> Result<String, String> {
    encode_json_payload(value, field)
}
127
/// Decodes the optional consumer ABI version JSON via `decode_optional_consumer_version`.
fn parse_consumer_version(raw: Option<String>) -> Result<Option<WasmAbiVersion>, String> {
    decode_optional_consumer_version(raw)
}
131
/// Converts a dispatcher error into the string form produced by `dispatch_error_json`.
fn to_error_string(err: WasmDispatchError) -> String {
    dispatch_error_json(&err)
}
135
136fn with_dispatcher<R>(
137 f: impl FnOnce(&mut WasmExportDispatcher) -> Result<R, WasmDispatchError>,
138) -> Result<R, String> {
139 DISPATCHER.with(|dispatcher| {
140 let mut dispatcher = dispatcher.borrow_mut();
141 f(&mut dispatcher).map_err(to_error_string)
142 })
143}
144
145fn dispatcher_handle_is_live(handle: &WasmHandleRef) -> bool {
146 DISPATCHER.with(|dispatcher| dispatcher.borrow().handles().get(handle).is_ok())
147}
148
/// Drops in-flight fetch entries whose handle the dispatcher no longer knows.
///
/// Each dropped entry is aborted first. Forgetting the `AbortController` alone
/// would leave the browser request running after its scope or runtime closed,
/// unlike websockets, whose host state closes the socket on drop.
#[cfg(target_arch = "wasm32")]
fn cleanup_released_fetches() {
    INFLIGHT_FETCHES.with(|inflight| {
        inflight.borrow_mut().retain(|handle, controller| {
            let live = dispatcher_handle_is_live(handle);
            if !live {
                // The entry is removed before the fetch future settles, so the
                // late completion finds nothing to finalize and is ignored.
                controller.abort();
            }
            live
        });
    });
}
157
/// No-op on non-wasm32 builds: there is no in-flight fetch registry to clean up.
#[cfg(not(target_arch = "wasm32"))]
const fn cleanup_released_fetches() {}
160
161fn cleanup_released_websockets() {
162 INFLIGHT_WEBSOCKETS.with(|sockets| {
163 sockets
164 .borrow_mut()
165 .retain(|handle, _| dispatcher_handle_is_live(handle));
166 });
167}
168
/// Cleans up host-side fetch and websocket state for handles that are no longer live.
fn cleanup_released_host_state() {
    cleanup_released_fetches();
    cleanup_released_websockets();
}
173
/// Trims and upper-cases an HTTP method and checks it against the supported set.
///
/// # Errors
/// Returns an error when the method is blank or is not one of
/// `GET`, `POST`, `PUT`, `PATCH`, `DELETE`, `HEAD`, `OPTIONS`.
fn normalize_fetch_method(method: &str) -> Result<String, String> {
    const SUPPORTED_METHODS: [&str; 7] =
        ["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"];

    let candidate = method.trim().to_ascii_uppercase();
    if candidate.is_empty() {
        return Err("fetch method must not be empty".to_string());
    }
    if SUPPORTED_METHODS.contains(&candidate.as_str()) {
        Ok(candidate)
    } else {
        Err(format!("unsupported fetch method: {candidate}"))
    }
}
184
185fn normalize_fetch_request(request: WasmFetchRequest) -> Result<WasmFetchRequest, String> {
186 let method = normalize_fetch_method(&request.method)?;
187 if matches!(method.as_str(), "GET" | "HEAD") && request.body.is_some() {
188 return Err(format!(
189 "fetch method {method} does not permit a request body"
190 ));
191 }
192 Ok(WasmFetchRequest { method, ..request })
193}
194
/// Builds the `Ok` outcome returned when a fetch was started: it carries the fetch handle.
const fn fetch_pending_outcome(handle: WasmHandleRef) -> WasmAbiOutcomeEnvelope {
    WasmAbiOutcomeEnvelope::Ok {
        value: WasmAbiValue::Handle(handle),
    }
}
200
201#[allow(clippy::missing_const_for_fn)]
202fn fetch_error_outcome(
203 message: String,
204 recoverability: WasmAbiRecoverability,
205) -> WasmAbiOutcomeEnvelope {
206 WasmAbiOutcomeEnvelope::Err {
207 failure: WasmAbiFailure {
208 code: WasmAbiErrorCode::InternalFailure,
209 recoverability,
210 message,
211 },
212 }
213}
214
215fn cancelled_outcome(
216 kind: &str,
217 phase: &str,
218 message: Option<String>,
219 origin_task: Option<String>,
220) -> WasmAbiOutcomeEnvelope {
221 WasmAbiOutcomeEnvelope::Cancelled {
222 cancellation: WasmAbiCancellation {
223 kind: kind.to_string(),
224 phase: phase.to_string(),
225 origin_region: "browser".to_string(),
226 origin_task,
227 timestamp_nanos: 0,
228 message,
229 truncated: false,
230 },
231 }
232}
233
/// Removes and returns the abort controller registered for `handle`, if any.
#[cfg(target_arch = "wasm32")]
fn take_inflight_fetch(handle: &WasmHandleRef) -> Option<AbortController> {
    INFLIGHT_FETCHES.with(|registry| {
        let mut pending = registry.borrow_mut();
        pending.remove(handle)
    })
}
238
/// Records `controller` as the abort controller for the in-flight fetch `handle`.
///
/// An existing entry for the same handle is replaced.
#[cfg(target_arch = "wasm32")]
fn register_inflight_fetch(handle: WasmHandleRef, controller: AbortController) {
    INFLIGHT_FETCHES.with(|registry| {
        let mut pending = registry.borrow_mut();
        pending.insert(handle, controller);
    });
}
245
/// Extracts a human-readable message from an arbitrary JS value.
///
/// Tried in order:
/// 1. the value itself, when it is a JS string;
/// 2. its non-empty string `message` property, which covers `Error` and
///    `DOMException`, whose `message` is not enumerable and so would
///    otherwise serialize to `"{}"`;
/// 3. `JSON.stringify(value)`;
/// 4. the fixed fallback `"non-string JS error"`.
#[cfg(target_arch = "wasm32")]
fn js_value_message(value: &JsValue) -> String {
    if let Some(text) = value.as_string() {
        return text;
    }

    // `Reflect::get` fails for non-object values; that just skips this step.
    let message_property = js_sys::Reflect::get(value, &JsValue::from_str("message"))
        .ok()
        .and_then(|message| message.as_string())
        .filter(|message| !message.is_empty());
    if let Some(message) = message_property {
        return message;
    }

    js_sys::JSON::stringify(value)
        .ok()
        .and_then(|json| json.as_string())
        .unwrap_or_else(|| "non-string JS error".to_string())
}
257
/// Reads the `name` property of a JS value, returning it only when it is a string.
#[cfg(target_arch = "wasm32")]
fn js_error_name(value: &JsValue) -> Option<String> {
    let key = JsValue::from_str("name");
    match js_sys::Reflect::get(value, &key) {
        Ok(name) => name.as_string(),
        Err(_) => None,
    }
}
264
/// Builds the `Cancelled` outcome for an aborted fetch: kind `"abort_signal"`,
/// phase `"cancelling"`, the given message and no origin task.
#[cfg(target_arch = "wasm32")]
fn abort_cancelled_outcome(message: String) -> WasmAbiOutcomeEnvelope {
    cancelled_outcome("abort_signal", "cancelling", Some(message), None)
}
269
/// Validates a websocket URL and returns it trimmed, with a lower-cased scheme.
///
/// Everything after `://` is returned unchanged.
///
/// # Errors
/// Fails when the URL is blank, does not use the `ws` or `wss` scheme
/// (case-insensitive), or has no host after `://` (for example `ws://` or
/// `wss:///path`).
fn normalize_websocket_url(url: &str) -> Result<String, String> {
    fn scheme_error(url: &str) -> String {
        format!("websocket URL must start with ws:// or wss://: {url}")
    }

    let normalized = url.trim();
    if normalized.is_empty() {
        return Err("websocket URL must not be empty".to_string());
    }
    let (scheme, rest) = normalized
        .split_once("://")
        .ok_or_else(|| scheme_error(normalized))?;
    if !(scheme.eq_ignore_ascii_case("ws") || scheme.eq_ignore_ascii_case("wss")) {
        return Err(scheme_error(normalized));
    }
    // The host is whatever precedes the first path, query or fragment delimiter.
    let host_end = rest.find(['/', '?', '#']).unwrap_or(rest.len());
    if host_end == 0 {
        return Err(format!("websocket URL must include a host: {normalized}"));
    }
    Ok(format!("{}://{rest}", scheme.to_ascii_lowercase()))
}
285
/// Builds the `Ok` outcome returned by `websocket_open`: it carries the websocket handle.
const fn websocket_pending_outcome(handle: WasmHandleRef) -> WasmAbiOutcomeEnvelope {
    WasmAbiOutcomeEnvelope::Ok {
        value: WasmAbiValue::Handle(handle),
    }
}
291
/// Builds the `Ok(Unit)` outcome returned by a receive when the inbox is empty.
const fn websocket_idle_outcome() -> WasmAbiOutcomeEnvelope {
    WasmAbiOutcomeEnvelope::Ok {
        value: WasmAbiValue::Unit,
    }
}
297
/// Builds the `Ok(Unit)` outcome returned after a successful send.
///
/// `websocket_close_impl` also uses it as the outcome when no close reason is given.
const fn websocket_send_outcome() -> WasmAbiOutcomeEnvelope {
    WasmAbiOutcomeEnvelope::Ok {
        value: WasmAbiValue::Unit,
    }
}
303
304fn spawn_websocket_handle(
305 scope: WasmHandleRef,
306 consumer_version: Option<WasmAbiVersion>,
307) -> Result<WasmHandleRef, String> {
308 let spawn = WasmTaskSpawnRequest {
309 scope,
310 label: Some("browser-websocket".to_string()),
311 cancel_kind: Some("abort_signal".to_string()),
312 };
313 with_dispatcher(|dispatcher| dispatcher.task_spawn(&spawn, consumer_version))
314}
315
/// Completes the websocket's task handle by joining it with `outcome` through the dispatcher.
fn finalize_websocket_handle(
    handle: &WasmHandleRef,
    outcome: WasmAbiOutcomeEnvelope,
    consumer_version: Option<WasmAbiVersion>,
) -> Result<WasmAbiOutcomeEnvelope, String> {
    with_dispatcher(|dispatcher| dispatcher.task_join(handle, outcome, consumer_version))
}
323
/// Forwards a cancel request for the websocket's task handle to the dispatcher.
fn cancel_websocket_handle(
    request: &WasmTaskCancelRequest,
    consumer_version: Option<WasmAbiVersion>,
) -> Result<WasmAbiOutcomeEnvelope, String> {
    with_dispatcher(|dispatcher| dispatcher.task_cancel(request, consumer_version))
}
330
331fn with_websocket_state_mut<R>(
332 handle: &WasmHandleRef,
333 f: impl FnOnce(&mut BrowserWebSocketHostState) -> Result<R, String>,
334) -> Result<R, String> {
335 INFLIGHT_WEBSOCKETS.with(|sockets| {
336 let mut sockets = sockets.borrow_mut();
337 let state = sockets
338 .get_mut(handle)
339 .ok_or_else(|| format!("unknown websocket handle: {handle:?}"))?;
340 f(state)
341 })
342}
343
344fn take_websocket_state(handle: &WasmHandleRef) -> Option<BrowserWebSocketHostState> {
345 INFLIGHT_WEBSOCKETS.with(|sockets| sockets.borrow_mut().remove(handle))
346}
347
348fn insert_websocket_state(handle: WasmHandleRef, state: BrowserWebSocketHostState) {
349 INFLIGHT_WEBSOCKETS.with(|sockets| {
350 sockets.borrow_mut().insert(handle, state);
351 });
352}
353
/// Reports the final outcome of a browser fetch to the dispatcher.
///
/// Does nothing when the handle is no longer registered as in flight. For a
/// `Cancelled` outcome the dispatcher's abort is applied before completion.
/// Dispatcher errors are ignored because nobody is left to report them to.
#[cfg(target_arch = "wasm32")]
fn finalize_fetch_outcome(handle: WasmHandleRef, outcome: WasmAbiOutcomeEnvelope) {
    let still_tracked = take_inflight_fetch(&handle).is_some();
    if !still_tracked {
        return;
    }
    let was_cancelled = matches!(outcome, WasmAbiOutcomeEnvelope::Cancelled { .. });
    if was_cancelled {
        let _ = with_dispatcher(|dispatcher| dispatcher.apply_abort(&handle));
    }
    let _ = with_dispatcher(|dispatcher| dispatcher.fetch_complete(&handle, outcome));
}
364
/// Starts a fetch on the current global: `window` when present, otherwise a
/// `WorkerGlobalScope`.
///
/// # Errors
/// Fails when the global object is neither of the two.
#[cfg(target_arch = "wasm32")]
fn host_fetch_with_str_and_init(url: &str, init: &RequestInit) -> Result<js_sys::Promise, String> {
    if let Some(window) = web_sys::window() {
        return Ok(window.fetch_with_str_and_init(url, init));
    }

    match js_sys::global().dyn_into::<WorkerGlobalScope>() {
        Ok(worker) => Ok(worker.fetch_with_str_and_init(url, init)),
        Err(_) => Err(
            "window or WorkerGlobalScope fetch host is not available in this host context"
                .to_string(),
        ),
    }
}
377
/// Performs the browser fetch for `request` and maps the result to an outcome.
///
/// - Fulfilled: `Ok` with the HTTP status as `U64`, or `Unit` when the
///   resolved value is not a `Response`.
/// - Rejected with an error named `AbortError`: a `Cancelled` outcome.
/// - Any other rejection: a transient `Err` outcome.
/// - No fetch host available: a permanent `Err` outcome.
#[cfg(target_arch = "wasm32")]
async fn run_browser_fetch(
    request: WasmFetchRequest,
    signal: web_sys::AbortSignal,
) -> WasmAbiOutcomeEnvelope {
    let init = RequestInit::new();
    init.set_method(&request.method);
    init.set_signal(Some(&signal));
    if let Some(payload) = &request.body {
        let bytes = js_sys::Uint8Array::from(payload.as_slice());
        init.set_body(&bytes.into());
    }

    let settled = match host_fetch_with_str_and_init(&request.url, &init) {
        Ok(promise) => JsFuture::from(promise).await,
        Err(message) => {
            return fetch_error_outcome(message, WasmAbiRecoverability::Permanent);
        }
    };

    let error = match settled {
        Ok(response_value) => {
            let value = match response_value.dyn_into::<Response>() {
                Ok(response) => WasmAbiValue::U64(u64::from(response.status())),
                Err(_) => WasmAbiValue::Unit,
            };
            return WasmAbiOutcomeEnvelope::Ok { value };
        }
        Err(error) => error,
    };

    let message = js_value_message(&error);
    if matches!(js_error_name(&error).as_deref(), Some("AbortError")) {
        abort_cancelled_outcome(format!("fetch aborted by AbortSignal: {message}"))
    } else {
        fetch_error_outcome(
            format!("browser fetch rejected: {message}"),
            WasmAbiRecoverability::Transient,
        )
    }
}
419
/// Registers an abort controller for `handle` and spawns the fetch as a local task.
///
/// The spawned task finalizes the handle with the fetch outcome when it finishes.
///
/// # Errors
/// Fails, before anything is registered, when the `AbortController` cannot be created.
#[cfg(target_arch = "wasm32")]
fn spawn_browser_fetch(handle: WasmHandleRef, request: WasmFetchRequest) -> Result<(), String> {
    let controller = match AbortController::new() {
        Ok(controller) => controller,
        Err(err) => {
            return Err(format!(
                "failed to create AbortController for fetch handle {:?}: {}",
                handle,
                js_value_message(&err)
            ));
        }
    };
    let signal = controller.signal();
    register_inflight_fetch(handle, controller);

    let task = async move {
        let outcome = run_browser_fetch(request, signal).await;
        finalize_fetch_outcome(handle, outcome);
    };
    spawn_local(task);
    Ok(())
}
437
/// Converts a websocket `message` event into an outcome.
///
/// Text frames become `String`, `ArrayBuffer` payloads become `Bytes`, and any
/// other payload type yields an `Err` outcome.
#[cfg(target_arch = "wasm32")]
fn websocket_outcome_from_message_event(event: MessageEvent) -> WasmAbiOutcomeEnvelope {
    let payload = event.data();
    let value = match payload.as_string() {
        Some(text) => WasmAbiValue::String(text),
        None => match payload.dyn_into::<js_sys::ArrayBuffer>() {
            Ok(buffer) => WasmAbiValue::Bytes(js_sys::Uint8Array::new(&buffer).to_vec()),
            Err(_) => {
                return fetch_error_outcome(
                    "websocket message payload type is unsupported".to_string(),
                    WasmAbiRecoverability::Unknown,
                );
            }
        },
    };
    WasmAbiOutcomeEnvelope::Ok { value }
}
457
/// Opens a browser `WebSocket` for `request` and registers its host state under `handle`.
///
/// The socket is set to deliver binary frames as `ArrayBuffer`. Three callbacks
/// feed a shared inbox:
/// - `message`: the decoded payload outcome;
/// - `close`: a `Cancelled` outcome with kind `"websocket_close"`;
/// - `error`: a transient `Err` outcome.
///
/// # Errors
/// Fails when the browser refuses to construct the socket.
#[cfg(target_arch = "wasm32")]
fn setup_browser_websocket(
    handle: WasmHandleRef,
    request: &BrowserWebSocketOpenRequest,
) -> Result<(), String> {
    // An absent or empty protocol list opens the socket without sub-protocols.
    let constructed = match request.protocols.as_deref() {
        Some(protocols) if !protocols.is_empty() => {
            let js_protocols = js_sys::Array::new();
            for protocol in protocols {
                js_protocols.push(&JsValue::from_str(protocol));
            }
            WebSocket::new_with_str_sequence(&request.url, &js_protocols)
        }
        _ => WebSocket::new(&request.url),
    };
    let socket = match constructed {
        Ok(socket) => socket,
        Err(err) => {
            return Err(format!(
                "failed to construct browser WebSocket: {}",
                js_value_message(&err)
            ));
        }
    };
    socket.set_binary_type(BinaryType::Arraybuffer);

    let inbox = Rc::new(RefCell::new(VecDeque::new()));

    let on_message = {
        let queue = Rc::clone(&inbox);
        Closure::wrap(Box::new(move |event: MessageEvent| {
            let outcome = websocket_outcome_from_message_event(event);
            queue.borrow_mut().push_back(outcome);
        }) as Box<dyn FnMut(MessageEvent)>)
    };

    let on_close = {
        let queue = Rc::clone(&inbox);
        Closure::wrap(Box::new(move |event: CloseEvent| {
            let reason = event.reason();
            let message = if reason.is_empty() {
                format!("websocket closed with code {}", event.code())
            } else {
                format!("websocket closed with code {} ({})", event.code(), reason)
            };
            let outcome = cancelled_outcome("websocket_close", "completed", Some(message), None);
            queue.borrow_mut().push_back(outcome);
        }) as Box<dyn FnMut(CloseEvent)>)
    };

    let on_error = {
        let queue = Rc::clone(&inbox);
        Closure::wrap(Box::new(move |_event: Event| {
            let outcome = fetch_error_outcome(
                "browser websocket error event".to_string(),
                WasmAbiRecoverability::Transient,
            );
            queue.borrow_mut().push_back(outcome);
        }) as Box<dyn FnMut(Event)>)
    };

    socket.set_onmessage(Some(on_message.as_ref().unchecked_ref()));
    socket.set_onclose(Some(on_close.as_ref().unchecked_ref()));
    socket.set_onerror(Some(on_error.as_ref().unchecked_ref()));

    // The closures are stored with the socket so they are dropped with the state.
    insert_websocket_state(
        handle,
        BrowserWebSocketHostState {
            socket,
            inbox,
            _on_message: on_message,
            _on_close: on_close,
            _on_error: on_error,
        },
    );

    Ok(())
}
538
539#[cfg(not(target_arch = "wasm32"))]
540#[allow(clippy::unnecessary_wraps)]
541fn setup_browser_websocket(
542 handle: WasmHandleRef,
543 request: &BrowserWebSocketOpenRequest,
544) -> Result<(), String> {
545 let _requested_protocols = request.protocols.as_ref().map(std::vec::Vec::len);
546 INFLIGHT_WEBSOCKETS.with(|sockets| {
547 sockets.borrow_mut().insert(
548 handle,
549 BrowserWebSocketHostState {
550 inbox: VecDeque::new(),
551 closed: false,
552 },
553 );
554 });
555 Ok(())
556}
557
/// Sends a string or bytes payload on the browser websocket registered for `handle`.
///
/// # Errors
/// Fails when the handle is unknown, the payload is neither string nor bytes,
/// or the browser rejects the send.
#[cfg(target_arch = "wasm32")]
fn send_browser_websocket_message(
    handle: &WasmHandleRef,
    value: WasmAbiValue,
) -> Result<(), String> {
    with_websocket_state_mut(handle, |state| {
        // Remember which browser API was used so the error message can name it.
        let (api, sent) = match value {
            WasmAbiValue::String(text) => ("send_with_str", state.socket.send_with_str(&text)),
            WasmAbiValue::Bytes(bytes) => (
                "send_with_u8_array",
                state.socket.send_with_u8_array(&bytes),
            ),
            other => {
                return Err(format!(
                    "websocket send requires string/bytes payload, got {other:?}"
                ));
            }
        };
        sent.map_err(|err| {
            format!(
                "websocket {api} failed for {:?}: {}",
                handle,
                js_value_message(&err)
            )
        })
    })
}
583
584#[cfg(not(target_arch = "wasm32"))]
585fn send_browser_websocket_message(
586 handle: &WasmHandleRef,
587 value: WasmAbiValue,
588) -> Result<(), String> {
589 with_websocket_state_mut(handle, |state| {
590 if state.closed {
591 return Err(format!("websocket handle {handle:?} is already closed"));
592 }
593 match value {
594 WasmAbiValue::String(text) => state.inbox.push_back(WasmAbiOutcomeEnvelope::Ok {
595 value: WasmAbiValue::String(text),
596 }),
597 WasmAbiValue::Bytes(bytes) => state.inbox.push_back(WasmAbiOutcomeEnvelope::Ok {
598 value: WasmAbiValue::Bytes(bytes),
599 }),
600 other => {
601 return Err(format!(
602 "websocket send requires string/bytes payload, got {other:?}"
603 ));
604 }
605 }
606 Ok(())
607 })
608}
609
/// Pops the oldest queued outcome for `handle`, or the idle outcome when the inbox is empty.
///
/// # Errors
/// Fails when the handle is unknown.
#[cfg(target_arch = "wasm32")]
fn recv_browser_websocket_message(
    handle: &WasmHandleRef,
) -> Result<WasmAbiOutcomeEnvelope, String> {
    with_websocket_state_mut(handle, |state| {
        let next = state.inbox.borrow_mut().pop_front();
        match next {
            Some(outcome) => Ok(outcome),
            None => Ok(websocket_idle_outcome()),
        }
    })
}
622
623#[cfg(not(target_arch = "wasm32"))]
624fn recv_browser_websocket_message(
625 handle: &WasmHandleRef,
626) -> Result<WasmAbiOutcomeEnvelope, String> {
627 with_websocket_state_mut(handle, |state| {
628 Ok(state
629 .inbox
630 .pop_front()
631 .unwrap_or_else(websocket_idle_outcome))
632 })
633}
634
/// Largest close reason accepted, measured in UTF-8 bytes.
const MAX_WEBSOCKET_CLOSE_REASON_BYTES: usize = 123;

/// Checks that `reason` is at most `MAX_WEBSOCKET_CLOSE_REASON_BYTES` bytes long.
///
/// The limit is on the UTF-8 byte length, not the character count.
///
/// # Errors
/// Returns an error when the reason is longer than the limit.
fn validate_websocket_close_reason(reason: &str) -> Result<(), String> {
    let within_limit = reason.len() <= MAX_WEBSOCKET_CLOSE_REASON_BYTES;
    if within_limit {
        Ok(())
    } else {
        Err(format!(
            "websocket close reason exceeds {MAX_WEBSOCKET_CLOSE_REASON_BYTES} bytes"
        ))
    }
}
645
/// Closes the browser socket held by `state`.
///
/// With a reason, the reason is validated and the socket is closed with code
/// `1000` and that reason; without one, a plain close is issued.
///
/// # Errors
/// Fails when the reason is too long or the browser rejects the close call.
#[cfg(target_arch = "wasm32")]
fn close_browser_websocket_socket(
    state: &mut BrowserWebSocketHostState,
    reason: Option<&str>,
) -> Result<(), String> {
    let closed = match reason {
        Some(reason) => {
            validate_websocket_close_reason(reason)?;
            state.socket.close_with_code_and_reason(1000, reason)
        }
        None => state.socket.close(),
    };
    closed.map_err(|err| format!("websocket close failed: {}", js_value_message(&err)))
}
665
666#[cfg(not(target_arch = "wasm32"))]
667#[allow(clippy::unnecessary_wraps)]
668fn close_browser_websocket_socket(
669 state: &mut BrowserWebSocketHostState,
670 reason: Option<&str>,
671) -> Result<(), String> {
672 if let Some(reason) = reason {
673 validate_websocket_close_reason(reason)?;
674 state.inbox.push_back(cancelled_outcome(
675 "websocket_close",
676 "completed",
677 Some(reason.to_string()),
678 None,
679 ));
680 }
681 state.closed = true;
682 Ok(())
683}
684
685#[cfg(not(target_arch = "wasm32"))]
687pub fn reset_dispatcher_for_tests() {
688 DISPATCHER.with(|dispatcher| {
689 *dispatcher.borrow_mut() = WasmExportDispatcher::new();
690 });
691 INFLIGHT_WEBSOCKETS.with(|sockets| {
692 sockets.borrow_mut().clear();
693 });
694}
695
/// Test helper: returns the thread-local dispatcher's diagnostic snapshot.
#[cfg(not(target_arch = "wasm32"))]
#[must_use]
pub fn dispatcher_diagnostics_for_tests() -> WasmDispatcherDiagnostics {
    DISPATCHER.with(|dispatcher| dispatcher.borrow().diagnostic_snapshot())
}
702
703fn runtime_create_impl(consumer_version_json: Option<String>) -> Result<String, String> {
704 let consumer_version = parse_consumer_version(consumer_version_json)?;
705 let handle = with_dispatcher(|dispatcher| dispatcher.runtime_create(consumer_version))?;
706 encode_json(&handle, "runtime_create.response")
707}
708
709fn runtime_close_impl(
710 handle_json: String,
711 consumer_version_json: Option<String>,
712) -> Result<String, String> {
713 let handle: WasmHandleRef = parse_json(&handle_json, "runtime_close.request")?;
714 let consumer_version = parse_consumer_version(consumer_version_json)?;
715 let outcome =
716 with_dispatcher(|dispatcher| dispatcher.runtime_close(&handle, consumer_version))?;
717 cleanup_released_host_state();
718 encode_json(&outcome, "runtime_close.response")
719}
720
721fn scope_enter_impl(
722 request_json: String,
723 consumer_version_json: Option<String>,
724) -> Result<String, String> {
725 let request: WasmScopeEnterRequest = parse_json(&request_json, "scope_enter.request")?;
726 let consumer_version = parse_consumer_version(consumer_version_json)?;
727 let handle = with_dispatcher(|dispatcher| dispatcher.scope_enter(&request, consumer_version))?;
728 encode_json(&handle, "scope_enter.response")
729}
730
731fn scope_close_impl(
732 handle_json: String,
733 consumer_version_json: Option<String>,
734) -> Result<String, String> {
735 let handle: WasmHandleRef = parse_json(&handle_json, "scope_close.request")?;
736 let consumer_version = parse_consumer_version(consumer_version_json)?;
737 let outcome = with_dispatcher(|dispatcher| dispatcher.scope_close(&handle, consumer_version))?;
738 cleanup_released_host_state();
739 encode_json(&outcome, "scope_close.response")
740}
741
742fn task_spawn_impl(
743 request_json: String,
744 consumer_version_json: Option<String>,
745) -> Result<String, String> {
746 let request: WasmTaskSpawnRequest = parse_json(&request_json, "task_spawn.request")?;
747 let consumer_version = parse_consumer_version(consumer_version_json)?;
748 let handle = with_dispatcher(|dispatcher| dispatcher.task_spawn(&request, consumer_version))?;
749 encode_json(&handle, "task_spawn.response")
750}
751
752fn task_join_impl(
753 handle_json: String,
754 outcome_json: String,
755 consumer_version_json: Option<String>,
756) -> Result<String, String> {
757 let handle: WasmHandleRef = parse_json(&handle_json, "task_join.request.handle")?;
758 let outcome: WasmAbiOutcomeEnvelope = parse_json(&outcome_json, "task_join.request.outcome")?;
759 let consumer_version = parse_consumer_version(consumer_version_json)?;
760 let joined =
761 with_dispatcher(|dispatcher| dispatcher.task_join(&handle, outcome, consumer_version))?;
762 encode_json(&joined, "task_join.response")
763}
764
765fn task_cancel_impl(
766 request_json: String,
767 consumer_version_json: Option<String>,
768) -> Result<String, String> {
769 let request: WasmTaskCancelRequest = parse_json(&request_json, "task_cancel.request")?;
770 let consumer_version = parse_consumer_version(consumer_version_json)?;
771 let outcome = with_dispatcher(|dispatcher| dispatcher.task_cancel(&request, consumer_version))?;
772
773 #[cfg(target_arch = "wasm32")]
774 if let Some(controller) = take_inflight_fetch(&request.task) {
775 controller.abort();
776 }
777
778 encode_json(&outcome, "task_cancel.response")
779}
780
781fn fetch_request_impl(
782 request_json: String,
783 consumer_version_json: Option<String>,
784) -> Result<String, String> {
785 let request: WasmFetchRequest = parse_json(&request_json, "fetch_request.request")?;
786 let request = normalize_fetch_request(request)?;
787 let consumer_version = parse_consumer_version(consumer_version_json)?;
788 let handle =
789 with_dispatcher(|dispatcher| dispatcher.fetch_request(&request, consumer_version))?;
790 #[cfg(target_arch = "wasm32")]
791 if let Err(setup_err) = spawn_browser_fetch(handle, request.clone()) {
792 let setup_outcome = fetch_error_outcome(
793 format!("failed to start browser fetch: {setup_err}"),
794 WasmAbiRecoverability::Permanent,
795 );
796 let _ =
797 with_dispatcher(|dispatcher| dispatcher.fetch_complete(&handle, setup_outcome.clone()));
798 return encode_json(&setup_outcome, "fetch_request.response");
799 }
800 encode_json(&fetch_pending_outcome(handle), "fetch_request.response")
801}
802
803fn websocket_open_impl(
804 request_json: String,
805 consumer_version_json: Option<String>,
806) -> Result<String, String> {
807 let request: BrowserWebSocketOpenRequest = parse_json(&request_json, "websocket_open.request")?;
808 let url = normalize_websocket_url(&request.url)?;
809 let request = BrowserWebSocketOpenRequest { url, ..request };
810 let consumer_version = parse_consumer_version(consumer_version_json)?;
811 let handle = spawn_websocket_handle(request.scope, consumer_version)?;
812 if let Err(setup_err) = setup_browser_websocket(handle, &request) {
813 let setup_outcome = fetch_error_outcome(
814 format!("failed to start browser websocket: {setup_err}"),
815 WasmAbiRecoverability::Permanent,
816 );
817 let _ = finalize_websocket_handle(&handle, setup_outcome.clone(), consumer_version);
818 return encode_json(&setup_outcome, "websocket_open.response");
819 }
820 encode_json(
821 &websocket_pending_outcome(handle),
822 "websocket_open.response",
823 )
824}
825
826fn websocket_send_impl(
827 request_json: String,
828 _consumer_version_json: Option<String>,
829) -> Result<String, String> {
830 let request: BrowserWebSocketSendRequest = parse_json(&request_json, "websocket_send.request")?;
831 send_browser_websocket_message(&request.socket, request.value)?;
832 encode_json(&websocket_send_outcome(), "websocket_send.response")
833}
834
835fn websocket_recv_impl(
836 request_json: String,
837 _consumer_version_json: Option<String>,
838) -> Result<String, String> {
839 let request: BrowserWebSocketRecvRequest = parse_json(&request_json, "websocket_recv.request")?;
840 let outcome = recv_browser_websocket_message(&request.socket)?;
841 encode_json(&outcome, "websocket_recv.response")
842}
843
844fn websocket_close_impl(
845 request_json: String,
846 consumer_version_json: Option<String>,
847) -> Result<String, String> {
848 let request: BrowserWebSocketCloseRequest =
849 parse_json(&request_json, "websocket_close.request")?;
850 let consumer_version = parse_consumer_version(consumer_version_json)?;
851 let close_reason = request.reason.clone();
852 let mut state = take_websocket_state(&request.socket)
853 .ok_or_else(|| format!("unknown websocket handle: {:?}", request.socket))?;
854 if let Err(err) = close_browser_websocket_socket(&mut state, close_reason.as_deref()) {
855 insert_websocket_state(request.socket, state);
856 return Err(err);
857 }
858 let outcome = if let Some(reason) = close_reason {
859 cancelled_outcome(
860 "websocket_close",
861 "completed",
862 Some(reason),
863 Some(format!("{:?}", request.socket)),
864 )
865 } else {
866 websocket_send_outcome()
867 };
868 let closed = finalize_websocket_handle(&request.socket, outcome, consumer_version)?;
869 encode_json(&closed, "websocket_close.response")
870}
871
872fn websocket_cancel_impl(
873 request_json: String,
874 consumer_version_json: Option<String>,
875) -> Result<String, String> {
876 let request: BrowserWebSocketCancelRequest =
877 parse_json(&request_json, "websocket_cancel.request")?;
878 let consumer_version = parse_consumer_version(consumer_version_json)?;
879 let cancel_message = request.message.clone();
880 let cancel = WasmTaskCancelRequest {
881 task: request.socket,
882 kind: request.kind.clone(),
883 message: cancel_message.clone(),
884 };
885 let _ = cancel_websocket_handle(&cancel, consumer_version)?;
886 if let Some(mut state) = take_websocket_state(&request.socket)
887 && let Err(err) = close_browser_websocket_socket(&mut state, cancel_message.as_deref())
888 {
889 insert_websocket_state(request.socket, state);
890 return Err(err);
891 }
892 let cancelled = cancelled_outcome(
893 &request.kind,
894 "cancelling",
895 request.message,
896 Some(format!("{:?}", request.socket)),
897 );
898 let joined = finalize_websocket_handle(&request.socket, cancelled, consumer_version)?;
899 encode_json(&joined, "websocket_cancel.response")
900}
901
902fn abi_version_impl() -> Result<String, String> {
903 let version = WasmAbiVersion {
904 major: WASM_ABI_MAJOR_VERSION,
905 minor: WASM_ABI_MINOR_VERSION,
906 };
907 encode_json(&version, "abi_version.response")
908}
909
/// Returns the ABI signature fingerprint constant (`WASM_ABI_SIGNATURE_FINGERPRINT_V1`).
const fn abi_fingerprint_impl() -> u64 {
    WASM_ABI_SIGNATURE_FINGERPRINT_V1
}
913
/// Converts an error string into a JS string value.
#[cfg(target_arch = "wasm32")]
fn into_js_error(err: String) -> JsValue {
    JsValue::from_str(&err)
}