use crate:: { import::*, WsEvent, WsErr };
/// The life-cycle states of a `Notifier`.
///
/// Equality is derived: the enum is fieldless, so two values are equal exactly
/// when they are the same variant.
#[ derive( Debug, Clone, Copy, PartialEq, Eq ) ]
enum State
{
    /// Nothing is outstanding; `Notifier::run` returns `Ok` right away.
    Ready,

    /// At least one event has been queued since the last successful `run`.
    Pending,

    /// Pharos reported an error. `Notifier::run` returns `Err` from now on.
    Closed,

    /// An event was handed to pharos, but pharos has not finished flushing yet.
    Flushing,
}
/// Buffers `WsEvent`s and hands them to observers through a `Pharos`.
///
/// Events are first stored with `queue` and are only passed on to pharos when
/// `run` is polled.
pub(super) struct Notifier
{
    /// The sink the queued events are sent into; `observe` delegates to it.
    pharos: Pharos<WsEvent>,

    /// Events that were queued but have not been handed to pharos yet, oldest first.
    events: VecDeque<WsEvent>,

    /// Where we are in the queue / send / flush cycle.
    state: State,
}
impl Notifier
{
    /// Create a notifier with an empty queue, in `State::Ready`.
    pub(crate) fn new() -> Self
    {
        Self
        {
            // NOTE(review): `2` is presumably the initial observer capacity of
            // pharos -- confirm against the pharos documentation.
            pharos: Pharos::new( 2 ) ,
            state : State::Ready     ,
            events: VecDeque::new()  ,
        }
    }


    /// Store an event for later delivery and mark the notifier as `State::Pending`.
    ///
    /// Nothing is sent here; call `run` to actually hand the event to pharos.
    /// In debug builds this asserts that the notifier has not been closed.
    pub(crate) fn queue( &mut self, evt: WsEvent )
    {
        debug_assert!( self.state != State::Closed, "queue called on a closed Notifier" );

        self.events.push_back( evt );
        self.state = State::Pending;
    }


    /// Try to deliver all queued events to pharos, flushing after each one.
    ///
    /// Returns:
    /// - `Poll::Ready( Ok(()) )` when the queue is empty and pharos is flushed,
    /// - `Poll::Pending` when pharos is not ready or not flushed yet; the remaining
    ///   events stay queued and the next call resumes where this one stopped,
    /// - `Poll::Ready( Err(()) )` when pharos returned an error, now or on an
    ///   earlier call. The notifier is `State::Closed` from then on.
    pub(crate) fn run( &mut self, cx: &mut Context<'_> ) -> Poll< Result<(), ()> >
    {
        loop
        {
            match self.state
            {
                State::Ready  => return Poll::Ready( Ok (()) ),
                State::Closed => return Poll::Ready( Err(()) ),

                State::Pending =>
                {
                    if self.events.is_empty()
                    {
                        self.state = State::Ready;
                        return Poll::Ready( Ok(()) );
                    }

                    // Only take the event out of the queue once pharos has agreed
                    // to accept it, so nothing is lost when we have to return Pending.
                    let readiness = ready!( Pin::new( &mut self.pharos ).poll_ready( cx ) );

                    if let Err(e) = readiness
                    {
                        return self.close_on_error( e );
                    }

                    let evt  = self.events.pop_front().expect( "pop queued event." );
                    let sent = Pin::new( &mut self.pharos ).start_send( evt );

                    if let Err(e) = sent
                    {
                        return self.close_on_error( e );
                    }

                    self.state = State::Flushing;
                }

                State::Flushing =>
                {
                    let flushed = ready!( Pin::new( &mut self.pharos ).poll_flush( cx ) );

                    if let Err(e) = flushed
                    {
                        return self.close_on_error( e );
                    }

                    // Events may still be waiting if an earlier flush returned Pending
                    // half way through the queue. Go back to sending them instead of
                    // reporting Ready and leaving them stranded.
                    self.state =

                        if self.events.is_empty() { State::Ready   }
                        else                      { State::Pending }
                    ;
                }
            }
        }
    }


    /// Log an error returned by pharos, switch to `State::Closed` and produce the
    /// `Err` result that `run` returns to its caller.
    fn close_on_error< E: std::fmt::Debug >( &mut self, e: E ) -> Poll< Result<(), ()> >
    {
        error!( "pharos returned an error, this could be a bug in ws_stream_tungstenite, please report: {:?}", e );

        self.state = State::Closed;

        Poll::Ready( Err(()) )
    }
}
/// Lets callers subscribe to the events this notifier delivers.
impl Observable< WsEvent > for Notifier
{
    type Error = WsErr;

