use crate::{ import::*, * };
/// A `Stream` of incoming and `Sink` of outgoing [`WsMessage`]s backed by a browser `WebSocket`.
///
/// All browser-side handles are stored inside `SendWrapper`s. Incoming messages are buffered in
/// `queue` by the `onmessage` callback and handed out by the `Stream` implementation.
//
pub struct WsStream
{
/// The wrapped browser `WebSocket`.
ws: SendWrapper< Rc< WebSocket > >,
/// Messages pushed by the `onmessage` callback and popped by `Stream::poll_next`.
queue: SendWrapper< Rc<RefCell< VecDeque<WsMessage> >> >,
/// Waker stored by `Stream::poll_next`; woken by the `onmessage` callback and by the
/// task that `new` spawns to watch for a closed event.
waker: SendWrapper< Rc<RefCell< Option<Waker> >> >,
/// Waker stored by `Sink::poll_ready` while the socket is still connecting; woken by the
/// task that `new` spawns to watch for a closed event.
sink_waker: SendWrapper< Rc<RefCell< Option<Waker> >> >,
/// Event notifier used to emit and to observe `WsEvent`s.
pharos: SharedPharos<WsEvent>,
// The four callbacks below are never read; they are only stored in this struct.
// NOTE(review): presumably this keeps the JS-side handlers valid for the lifetime of the
// stream - confirm against where the first three are registered.
_on_open : SendWrapper< Closure< dyn FnMut() > >,
_on_error: SendWrapper< Closure< dyn FnMut() > >,
_on_close: SendWrapper< Closure< dyn FnMut( JsCloseEvt ) > >,
_on_mesg : SendWrapper< Closure< dyn FnMut( MessageEvent ) > >,
/// Future created lazily by `Sink::poll_close`; it completes once the observer for
/// closed events yields.
closer: Option<SendWrapper< Pin<Box< dyn Future< Output=() > + Send >> >>,
}
impl WsStream
{
    /// Build a `WsStream` around an existing browser socket.
    ///
    /// This installs the `onmessage` handler on `ws`, spawns a local task that wakes any stored
    /// read/write wakers once a closed event is observed on `pharos`, and stores the supplied
    /// callbacks inside the returned value.
    ///
    /// - `ws`: the browser socket to wrap.
    /// - `pharos`: shared event notifier for `WsEvent`s.
    /// - `on_open`, `on_error`, `on_close`: callbacks created by the caller; they are only stored.
    //
    pub(crate) fn new
    (
        ws      : SendWrapper< Rc<WebSocket> >                      ,
        pharos  : SharedPharos<WsEvent>                             ,
        on_open : SendWrapper< Closure< dyn FnMut() > >             ,
        on_error: SendWrapper< Closure< dyn FnMut() > >             ,
        on_close: SendWrapper< Closure< dyn FnMut( JsCloseEvt ) > > ,
    ) -> Self
    {
        let waker     : SendWrapper< Rc<RefCell<Option<Waker>>> > = SendWrapper::new( Rc::new( RefCell::new( None )) );
        let sink_waker: SendWrapper< Rc<RefCell<Option<Waker>>> > = SendWrapper::new( Rc::new( RefCell::new( None )) );
        let queue = SendWrapper::new( Rc::new( RefCell::new( VecDeque::new() ) ) );

        // Handles moved into the message callback.
        //
        let inbox       = queue .clone();
        let reader_wake = waker .clone();
        let err_pharos  = pharos.clone();

        // Incoming message: buffer it (or report the conversion error), then wake the reader.
        //
        let handler: Box< dyn FnMut( MessageEvent ) > = Box::new( move |msg_evt: MessageEvent|
        {
            let converted = WsMessage::try_from( msg_evt );

            match converted
            {
                Ok (msg) => inbox.borrow_mut().push_back( msg ),
                Err(err) => notify( err_pharos.clone(), WsEvent::WsErr( err ) ),
            }

            if let Some( w ) = reader_wake.borrow_mut().take()
            {
                w.wake();
            }
        });

        let on_mesg = Closure::wrap( handler );

        ws.set_onmessage( Some( on_mesg.as_ref().unchecked_ref() ) );

        // Handles moved into the close-watching task.
        //
        let close_pharos = pharos    .clone();
        let reader_wake  = waker     .clone();
        let writer_wake  = sink_waker.clone();

        // Wait for the first closed event, then wake whichever tasks are parked on this stream.
        //
        let wake_on_close = async move
        {
            let mut closed_events = match close_pharos.observe_shared( Filter::Pointer( WsEvent::is_closed ).into() ).await
            {
                Ok (events) => events,
                Err(e)      => unreachable!( "{:?}", e ),
            };

            closed_events.next().await;

            if let Some(w) = &*reader_wake.borrow()
            {
                w.wake_by_ref();
            }

            if let Some(w) = &*writer_wake.borrow()
            {
                w.wake_by_ref();
            }
        };

        spawn_local( wake_on_close );

        Self
        {
            ws,
            queue,
            waker,
            sink_waker,
            pharos,
            closer   : None,
            _on_mesg : SendWrapper::new( on_mesg ),
            _on_open : on_open,
            _on_error: on_error,
            _on_close: on_close,
        }
    }

    /// Report the socket's current ready state as a [`WsState`].
    ///
    /// Throws (via `expect_throw`) if the value reported by the browser cannot be converted.
    //
    pub fn ready_state( &self ) -> WsState
    {
        let raw = self.ws.ready_state();

        raw.try_into().expect_throw( "Convert ready state from browser API" )
    }

    /// Borrow the wrapped browser `WebSocket`.
    //
    pub fn wrapped( &self ) -> &WebSocket
    {
        let socket: &WebSocket = &self.ws;

        socket
    }

