use crate :: { import::*, Observable, Observe, Events, ObserveConfig, events::Sender, PharErr, ErrorKind, Channel };
pub struct Pharos<Event> where Event: 'static + Clone + Send
{
observers : Vec<Option< Sender<Event> >>,
free_slots: Vec<usize> ,
closed : bool ,
}
impl<Event> fmt::Debug for Pharos<Event> where Event: 'static + Clone + Send
{
fn fmt( &self, f: &mut fmt::Formatter<'_> ) -> fmt::Result
{
write!( f, "pharos::Pharos<{}>", type_name::<Event>() )
}
}
impl<Event> Pharos<Event> where Event: 'static + Clone + Send
{
pub fn new( capacity: usize ) -> Self
{
Self
{
observers : Vec::with_capacity( capacity ),
free_slots: Vec::with_capacity( capacity ),
closed : false ,
}
}
pub fn storage_len( &self ) -> usize
{
self.observers.len()
}
pub fn num_observers( &mut self ) -> usize
{
let mut count = 0;
for (i, opt) in self.observers.iter_mut().enumerate()
{
if let Some(observer) = opt
{
if !observer.is_closed()
{
count += 1;
}
else
{
self.free_slots.push( i );
*opt = None
}
}
}
count
}
}
impl<Event> Default for Pharos<Event> where Event: 'static + Clone + Send
{
fn default() -> Self
{
Self::new( 10 )
}
}
impl<Event> Observable<Event> for Pharos<Event> where Event: 'static + Clone + Send
{
type Error = PharErr;
fn observe( &mut self, options: ObserveConfig<Event> ) -> Observe<'_, Event, Self::Error >
{
async move
{
if self.closed
{
return Err( ErrorKind::Closed.into() );
}
if let Channel::Bounded(queue_size) = options.channel
{
if queue_size < 1
{
return Err( ErrorKind::MinChannelSizeOne.into() );
}
}
let (events, sender) = Events::new( options );
if let Some( i ) = self.free_slots.pop()
{
self.observers[i] = Some( sender );
}
else
{
self.observers.push( Some( sender ) );
}
Ok( events )
}.boxed()
}
}
impl<Event> Sink<Event> for Pharos<Event> where Event: Clone + 'static + Send
{
type Error = PharErr;
fn poll_ready( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>
{
if self.closed
{
return Err( ErrorKind::Closed.into() ).into();
}
#[allow(clippy::manual_flatten)]
for obs in self.get_mut().observers.iter_mut()
{
if let Some( ref mut o ) = obs
{
let res = ready!( Pin::new( o ).poll_ready( cx ) );
if res.is_err()
{
*obs = None;
}
}
}
Ok(()).into()
}
fn start_send( self: Pin<&mut Self>, evt: Event ) -> Result<(), Self::Error>
{
if self.closed
{
return Err( ErrorKind::Closed.into() );
}
let this = self.get_mut();
for (i, opt) in this.observers.iter_mut().enumerate()
{
if let Some( obs ) = opt
{
if obs.is_closed()
{
this.free_slots.push( i );
*opt = None;
}
else if obs.filter( &evt )
{
if Pin::new( obs ).start_send( evt.clone() ).is_err()
{
this.free_slots.push( i );
*opt = None;
}
}
}
}
Ok(())
}
fn poll_flush( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>
{
if self.closed
{
return Err( ErrorKind::Closed.into() ).into();
}
let mut pending = false;
let this = self.get_mut();
for (i, opt) in this.observers.iter_mut().enumerate()
{
if let Some( ref mut obs ) = opt
{
match Pin::new( obs ).poll_flush( cx )
{
Poll::Pending => pending = true ,
Poll::Ready(Ok(_)) => continue ,
Poll::Ready(Err(_)) =>
{
this.free_slots.push( i );
*opt = None;
}
}
}
}
if pending { Poll::Pending }
else { Ok(()).into() }
}
fn poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>
{
if self.closed
{
return Ok(()).into();
}
else
{
self.closed = true;
}
let this = self.get_mut();
for (i, opt) in this.observers.iter_mut().enumerate()
{
if let Some( ref mut obs ) = opt
{
let res = ready!( Pin::new( obs ).poll_close( cx ) );
if res.is_err()
{
this.free_slots.push( i );
*opt = None;
}
}
}
Ok(()).into()
}
}
#[ cfg( test ) ]
mod tests
{
use crate :: { *, import::* };
#[test]
fn debug()
{
let lighthouse = Pharos::<bool>::default();
assert_eq!( "pharos::Pharos<bool>", &format!( "{:?}", lighthouse ) );
}
#[test]
fn new()
{
let ph = Pharos::<bool>::new( 5 );
assert_eq!( ph.observers.capacity(), 5 );
}
#[async_std::test]
async fn storage_len()
{
let mut ph = Pharos::<bool>::default();
assert_eq!( ph.storage_len (), 0 );
assert_eq!( ph.num_observers (), 0 );
assert_eq!( ph.free_slots.len(), 0 );
let mut a = ph.observe( ObserveConfig::default() ).await.expect( "observe" );
assert_eq!( ph.storage_len (), 1 );
assert_eq!( ph.num_observers (), 1 );
assert_eq!( ph.free_slots.len(), 0 );
let b = ph.observe( ObserveConfig::default() ).await.expect( "observe" );
assert_eq!( ph.storage_len (), 2 );
assert_eq!( ph.num_observers (), 2 );
assert_eq!( ph.free_slots.len(), 0 );
a.close();
assert_eq!( ph.storage_len () , 2 );
assert_eq!( ph.num_observers() , 1 );
assert_eq!( &ph.free_slots , &[0] );
drop( b );
assert_eq!( ph.storage_len (), 2 );
assert_eq!( ph.num_observers(), 0 );
assert_eq!( &ph.free_slots , &[0, 1] );
}
#[async_std::test]
async fn reuse()
{
let mut ph = Pharos::<bool>::default();
let _a = ph.observe( ObserveConfig::default() ).await;
let b = ph.observe( ObserveConfig::default() ).await;
let _c = ph.observe( ObserveConfig::default() ).await;
assert_eq!( ph.storage_len (), 3 );
assert_eq!( ph.num_observers(), 3 );
drop( b );
assert_eq!( ph.storage_len (), 3 );
assert_eq!( ph.num_observers(), 2 );
assert!( ph.observers[1].is_none() );
assert_eq!( &ph.free_slots, &[1] );
let _d = ph.observe( ObserveConfig::default() ).await;
assert_eq!( ph.storage_len (), 3 );
assert_eq!( ph.num_observers (), 3 );
assert_eq!( ph.free_slots.len(), 0 );
let _e = ph.observe( ObserveConfig::default() ).await;
assert_eq!( ph.storage_len (), 4 );
assert_eq!( ph.num_observers (), 4);
assert_eq!( ph.free_slots.len(), 0 );
}
#[async_std::test]
async fn observe_after_close()
{
let mut ph = Pharos::<bool>::default();
block_on( ph.close() ).expect( "close" );
let res = ph.observe( ObserveConfig::default() ).await;
assert! ( res.is_err() );
assert_eq!( ErrorKind::Closed, res.unwrap_err().kind() );
}
#[async_std::test]
async fn observe_refuse_zero()
{
let mut ph = Pharos::<bool>::default();
let res = ph.observe( Channel::Bounded(0).into() ).await;
assert! ( res.is_err() );
assert_eq!( ErrorKind::MinChannelSizeOne, res.unwrap_err().kind() );
}
#[async_std::test]
async fn poll_ready_pending()
{
let mut ph = Pharos::default();
let _open = ph.observe( Channel::Bounded ( 10 ).into() ).await.expect( "observe" );
let mut full = ph.observe( Channel::Bounded ( 1 ).into() ).await.expect( "observe" );
let _unbound = ph.observe( Channel::Unbounded .into() ).await.expect( "observe" );
poll_fn( move |mut cx|
{
let mut ph = Pin::new( &mut ph );
crate::assert_matches!( ph.as_mut().poll_ready( &mut cx ), Poll::Ready( Ok(_) ) );
assert!( ph.as_mut().start_send( true ).is_ok() );
crate::assert_matches!( ph.as_mut().poll_ready( &mut cx ), Poll::Pending );
assert_eq!( Pin::new( &mut full ).poll_next(cx), Poll::Ready(Some(true)));
crate::assert_matches!( ph.as_mut().poll_ready( &mut cx ), Poll::Ready( Ok(_) ) );
().into()
}).await;
}
#[async_std::test]
async fn poll_ready_drop()
{
let mut ph = Pharos::<bool>::default();
let _open = ph.observe( Channel::Bounded ( 10 ).into() ).await.expect( "observe" );
let full = ph.observe( Channel::Bounded ( 1 ).into() ).await.expect( "observe" );
let _unbound = ph.observe( Channel::Unbounded .into() ).await.expect( "observe" );
let mut ph = Pin::new( &mut ph );
drop( full );
poll_fn( move |mut cx|
{
crate::assert_matches!( ph.as_mut().poll_ready( &mut cx ), Poll::Ready( Ok(_) ) );
assert!( ph.observers[1].is_none() );
().into()
}).await;
}
#[ test ]
fn poll_ready_closed()
{
block_on( poll_fn( move |mut cx|
{
let mut ph = Pharos::<bool>::default();
let mut ph = Pin::new( &mut ph );
crate::assert_matches!( ph.as_mut().poll_close( cx ), Poll::Ready(Ok(())) );
let res = ph.as_mut().poll_ready( &mut cx );
crate::assert_matches!( res, Poll::Ready( Err(_) ) );
match res
{
Poll::Ready( Err( e ) ) => assert_eq!( ErrorKind::Closed, e.kind() ) ,
_ => unreachable!( "wrong result " ) ,
}
().into()
}));
}
#[async_std::test]
async fn start_send_arrive()
{
let mut ph = Pharos::<usize>::default();
let _open = ph.observe( Channel::Bounded ( 10 ).into() ).await.expect( "observe" );
let mut full = ph.observe( Channel::Bounded ( 1 ).into() ).await.expect( "observe" );
let _unbound = ph.observe( Channel::Unbounded .into() ).await.expect( "observe" );
poll_fn( move |mut cx|
{
let mut ph = Pin::new( &mut ph );
crate::assert_matches!( ph.as_mut().poll_ready( &mut cx ), Poll::Ready( Ok(_) ) );
assert!( ph.as_mut().start_send( 3 ).is_ok() );
assert_eq!( Pin::new( &mut full ).poll_next(cx), Poll::Ready(Some(3)));
().into()
}).await;
}
#[async_std::test]
async fn poll_flush_drop()
{
let mut ph = Pharos::<bool>::default();
let _open = ph.observe( Channel::Bounded ( 10 ).into() ).await.expect( "observe" );
let full = ph.observe( Channel::Bounded ( 1 ).into() ).await.expect( "observe" );
let _unbound = ph.observe( Channel::Unbounded .into() ).await.expect( "observe" );
let mut ph = Pin::new( &mut ph );
drop( full );
poll_fn( move |mut cx|
{
crate::assert_matches!( ph.as_mut().poll_flush( &mut cx ), Poll::Ready( Ok(_) ) );
assert!( ph.observers[1].is_none() );
().into()
}).await;
}
}