ruchei 0.1.3-a.0

Utilities for working with many streams
Documentation
use std::{
    collections::VecDeque,
    pin::Pin,
    task::{Context, Poll},
};

use futures_util::{
    Future, Sink, StreamExt, TryStream, TryStreamExt,
    stream::{Fuse, FusedStream, IntoStream},
};
use pin_project::pin_project;

#[derive(Debug)]
#[pin_project]
#[must_use = "futures must be awaited"]
pub struct Echo<S, T = <S as TryStream>::Ok> {
    #[pin]
    stream: Fuse<IntoStream<S>>,
    queue: VecDeque<T>,
    item: Option<T>,
    started: bool,
}

impl<S: Default + TryStream, T> Default for Echo<S, T> {
    fn default() -> Self {
        S::default().into()
    }
}

impl<T, E, S: TryStream<Ok = T, Error = E> + Sink<T, Error = E>> Future for Echo<S, T> {
    type Output = Result<(), E>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        if !this.stream.is_terminated() {
            while let Poll::Ready(ready) = this.stream.as_mut().try_poll_next(cx)? {
                match ready {
                    Some(t) => this.queue.push_back(t),
                    None => return Poll::Ready(Ok(())),
                }
            }
        }
        loop {
            match this.item.take() {
                Some(item) => match this.stream.as_mut().poll_ready(cx)? {
                    Poll::Ready(()) => {
                        this.stream.as_mut().start_send(item)?;
                        *this.started = true;
                    }
                    Poll::Pending => {
                        *this.item = Some(item);
                        return Poll::Pending;
                    }
                },
                None => match this.queue.pop_front() {
                    Some(item) => *this.item = Some(item),
                    None => {
                        break;
                    }
                },
            }
        }
        if *this.started && this.stream.as_mut().poll_flush(cx)?.is_ready() {
            *this.started = false;
        }
        Poll::Pending
    }
}

impl<T, S: TryStream> From<S> for Echo<S, T> {
    fn from(stream: S) -> Self {
        Self {
            stream: stream.into_stream().fuse(),
            queue: Default::default(),
            item: None,
            started: false,
        }
    }
}

pub trait EchoBuffered:
    Sized + TryStream<Ok = Self::T, Error = Self::E> + Sink<Self::T, Error = Self::E>
{
    type T;
    type E;

    fn echo_buffered(self) -> Echo<Self> {
        self.into()
    }
}

impl<T, E, S: TryStream<Ok = T, Error = E> + Sink<T, Error = E>> EchoBuffered for S {
    type T = T;
    type E = E;
}