futures-util 0.3.32

Common utilities and extension traits for the futures-rs library.
Documentation
use core::marker::PhantomData;
use core::pin::Pin;

use futures_core::ready;
use futures_core::stream::{FusedStream, Stream, TryStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;

use pin_project_lite::pin_project;

use crate::future::Either;
use crate::stream::stream::flatten_unordered::{
    FlattenUnorderedWithFlowController, FlowController, FlowStep,
};
use crate::stream::IntoStream;
use crate::TryStreamExt;

delegate_all!(
    /// Stream for the [`try_flatten_unordered`](super::TryStreamExt::try_flatten_unordered) method.
    TryFlattenUnordered<St>(
        FlattenUnorderedWithFlowController<NestedTryStreamIntoEitherTryStream<St>, PropagateBaseStreamError<St>>
    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)]
        + New[
            |stream: St, limit: impl Into<Option<usize>>|
                FlattenUnorderedWithFlowController::new(
                    NestedTryStreamIntoEitherTryStream::new(stream),
                    limit.into()
                )
        ]
    where
        St: TryStream,
        St::Ok: TryStream,
        St::Ok: Unpin,
        <St::Ok as TryStream>::Error: From<St::Error>
);

pin_project! {
    /// Emits either successful streams or single-item streams containing the underlying errors.
    /// This's a wrapper for `FlattenUnordered` to reuse its logic over `TryStream`.
    #[derive(Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct NestedTryStreamIntoEitherTryStream<St>
        where
            St: TryStream,
            St::Ok: TryStream,
            St::Ok: Unpin,
            <St::Ok as TryStream>::Error: From<St::Error>
        {
            #[pin]
            stream: St
        }
}

impl<St> NestedTryStreamIntoEitherTryStream<St>
where
    St: TryStream,
    St::Ok: TryStream + Unpin,
    <St::Ok as TryStream>::Error: From<St::Error>,
{
    fn new(stream: St) -> Self {
        Self { stream }
    }

    delegate_access_inner!(stream, St, ());
}

/// Emits a single item immediately, then stream will be terminated.
#[derive(Debug, Clone)]
pub struct Single<T>(Option<T>);

impl<T> Single<T> {
    /// Constructs new `Single` with the given value.
    fn new(val: T) -> Self {
        Self(Some(val))
    }

    /// Attempts to take inner item immediately. Will always succeed if the stream isn't terminated.
    fn next_immediate(&mut self) -> Option<T> {
        self.0.take()
    }
}

impl<T> Unpin for Single<T> {}

impl<T> Stream for Single<T> {
    type Item = T;

    fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Poll::Ready(self.0.take())
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.0.as_ref().map_or((0, Some(0)), |_| (1, Some(1)))
    }
}

/// Immediately propagates errors occurred in the base stream.
#[derive(Debug, Clone, Copy)]
pub struct PropagateBaseStreamError<St>(PhantomData<St>);

type BaseStreamItem<St> = <NestedTryStreamIntoEitherTryStream<St> as Stream>::Item;
type InnerStreamItem<St> = <BaseStreamItem<St> as Stream>::Item;

impl<St> FlowController<BaseStreamItem<St>, InnerStreamItem<St>> for PropagateBaseStreamError<St>
where
    St: TryStream,
    St::Ok: TryStream + Unpin,
    <St::Ok as TryStream>::Error: From<St::Error>,
{
    fn next_step(item: BaseStreamItem<St>) -> FlowStep<BaseStreamItem<St>, InnerStreamItem<St>> {
        match item {
            // A new successful inner stream received
            st @ Either::Left(_) => FlowStep::Continue(st),
            // An error encountered
            Either::Right(mut err) => FlowStep::Return(err.next_immediate().unwrap()),
        }
    }
}

type SingleStreamResult<St> = Single<Result<<St as TryStream>::Ok, <St as TryStream>::Error>>;

impl<St> Stream for NestedTryStreamIntoEitherTryStream<St>
where
    St: TryStream,
    St::Ok: TryStream + Unpin,
    <St::Ok as TryStream>::Error: From<St::Error>,
{
    // Item is either an inner stream or a stream containing a single error.
    // This will allow using `Either`'s `Stream` implementation as both branches are actually streams of `Result`'s.
    type Item = Either<IntoStream<St::Ok>, SingleStreamResult<St::Ok>>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let item = ready!(self.project().stream.try_poll_next(cx));

        let out = match item {
            Some(res) => match res {
                // Emit successful inner stream as is
                Ok(stream) => Either::Left(stream.into_stream()),
                // Wrap an error into a stream containing a single item
                err @ Err(_) => {
                    let res = err.map(|_: St::Ok| unreachable!()).map_err(Into::into);

                    Either::Right(Single::new(res))
                }
            },
            None => return Poll::Ready(None),
        };

        Poll::Ready(Some(out))
    }
}

impl<St> FusedStream for NestedTryStreamIntoEitherTryStream<St>
where
    St: TryStream + FusedStream,
    St::Ok: TryStream + Unpin,
    <St::Ok as TryStream>::Error: From<St::Error>,
{
    fn is_terminated(&self) -> bool {
        self.stream.is_terminated()
    }
}

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<St, Item> Sink<Item> for NestedTryStreamIntoEitherTryStream<St>
where
    St: TryStream + Sink<Item>,
    St::Ok: TryStream + Unpin,
    <St::Ok as TryStream>::Error: From<<St as TryStream>::Error>,
{
    type Error = <St as Sink<Item>>::Error;

    delegate_sink!(stream, Item);
}