use std::fmt;
use std::sync::Arc;
use futures::StreamExt;
use url::Url;
use wasm_bindgen::closure::Closure;
use wasm_bindgen::JsCast;
use web_sys::{BinaryType, CloseEvent as JsCloseEvt, DomException, WebSocket as WebSysSocket};
use crate::wasm::pharos::{Filter, Observable, Observe, ObserveConfig, PharErr, SharedPharos};
use crate::wasm::{notify, CloseEvent, Error, WsEvent, WsState, WsStream};
pub struct WebSocket {
ws: Arc<WebSysSocket>,
pharos: SharedPharos<WsEvent>,
}
impl WebSocket {
const OPEN_CLOSE: Filter<WsEvent> =
Filter::Pointer(|evt: &WsEvent| evt.is_open() | evt.is_closed());
pub async fn connect(url: &Url) -> Result<(Self, WsStream), Error> {
let ws: Arc<WebSysSocket> = match WebSysSocket::new(url.as_str()) {
Ok(ws) => Arc::new(ws),
Err(e) => {
let de: &DomException = e.unchecked_ref();
return match de.code() {
DomException::SYNTAX_ERR => Err(Error::InvalidUrl {
supplied: url.to_string(),
}),
code => {
if code == 0 {
Err(Error::Other(
e.as_string().unwrap_or_else(|| String::from("None")),
))
} else {
Err(Error::Dom(code))
}
}
};
}
};
let mut pharos = SharedPharos::default();
let ph1 = pharos.clone();
let ph2 = pharos.clone();
let ph3 = pharos.clone();
let ph4 = pharos.clone();
let on_open = Closure::wrap(Box::new(move || {
notify(ph1.clone(), WsEvent::Open)
}) as Box<dyn FnMut()>);
#[allow(trivial_casts)]
let on_error = Closure::wrap(Box::new(move || {
notify(ph2.clone(), WsEvent::Error)
}) as Box<dyn FnMut()>);
#[allow(trivial_casts)]
let on_close = Closure::wrap(Box::new(move |evt: JsCloseEvt| {
let c = WsEvent::Closed(CloseEvent {
code: evt.code(),
reason: evt.reason(),
was_clean: evt.was_clean(),
});
notify(ph3.clone(), c)
}) as Box<dyn FnMut(JsCloseEvt)>);
ws.set_onopen(Some(on_open.as_ref().unchecked_ref()));
ws.set_onclose(Some(on_close.as_ref().unchecked_ref()));
ws.set_onerror(Some(on_error.as_ref().unchecked_ref()));
let guard = {
struct Guard<'lt> {
ws: &'lt WebSysSocket,
}
impl Drop for Guard<'_> {
fn drop(&mut self) {
self.ws.set_onopen(None);
self.ws.set_onclose(None);
self.ws.set_onerror(None);
if let Ok(WsState::Open) = self.ws.ready_state().try_into() {
let _ = self.ws.close();
}
println!(
"WebSocket::connect future was dropped while connecting to: {}.",
self.ws.url()
);
}
}
Guard { ws: &ws }
};
let mut evts = pharos
.observe(Self::OPEN_CLOSE.into())
.await
.expect("we didn't close pharos");
if let Some(WsEvent::Closed(evt)) = evts.next().await {
return Err(Error::ConnectionFailed { event: evt });
}
std::mem::forget(guard);
ws.set_binary_type(BinaryType::Arraybuffer);
Ok((
Self {
pharos,
ws: ws.clone(),
},
WsStream::new(
ws,
ph4,
Arc::new(on_open),
Arc::new(on_error),
Arc::new(on_close),
),
))
}
pub fn url(&self) -> String {
self.ws.url()
}
}
impl fmt::Debug for WebSocket {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "WebSocket for connection: {}", self.url())
}
}
impl Observable<WsEvent> for WebSocket {
type Error = PharErr;
fn observe(&mut self, options: ObserveConfig<WsEvent>) -> Observe<'_, WsEvent, Self::Error> {
self.pharos.observe(options)
}
}