    /// Register an observer on the inner pharos.
    ///
    /// The call is forwarded unchanged; an error from pharos is converted into
    /// `Self::Error`.
    fn observe( &mut self, options: ObserveConfig< WsEvent > ) -> Result< Events< WsEvent >, Self::Error >
    {
        match self.pharos.observe( options )
        {
            Ok ( events ) => Ok ( events     ),
            Err( err    ) => Err( err.into() ),
        }
    }
}
#[ cfg( test ) ]
mod tests
{
    use super::*;


    /// The ping event every test in this module queues.
    fn ping() -> WsEvent
    {
        WsEvent::Ping( vec![ 1, 2, 3] )
    }


    /// Without observers: queueing makes the notifier Pending, running it makes it Ready again.
    #[ test ]
    fn notifier_state()
    {
        let mut notifier = Notifier::new();

        assert_eq!( State::Ready, notifier.state );

        notifier.queue( ping() );
        assert_eq!( State::Pending, notifier.state );

        let waker  = noop_waker();
        let mut cx = Context::from_waker( &waker );

        let outcome = notifier.run( &mut cx );

        assert_eq!( Poll::Ready( Ok(()) ), outcome );
        assert_eq!( State::Ready, notifier.state );

        notifier.queue( WsEvent::Closed );
        assert_eq!( State::Pending, notifier.state );
    }


    /// With one observer on a bounded channel of size 1 and two queued events, the
    /// first run is expected to deliver only one event and return Pending. Once the
    /// observer has read that event, a second run is expected to deliver the other.
    #[ test ]
    fn notifier_state_observers()
    {
        let mut notifier = Notifier::new();
        let mut observer = notifier.observe( Channel::Bounded( 1 ).into() ).expect( "observe" );

        assert_eq!( State::Ready, notifier.state );
        assert_eq!( 0, notifier.events.len() );

        for _ in 0..2
        {
            notifier.queue( ping() );
        }

        assert_eq!( State::Pending, notifier.state );
        assert_eq!( 2, notifier.events.len() );

        // One waker and context serve both runs.
        let waker  = noop_waker();
        let mut cx = Context::from_waker( &waker );

        // First run: one event goes out, the other stays queued.
        let first_run = notifier.run( &mut cx );

        assert_eq!( Poll::Pending, first_run );
        assert_eq!( State::Pending, notifier.state );
        assert_eq!( 1, notifier.events.len() );

        let received = block_on( observer.next() );
        assert_matches!( received, Some( WsEvent::Ping(_) ) );

        // Second run: the observer made room, so the queue drains.
        let second_run = notifier.run( &mut cx );

        assert_eq!( Poll::Ready( Ok(()) ), second_run );
        assert_eq!( State::Ready, notifier.state );
        assert_eq!( 0, notifier.events.len() );

        let received = block_on( observer.next() );
        assert_matches!( received, Some( WsEvent::Ping(_) ) );
    }


    /// A queued event sits in the buffer until `run` takes it out.
    #[ test ]
    fn queue()
    {
        let mut notifier = Notifier::new();
        assert_eq!( 0, notifier.events.len() );

        notifier.queue( ping() );
        assert_eq!( 1, notifier.events.len() );

        let waker  = noop_waker();
        let mut cx = Context::from_waker( &waker );

        // Only the effect on the queue matters here, not the poll result.
        let _ = notifier.run( &mut cx );

        assert_eq!( 0, notifier.events.len() );
    }
}