cross-ws 0.3.3

cross-ws is a stream-based WebSocket client for both web and native targets.
Documentation
//! web-sys WebSocket backend.

use crate::Result;

mod message;
mod sender;
mod receiver;

pub use sender::*;
pub use receiver::*;

use web_sys::{MessageEvent, Event, CloseEvent};
use wasm_bindgen::{JsValue, JsCast};
use wasm_bindgen::closure::Closure;
use js_sys::Function;
use wasm_bindgen_futures::{JsFuture, spawn_local};
use crate::traits::*;
use std::future::Future;
use std::convert::TryFrom;


/// Stream-based WebSocket.
///
/// The type carries no fields; it serves as the entry point of this backend
/// through its `WebSocketTrait` implementation, whose `new` connects to a URL
/// and yields a `(WebSocketSender, WebSocketReceiver)` pair.
#[derive(Debug)]
pub struct WebSocket {}

impl WebSocketTrait for WebSocket {
    type Sender = WebSocketSender;
    type Receiver = WebSocketReceiver;

    fn new(url: &str) -> impl Future<Output = Result<(WebSocketSender, WebSocketReceiver)>> {
        async move {
            let (sender, receiver) = async_channel::unbounded();

            let mut connection_callback = Box::new(|accept: Function, reject: Function| {
                // Connection
                let websocket = match web_sys::WebSocket::new(url) {
                    Ok(ws) => ws,
                    Err(err) => {
                        let error_msg = JsValue::from_str(&format!("Failed to create WebSocket: {:?}", err));
                        reject.call1(&JsValue::NULL, &error_msg).ok();
                        return;
                    }
                };
                {
                    let js_value = match websocket.clone().dyn_into::<JsValue>() {
                        Ok(js_val) => js_val,
                        Err(err) => {
                            let error_msg = JsValue::from_str(&format!("Failed to cast WebSocket to JsValue: {:?}", err));
                            reject.call1(&JsValue::NULL, &error_msg).ok();
                            return;
                        }
                    };
                    let onopen_callback = Closure::wrap(Box::new(move |_event| {
                        accept.call1(&JsValue::NULL, &js_value).ok();
                    }) as Box<dyn FnMut(Event)>);
                    websocket.set_onopen(Some(onopen_callback.as_ref().unchecked_ref()));
                    onopen_callback.forget();
                }

                // Error handling.
                let onerror_callback = Closure::wrap(Box::new(move |_event: Event| {
                    let error_msg = JsValue::from_str("WebSocket connection error occurred");
                    reject.call1(&JsValue::NULL, &error_msg).ok();
                }) as Box<dyn FnMut(Event)>);
                websocket.set_onerror(Some(onerror_callback.as_ref().unchecked_ref()));
                onerror_callback.forget();
            }) as Box<dyn FnMut(Function, Function)>;
            let connection_promise = js_sys::Promise::new(&mut connection_callback);

            let websocket_js = JsFuture::from(connection_promise).await
                .map_err(|error| crate::error::Error::ConnectionError(error))?;
            
            let websocket: web_sys::WebSocket = websocket_js.dyn_into()
                .map_err(|err| {
                    let error_msg = format!("Failed to cast JsValue to WebSocket: {:?}", err);
                    crate::error::Error::CastError(error_msg)
                })?;

            // Message streaming.
            let _on_message_callback = {
                let sender = sender.clone();
                    let _on_message_callback = Closure::wrap(Box::new(move |e: MessageEvent| {
                        let sender = sender.clone();
                        spawn_local(async move {
                            let result: crate::Result<crate::Message> = crate::message::Message::try_from(e)
                                .map_err(|err| {
                                    // Convert our Error to BackendError (JsValue)
                                    match err {
                                        crate::error::Error::UnsupportedMessageType(msg) => {
                                            crate::error::Error::ReceiveError(
                                                JsValue::from_str(&format!("Unsupported message type: {}", msg))
                                            )
                                        },
                                        other => other
                                    }
                                });
                            sender.send(result).await.ok();
                        })
                    }) as Box<dyn FnMut(MessageEvent)>);
                websocket.set_onmessage(Some(_on_message_callback.as_ref().unchecked_ref()));
                _on_message_callback
            };
            // Close event.
            let _on_close_callback = Closure::wrap(Box::new(move |_e: CloseEvent| {
                sender.close();
            }) as Box<dyn FnMut(CloseEvent)>);
            websocket.set_onclose(Some(_on_close_callback.as_ref().unchecked_ref()));

            websocket.set_binary_type(web_sys::BinaryType::Arraybuffer);
            Ok((
                WebSocketSender { websocket },
                WebSocketReceiver { receiver, _on_message_callback, _on_close_callback }
            ))
        }
    }
}


/// Backend-specific error payload type.
///
/// On this web-sys backend, errors are carried as a raw [`JsValue`]: either the
/// value a JavaScript promise rejected with, or a string message built on the
/// Rust side.
pub type BackendError = JsValue;