use crate:: { import::* };
/// Collects the outputs of the `JoinHandle`s that arrive over a channel.
///
/// Polled as a `Stream`, it yields each `Out` as soon as the corresponding handle
/// resolves. Polled as a `Future`, it discards the outputs and resolves to `()` once
/// the stream has ended.
//
#[ derive( Debug ) ]
#[ cfg_attr( nightly, doc(cfg( feature = "implementation" )) ) ]
pub struct NurseryStream<Out>
{
// Receiving end of the channel on which new handles come in.
// NOTE(review): presumably the sending side lives in the nursery -- confirm against the caller.
rx : UnboundedReceiver<JoinHandle<Out>> ,
// Handles that have been received and are polled concurrently until they resolve.
unordered: FuturesUnordered<JoinHandle<Out>> ,
// Set once `rx` has yielded `None`; from then on `rx` is never polled again.
rx_closed: bool ,
}
impl<Out> NurseryStream<Out>
{
    /// Wrap the receiving end of a handle channel.
    ///
    /// The returned stream starts without any pending handles and with the channel
    /// considered open.
    //
    pub(crate) fn new( rx: UnboundedReceiver<JoinHandle<Out>> ) -> Self

        where Out: 'static
    {
        Self
        {
            rx                                  ,
            unordered: FuturesUnordered::new()  ,
            rx_closed: false                    ,
        }
    }


    /// Call `close` on the receiving end of the handle channel and return `self`
    /// so that calls can be chained.
    ///
    /// This does not mark the stream as finished by itself: `poll_next` keeps polling
    /// the receiver until it yields `None`, and only then stops waiting for new handles.
    //
    pub fn close_nursery( &mut self ) -> &mut Self
    {
        self.rx.close();

        self
    }
}
impl<Out> Stream for NurseryStream<Out>

    where Out: 'static
{
    type Item = Out;


    /// Yield the output of the next handle that resolves.
    ///
    /// Returns `Poll::Ready(None)` only when the channel has ended *and* no handles
    /// are left; while the channel is still open an empty set of handles results in
    /// `Poll::Pending`.
    //
    fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Option<Self::Item>>
    {
        let this = self.get_mut();

        // Move every handle that is available right now from the channel into the
        // set of handles we are waiting on. Stop as soon as the channel has nothing
        // more for us at the moment, or has ended for good.
        //
        while !this.rx_closed
        {
            match Pin::new( &mut this.rx ).poll_next(cx)
            {
                Poll::Ready( Some(handle) ) => this.unordered.push( handle ) ,
                Poll::Ready( None )         => this.rx_closed = true         ,
                Poll::Pending               => break                         ,
            }
        }

        match Pin::new( &mut this.unordered ).poll_next(cx)
        {
            Poll::Pending            => Poll::Pending            ,
            Poll::Ready( Some(out) ) => Poll::Ready( Some(out) ) ,

            // No handles left. That only ends the stream if no new ones can arrive
            // anymore; otherwise we wait for the channel.
            //
            Poll::Ready( None ) =>
            {
                if this.rx_closed { Poll::Ready(None) }
                else              { Poll::Pending     }
            }
        }
    }


    /// The lower bound is taken from the set of received handles. No upper bound is
    /// given.
    //
    fn size_hint( &self ) -> (usize, Option<usize>)
    {
        let (lower, _) = self.unordered.size_hint();

        (lower, None)
    }
}
impl<Out> Future for NurseryStream<Out>
where Out: 'static
{
type Output = ();
fn poll( mut self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Self::Output>
{
loop
{
if ready!( self.as_mut().poll_next(cx) ).is_none()
{
return Poll::Ready(())
}
}
}
}
impl<Out> FusedFuture for NurseryStream<Out>

    where Out: 'static
{
    /// True when the handle channel has ended and the set of received handles
    /// reports itself as terminated.
    //
    fn is_terminated(&self) -> bool
    {
        // As long as the channel is open, new handles can still come in.
        //
        if !self.rx_closed { return false; }

        self.unordered.is_terminated()
    }
}
impl<Out> FusedStream for NurseryStream<Out>

    where Out: 'static
{
    /// True when the handle channel has ended and the set of received handles
    /// reports itself as terminated.
    //
    fn is_terminated(&self) -> bool
    {
        // The set of handles is only consulted once the channel has ended.
        //
        if self.rx_closed { self.unordered.is_terminated() }
        else              { false                          }
    }
}