use crate::{ import::*, BoxEnvelope, ChanReceiver, StrongCount };
pub(crate) struct RxStrong<A>
{
rx : ChanReceiver<A>,
count: Arc<Mutex< StrongCount >>,
}
impl<A> RxStrong<A> where A: Actor
{
pub(crate) fn new( rx: ChanReceiver<A> ) -> Self
{
let count = Arc::new( Mutex::new( StrongCount::new() ) );
Self{ rx, count }
}
pub(crate) fn count( &self ) -> Arc<Mutex< StrongCount >>
{
self.count.clone()
}
}
impl<A> Stream for RxStrong<A> where A: Actor
{
type Item = BoxEnvelope<A>;
fn poll_next( mut self: Pin<&mut Self>, cx: &mut TaskContext<'_> ) -> Poll< Option<Self::Item> >
{
trace!( "RxStrong polled" );
match Pin::new( &mut self.rx ).poll_next( cx )
{
Poll::Pending =>
{
trace!( "RxStrong: inner channel returned Pending" );
let count = self.count.lock().expect( "Mutex<StrongCount> poisoned" );
if count.count() == 0
{
Poll::Ready( None )
}
else
{
count.store_waker( cx.waker() );
Poll::Pending
}
}
x =>
{
trace!( "RxStrong: inner channel returned Poll::Ready(Some(_)): {}", matches!( x, Poll::Ready(Some(_)) ) );
x
}
}
}
fn size_hint( &self ) -> (usize, Option<usize>)
{
self.rx.size_hint()
}
}
impl<A> fmt::Debug for RxStrong<A>
{
fn fmt( &self, fmt: &mut fmt::Formatter<'_> ) -> fmt::Result
{
fmt.debug_struct( "RxStrong<A>" )
.field( "count", &self.count )
.finish()
}
}