#![cfg(target_arch = "wasm32")]
#![allow(unsafe_code)]
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use bytes::Bytes;
use js_sys::Uint8Array;
use tokio::sync::{mpsc, oneshot, Mutex};
use wasm_bindgen::prelude::*;
use wasm_bindgen::JsCast;
use web_sys::{BinaryType, CloseEvent, ErrorEvent, MessageEvent, WebSocket};
use crate::api::session::SessionTransport;
use crate::errors::CoreError;
/// Capacity of the bounded outbound channel created in `WebSocketLeg::connect`:
/// the maximum number of frames that may be queued for the send task before
/// `send_bytes` has to wait for space.
const SEND_QUEUE_CAPACITY: usize = 256;
/// A browser WebSocket connection exposed through channels.
///
/// `connect` wires the socket's event handlers and a background send task to
/// the channel halves stored here; the `SessionTransport` implementation then
/// only talks to those channels.
pub struct WebSocketLeg {
// Producer side of the bounded outbound queue; the send task spawned in
// `connect` drains it and writes each frame to the socket.
send_tx: mpsc::Sender<Vec<u8>>,
// Consumer side of the inbound queue fed by the `onmessage` handler. Wrapped
// in an async mutex so `recv_bytes` can receive through `&self`.
recv_rx: Mutex<mpsc::UnboundedReceiver<Vec<u8>>>,
// Shared "connection is finished" flag. Set by `close`, by the socket's
// error/close handlers, and by the send task after a failed send.
closed: Arc<AtomicBool>,
}
impl WebSocketLeg {
pub async fn connect(url: &str) -> Result<Self, CoreError> {
let ws = WebSocket::new(url)
.map_err(|e| CoreError::NetworkError(format!("WebSocket new: {:?}", e)))?;
ws.set_binary_type(BinaryType::Arraybuffer);
let (send_tx, mut send_rx) = mpsc::channel::<Vec<u8>>(SEND_QUEUE_CAPACITY);
let (recv_tx, recv_rx) = mpsc::unbounded_channel::<Vec<u8>>();
let closed = Arc::new(AtomicBool::new(false));
let (open_tx, open_rx) = oneshot::channel::<Result<(), CoreError>>();
let open_tx = Arc::new(Mutex::new(Some(open_tx)));
{
let open_tx = open_tx.clone();
let onopen = Closure::wrap(Box::new(move |_e: web_sys::Event| {
let open_tx = open_tx.clone();
wasm_bindgen_futures::spawn_local(async move {
if let Some(tx) = open_tx.lock().await.take() {
let _ = tx.send(Ok(()));
}
});
}) as Box<dyn FnMut(web_sys::Event)>);
ws.set_onopen(Some(onopen.as_ref().unchecked_ref()));
onopen.forget();
}
let recv_tx_clone = recv_tx.clone();
let closed_clone = closed.clone();
{
let onmessage = Closure::wrap(Box::new(move |e: MessageEvent| {
if closed_clone.load(Ordering::Acquire) {
return;
}
if let Ok(buf) = e.data().dyn_into::<js_sys::ArrayBuffer>() {
let array = Uint8Array::new(&buf);
let mut bytes = vec![0u8; array.length() as usize];
array.copy_to(&mut bytes);
let _ = recv_tx_clone.send(bytes);
}
}) as Box<dyn FnMut(MessageEvent)>);
ws.set_onmessage(Some(onmessage.as_ref().unchecked_ref()));
onmessage.forget();
}
{
let open_tx = open_tx.clone();
let closed = closed.clone();
let onerror = Closure::wrap(Box::new(move |e: ErrorEvent| {
closed.store(true, Ordering::Release);
let open_tx = open_tx.clone();
let msg = format!("WebSocket error: {}", e.message());
wasm_bindgen_futures::spawn_local(async move {
if let Some(tx) = open_tx.lock().await.take() {
let _ = tx.send(Err(CoreError::NetworkError(msg)));
}
});
}) as Box<dyn FnMut(ErrorEvent)>);
ws.set_onerror(Some(onerror.as_ref().unchecked_ref()));
onerror.forget();
}
{
let open_tx = open_tx.clone();
let closed = closed.clone();
let onclose = Closure::wrap(Box::new(move |e: CloseEvent| {
closed.store(true, Ordering::Release);
let open_tx = open_tx.clone();
let msg = format!("WebSocket close: code={} reason={}", e.code(), e.reason());
wasm_bindgen_futures::spawn_local(async move {
if let Some(tx) = open_tx.lock().await.take() {
let _ = tx.send(Err(CoreError::NetworkError(msg)));
}
});
}) as Box<dyn FnMut(CloseEvent)>);
ws.set_onclose(Some(onclose.as_ref().unchecked_ref()));
onclose.forget();
}
let closed_send = closed.clone();
wasm_bindgen_futures::spawn_local(async move {
while let Some(bytes) = send_rx.recv().await {
if closed_send.load(Ordering::Acquire) {
break;
}
if let Err(e) = ws.send_with_u8_array(&bytes) {
web_sys::console::warn_1(&format!("WebSocketLeg send failed: {:?}", e).into());
closed_send.store(true, Ordering::Release);
break;
}
}
let _ = ws.close();
});
open_rx
.await
.map_err(|_| CoreError::NetworkError("WebSocket open signal lost".into()))??;
Ok(Self {
send_tx,
recv_rx: Mutex::new(recv_rx),
closed,
})
}
pub fn close(&self) {
self.closed.store(true, Ordering::Release);
}
pub fn is_closed(&self) -> bool {
self.closed.load(Ordering::Acquire)
}
}
impl SessionTransport for WebSocketLeg {
    /// Queues a copy of `data` for the background send task.
    ///
    /// # Errors
    ///
    /// * `CoreError::ConnectionClosed` if the leg is already flagged as closed.
    /// * `CoreError::NetworkError` if the outbound queue no longer has a receiver.
    async fn send_bytes(&self, data: &[u8]) -> Result<(), CoreError> {
        if self.is_closed() {
            return Err(CoreError::ConnectionClosed);
        }

        let frame = data.to_vec();
        match self.send_tx.send(frame).await {
            Ok(()) => Ok(()),
            Err(_) => Err(CoreError::NetworkError(
                "WebSocket send queue closed".into(),
            )),
        }
    }

    /// Waits for the next inbound frame and returns it as `Bytes`.
    ///
    /// The receiver lock is held for the duration of the wait, so concurrent
    /// callers are served one at a time.
    ///
    /// # Errors
    ///
    /// `CoreError::ConnectionClosed` once the inbound channel yields `None`.
    async fn recv_bytes(&self) -> Result<Bytes, CoreError> {
        let mut inbound = self.recv_rx.lock().await;
        inbound
            .recv()
            .await
            .map(Bytes::from)
            .ok_or(CoreError::ConnectionClosed)
    }
}