    /// Consume this stream and wrap it in an `IoStream` over a `WsStreamIo`.
    //
    pub fn into_io( self ) -> IoStream< WsStreamIo, Vec<u8> >
    {
        let io = WsStreamIo::new( self );

        IoStream::new( io )
    }
}
impl fmt::Debug for WsStream
{
    /// Print a short description that names the url of the wrapped socket.
    //
    fn fmt( &self, f: &mut fmt::Formatter<'_> ) -> fmt::Result
    {
        let url = self.ws.url();

        f.write_fmt( format_args!( "WsStream for connection: {}", url ) )
    }
}
impl Drop for WsStream
{
    /// Close the socket if it is not already closing or closed, emit `WsEvent::Closing` in that
    /// case, and finally unregister all four event handlers from the socket.
    //
    fn drop( &mut self )
    {
        let shutting_down = matches!( self.ready_state(), WsState::Closing | WsState::Closed );

        if !shutting_down
        {
            self.ws.close().expect( "WsStream::drop - close ws socket" );

            notify( self.pharos.clone(), WsEvent::Closing );
        }

        // Detach the callbacks from the socket.
        //
        self.ws.set_onmessage( None );
        self.ws.set_onerror  ( None );
        self.ws.set_onopen   ( None );
        self.ws.set_onclose  ( None );
    }
}
impl Stream for WsStream
{
    type Item = WsMessage;

    /// Hand out the oldest buffered message, if any.
    ///
    /// With an empty buffer the task's waker is stored and the result depends on the socket
    /// state: `Pending` while connecting or open, end of stream (`None`) otherwise.
    //
    fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Option< Self::Item >>
    {
        // Fast path: something is already buffered.
        //
        if !self.queue.borrow().is_empty()
        {
            return Poll::Ready( self.queue.borrow_mut().pop_front() );
        }

        *self.waker.borrow_mut() = Some( cx.waker().clone() );

        match self.ready_state()
        {
            WsState::Open | WsState::Connecting => Poll::Pending,
            _                                   => Poll::Ready( None ),
        }
    }
}
impl Sink<WsMessage> for WsStream
{
    type Error = WsErr;

    /// Report whether a message can be sent right now.
    ///
    /// - `Connecting`: stores the task's waker in `sink_waker` and returns `Pending`.
    /// - `Open`: ready.
    /// - any other state: `WsErr::ConnectionNotOpen`.
    //
    fn poll_ready( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>
    {
        match self.ready_state()
        {
            WsState::Connecting =>
            {
                *self.sink_waker.borrow_mut() = Some( cx.waker().clone() );

                Poll::Pending
            }

            WsState::Open => Ok(()).into(),
            _             => Err( WsErr::ConnectionNotOpen ).into(),
        }
    }

    /// Send `item` over the socket.
    ///
    /// # Errors
    ///
    /// Returns `WsErr::ConnectionNotOpen` when the socket is not in the `Open` state. Any error
    /// reported by the browser's send call is mapped to that same variant.
    //
    fn start_send( self: Pin<&mut Self>, item: WsMessage ) -> Result<(), Self::Error>
    {
        match self.ready_state()
        {
            WsState::Open =>
            {
                match item
                {
                    #[cfg(not(target_feature = "atomics"))]
                    WsMessage::Binary( d ) => self.ws.send_with_u8_array   ( &d                                 ).map_err( |_| WsErr::ConnectionNotOpen )? ,

                    // With the `atomics` target feature the bytes are first copied into a
                    // `Uint8Array` and sent through the js-array variant of the call.
                    //
                    #[cfg(target_feature = "atomics")]
                    WsMessage::Binary( d ) => self.ws.send_with_js_u8_array( &Uint8Array::from( d.as_slice() ) ).map_err( |_| WsErr::ConnectionNotOpen )? ,

                    WsMessage::Text  ( s ) => self.ws.send_with_str        ( &s                                 ).map_err( |_| WsErr::ConnectionNotOpen )? ,
                }

                Ok(())
            },

            _ => Err( WsErr::ConnectionNotOpen ),
        }
    }

    /// Nothing is buffered on the sending side by this type, so flushing always succeeds
    /// immediately.
    //
    fn poll_flush( self: Pin<&mut Self>, _: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>
    {
        Ok(()).into()
    }

    /// Close the socket and resolve once a closed event has been observed.
    ///
    /// If the socket is connecting or open, `close` is called on it and `WsEvent::Closing` is
    /// emitted. Unless the socket already reported `Closed` at the start of this call, a future
    /// waiting for the next closed event is created (once) and polled.
    //
    fn poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>
    {
        // Snapshot taken before `close()` is called; the match below works on this snapshot.
        //
        let state = self.ready_state();

        if state == WsState::Connecting
        || state == WsState::Open
        {
            self.ws.close().unwrap_throw();

            notify( self.pharos.clone(), WsEvent::Closing );
        }

        match state
        {
            WsState::Closed => Ok(()).into(),

            _ =>
            {
                // Lazily create the future that resolves on the closed event, so repeated
                // polls while pending reuse the same observer.
                //
                if self.closer.is_none()
                {
                    let mut ph = self.pharos.clone();

                    let closer = async move
                    {
                        let mut rx = match ph.observe( Filter::Pointer( WsEvent::is_closed ).into() ).await
                        {
                            Ok(events) => events                  ,
                            Err(e)     => unreachable!( "{:?}", e ) ,
                        };

                        rx.next().await;
                    };

                    self.closer = Some(SendWrapper::new( closer.boxed() ));
                }

                ready!( self.closer.as_mut().unwrap().as_mut().poll(cx) );

                // The future has completed. Drop it so that a later call to `poll_close` can
                // never poll a finished future again, which would panic.
                //
                self.closer = None;

                Ok(()).into()
            }
        }
    }
}