use crate::{ import::*, WsErr, WsState, WsStream, WsEvent, CloseEvent, notify };
pub struct WsMeta
{
ws : SendWrapper< Rc<WebSocket> > ,
pharos: SendWrapper< Rc<RefCell< Pharos<WsEvent> >> > ,
}
impl WsMeta
{
const OPEN_CLOSE: Filter<WsEvent> = Filter::Pointer( |evt: &WsEvent| evt.is_open() | evt.is_closed() );
pub async fn connect( url: impl AsRef<str>, protocols: impl Into<Option<Vec<&str>>> )
-> Result< (Self, WsStream), WsErr >
{
let res = match protocols.into()
{
None => WebSocket::new( url.as_ref() ),
Some(v) =>
{
let js_protos = v.iter().fold( Array::new(), |acc, proto|
{
acc.push( &JsValue::from_str( proto ) );
acc
});
WebSocket::new_with_str_sequence( url.as_ref(), &js_protos )
}
};
let ws = match res
{
Ok(ws) => SendWrapper::new( Rc::new( ws ) ),
Err(e) =>
{
let de: &DomException = e.unchecked_ref();
match de.code()
{
DomException::SECURITY_ERR => return Err( WsErr::ForbiddenPort ),
DomException::SYNTAX_ERR =>
return Err( WsErr::InvalidUrl{ supplied: url.as_ref().to_string() } ),
_ => unreachable!(),
};
}
};
let pharos = SendWrapper::new( Rc::new( RefCell::new( Pharos::default() )) );
let ph1 = pharos.clone();
let ph2 = pharos.clone();
let ph3 = pharos.clone();
let ph4 = pharos.clone();
#[ allow( trivial_casts ) ]
let on_open = Closure::wrap( Box::new( move ||
{
notify( ph1.clone(), WsEvent::Open )
}) as Box< dyn FnMut() > );
#[ allow( trivial_casts ) ]
let on_error = Closure::wrap( Box::new( move ||
{
notify( ph2.clone(), WsEvent::Error )
}) as Box< dyn FnMut() > );
#[ allow( trivial_casts ) ]
let on_close = Closure::wrap( Box::new( move |evt: JsCloseEvt|
{
let c = WsEvent::Closed( CloseEvent
{
code : evt.code() ,
reason : evt.reason() ,
was_clean: evt.was_clean(),
});
notify( ph3.clone(), c )
}) as Box< dyn FnMut( JsCloseEvt ) > );
ws.set_onopen ( Some( &on_open .as_ref().unchecked_ref() ));
ws.set_onclose( Some( &on_close.as_ref().unchecked_ref() ));
ws.set_onerror( Some( &on_error.as_ref().unchecked_ref() ));
let mut evts = pharos.borrow_mut().observe( Self::OPEN_CLOSE.into() )
.expect( "we didn't close pharos" )
;
if let Some( WsEvent::Closed(evt) ) = evts.next().await
{
return Err( WsErr::ConnectionFailed{ event: evt } )
}
ws.set_binary_type( BinaryType::Arraybuffer );
Ok
((
Self
{
pharos,
ws: ws.clone(),
},
WsStream::new
(
ws,
ph4,
SendWrapper::new( on_open ),
SendWrapper::new( on_error ),
SendWrapper::new( on_close ),
)
))
}
pub async fn close( &self ) -> Result< CloseEvent, WsErr >
{
match self.ready_state()
{
WsState::Closed => return Err( WsErr::ConnectionNotOpen ),
WsState::Closing => {}
_ =>
{
self.ws.close().unwrap_throw();
notify( self.pharos.clone(), WsEvent::Closing )
}
}
let mut evts = match self.pharos.borrow_mut().observe( Filter::Pointer( WsEvent::is_closed ).into() )
{
Ok(events) => events ,
Err(e) => unreachable!( "{:?}", e ) , };
let ce = evts.next().await.expect_throw( "receive a close event" );
if let WsEvent::Closed(e) = ce { Ok( e ) }
else { unreachable!() }
}
pub async fn close_code( &self, code: u16 ) -> Result<CloseEvent, WsErr>
{
match self.ready_state()
{
WsState::Closed => return Err( WsErr::ConnectionNotOpen ),
WsState::Closing => {}
_ =>
{
match self.ws.close_with_code( code )
{
Ok(_) => notify( self.pharos.clone(), WsEvent::Closing ),
Err(_) =>
{
return Err( WsErr::InvalidCloseCode{ supplied: code } );
}
}
}
}
let mut evts = match self.pharos.borrow_mut().observe( Filter::Pointer( WsEvent::is_closed ).into() )
{
Ok(events) => events ,
Err(e) => unreachable!( "{:?}", e ) , };
let ce = evts.next().await.expect_throw( "receive a close event" );
if let WsEvent::Closed(e) = ce { Ok(e) }
else { unreachable!() }
}
pub async fn close_reason( &self, code: u16, reason: impl AsRef<str> ) -> Result<CloseEvent, WsErr>
{
match self.ready_state()
{
WsState::Closed => return Err( WsErr::ConnectionNotOpen ),
WsState::Closing => {}
_ =>
{
if reason.as_ref().len() > 123
{
return Err( WsErr::ReasonStringToLong );
}
match self.ws.close_with_code_and_reason( code, reason.as_ref() )
{
Ok(_) => notify( self.pharos.clone(), WsEvent::Closing ),
Err(_) =>
{
return Err( WsErr::InvalidCloseCode{ supplied: code } )
}
}
}
}
let mut evts = match self.pharos.borrow_mut().observe( Filter::Pointer( WsEvent::is_closed ).into() )
{
Ok(events) => events ,
Err(e) => unreachable!( "{:?}", e ) , };
let ce = evts.next().await.expect_throw( "receive a close event" );
if let WsEvent::Closed(e) = ce { Ok(e) }
else { unreachable!() }
}
pub fn ready_state( &self ) -> WsState
{
self.ws.ready_state().try_into()
.expect_throw( "Convert ready state from browser API" )
}
pub fn wrapped( &self ) -> &WebSocket
{
&self.ws
}
pub fn buffered_amount( &self ) -> u32
{
self.ws.buffered_amount()
}
pub fn extensions( &self ) -> String
{
self.ws.extensions()
}
pub fn protocol(&self) -> String
{
self.ws.protocol()
}
pub fn url( &self ) -> String
{
self.ws.url()
}
}
impl fmt::Debug for WsMeta
{
fn fmt( &self, f: &mut fmt::Formatter<'_> ) -> fmt::Result
{
write!( f, "WsMeta for connection: {}", self.url() )
}
}
impl Observable<WsEvent> for WsMeta
{
type Error = pharos::Error;
fn observe( &mut self, options: ObserveConfig<WsEvent> ) -> Result< Events<WsEvent>, Self::Error >
{
self.pharos.borrow_mut().observe( options )
}
}