use std::fmt;
use std::rc::Rc;
use futures::StreamExt;
use pharos::{Filter, Observable, Observe, ObserveConfig, PharErr, SharedPharos};
use send_wrapper::SendWrapper;
use wasm_bindgen::closure::Closure;
use wasm_bindgen::{JsCast, UnwrapThrowExt};
use web_sys::{BinaryType, CloseEvent as JsCloseEvt, DomException, WebSocket};
use crate::{notify, CloseEvent, WsErr, WsEvent, WsState, WsStream};
pub struct WsMeta {
ws: SendWrapper<Rc<WebSocket>>,
pharos: SharedPharos<WsEvent>,
}
impl WsMeta {
const OPEN_CLOSE: Filter<WsEvent> =
Filter::Pointer(|evt: &WsEvent| evt.is_open() | evt.is_closed());
pub async fn connect(url: impl AsRef<str>) -> Result<(Self, WsStream), WsErr> {
let ws = match WebSocket::new(url.as_ref()) {
Ok(ws) => SendWrapper::new(Rc::new(ws)),
Err(e) => {
let de: &DomException = e.unchecked_ref();
return match de.code() {
DomException::SYNTAX_ERR => Err(WsErr::InvalidUrl {
supplied: url.as_ref().to_string(),
}),
code => {
if code == 0 {
Err(WsErr::Other(
e.as_string().unwrap_or_else(|| String::from("None")),
))
} else {
Err(WsErr::Dom(code))
}
}
};
}
};
let mut pharos = SharedPharos::default();
let ph1 = pharos.clone();
let ph2 = pharos.clone();
let ph3 = pharos.clone();
let ph4 = pharos.clone();
let on_open = Closure::wrap(Box::new(move || {
notify(ph1.clone(), WsEvent::Open)
}) as Box<dyn FnMut()>);
#[allow(trivial_casts)]
let on_error = Closure::wrap(Box::new(move || {
notify(ph2.clone(), WsEvent::Error)
}) as Box<dyn FnMut()>);
#[allow(trivial_casts)]
let on_close = Closure::wrap(Box::new(move |evt: JsCloseEvt| {
let c = WsEvent::Closed(CloseEvent {
code: evt.code(),
reason: evt.reason(),
was_clean: evt.was_clean(),
});
notify(ph3.clone(), c)
}) as Box<dyn FnMut(JsCloseEvt)>);
ws.set_onopen(Some(on_open.as_ref().unchecked_ref()));
ws.set_onclose(Some(on_close.as_ref().unchecked_ref()));
ws.set_onerror(Some(on_error.as_ref().unchecked_ref()));
let guard = {
struct Guard<'lt> {
ws: &'lt WebSocket,
}
impl Drop for Guard<'_> {
fn drop(&mut self) {
self.ws.set_onopen(None);
self.ws.set_onclose(None);
self.ws.set_onerror(None);
if let Ok(WsState::Open) = self.ws.ready_state().try_into() {
let _ = self.ws.close();
}
println!(
"WsMeta::connect future was dropped while connecting to: {}.",
self.ws.url()
);
}
}
Guard { ws: &ws }
};
let mut evts = pharos
.observe(Self::OPEN_CLOSE.into())
.await
.expect("we didn't close pharos");
if let Some(WsEvent::Closed(evt)) = evts.next().await {
return Err(WsErr::ConnectionFailed { event: evt });
}
std::mem::forget(guard);
ws.set_binary_type(BinaryType::Arraybuffer);
Ok((
Self {
pharos,
ws: ws.clone(),
},
WsStream::new(
ws,
ph4,
SendWrapper::new(on_open),
SendWrapper::new(on_error),
SendWrapper::new(on_close),
),
))
}
pub async fn close(&self) -> Result<CloseEvent, WsErr> {
match self.ready_state() {
WsState::Closed => return Err(WsErr::ConnectionNotOpen),
WsState::Closing => {}
WsState::Open => {
let _ = self.ws.close();
notify(self.pharos.clone(), WsEvent::Closing)
}
WsState::Connecting => {
notify(self.pharos.clone(), WsEvent::Closing)
}
}
let mut evts = match self
.pharos
.observe_shared(Filter::Pointer(WsEvent::is_closed).into())
.await
{
Ok(events) => events,
Err(e) => unreachable!("{:?}", e), };
let ce = evts.next().await.expect_throw("receive a close event");
if let WsEvent::Closed(e) = ce {
Ok(e)
} else {
unreachable!()
}
}
pub async fn close_code(&self, code: u16) -> Result<CloseEvent, WsErr> {
match self.ready_state() {
WsState::Closed => return Err(WsErr::ConnectionNotOpen),
WsState::Closing => {}
_ => {
match self.ws.close_with_code(code) {
Ok(_) => notify(self.pharos.clone(), WsEvent::Closing),
Err(_) => {
return Err(WsErr::InvalidCloseCode { supplied: code });
}
}
}
}
let mut evts = match self
.pharos
.observe_shared(Filter::Pointer(WsEvent::is_closed).into())
.await
{
Ok(events) => events,
Err(e) => unreachable!("{:?}", e), };
let ce = evts.next().await.expect_throw("receive a close event");
if let WsEvent::Closed(e) = ce {
Ok(e)
} else {
unreachable!()
}
}
pub async fn close_reason(
&self,
code: u16,
reason: impl AsRef<str>,
) -> Result<CloseEvent, WsErr> {
match self.ready_state() {
WsState::Closed => return Err(WsErr::ConnectionNotOpen),
WsState::Closing => {}
_ => {
if reason.as_ref().len() > 123 {
return Err(WsErr::ReasonStringToLong);
}
match self.ws.close_with_code_and_reason(code, reason.as_ref()) {
Ok(_) => notify(self.pharos.clone(), WsEvent::Closing),
Err(_) => return Err(WsErr::InvalidCloseCode { supplied: code }),
}
}
}
let mut evts = match self
.pharos
.observe_shared(Filter::Pointer(WsEvent::is_closed).into())
.await
{
Ok(events) => events,
Err(e) => unreachable!("{:?}", e), };
let ce = evts.next().await.expect_throw("receive a close event");
if let WsEvent::Closed(e) = ce {
Ok(e)
} else {
unreachable!()
}
}
pub fn ready_state(&self) -> WsState {
self.ws
.ready_state()
.try_into()
.expect_throw("Convert ready state from browser API")
}
pub fn wrapped(&self) -> &WebSocket {
&self.ws
}
pub fn buffered_amount(&self) -> u32 {
self.ws.buffered_amount()
}
pub fn extensions(&self) -> String {
self.ws.extensions()
}
pub fn protocol(&self) -> String {
self.ws.protocol()
}
pub fn url(&self) -> String {
self.ws.url()
}
}
impl fmt::Debug for WsMeta {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "WsMeta for connection: {}", self.url())
}
}
impl Observable<WsEvent> for WsMeta {
type Error = PharErr;
fn observe(&mut self, options: ObserveConfig<WsEvent>) -> Observe<'_, WsEvent, Self::Error> {
self.pharos.observe(options)
}
}