// futures-util 0.3.5
//
// Common utilities and extension traits for the futures-rs library.
// Documentation
use crate::stream::TryStreamExt;
use core::pin::Pin;
use futures_core::stream::TryStream;
use futures_core::task::{Context, Poll};
use futures_io::{AsyncRead, AsyncWrite, AsyncBufRead};
use std::cmp;
use std::io::{Error, Result};

/// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method.
///
/// Wraps a fallible stream whose items are byte chunks (`AsRef<[u8]>`) and
/// whose error type is [`std::io::Error`], and hands the chunk bytes out
/// through the `AsyncRead` / `AsyncBufRead` implementations on this type.
#[derive(Debug)]
#[must_use = "readers do nothing unless polled"]
pub struct IntoAsyncRead<St>
where
    St: TryStream<Error = Error> + Unpin,
    St::Ok: AsRef<[u8]>,
{
    // The underlying stream that chunks are pulled from.
    stream: St,
    // Tracks whether a chunk is currently buffered, a new one must be
    // fetched, or the reader has reached its end.
    state: ReadState<St::Ok>,
}

// Explicit `Unpin` impl: note that it places no `Unpin` bound on `St::Ok`,
// even though a chunk of that type is stored inside `state`. The trait
// implementations in this file mutate fields directly through
// `Pin<&mut Self>`, which requires `Self: Unpin`.
impl<St> Unpin for IntoAsyncRead<St>
where
    St: TryStream<Error = Error> + Unpin,
    St::Ok: AsRef<[u8]>,
{
}

/// Internal buffering state of an `IntoAsyncRead`.
#[derive(Debug)]
enum ReadState<T: AsRef<[u8]>> {
    /// A chunk is buffered. `chunk_start` is the offset of the first byte of
    /// `chunk` that has not yet been handed to the caller. The code in this
    /// file only enters this state with a non-empty chunk and leaves it as
    /// soon as `chunk_start` reaches the chunk length.
    Ready { chunk: T, chunk_start: usize },
    /// No chunk is buffered; the next read must poll the stream for one.
    PendingChunk,
    /// The stream finished or yielded an error; every further read reports
    /// zero bytes.
    Eof,
}

impl<St> IntoAsyncRead<St>
where
    St: TryStream<Error = Error> + Unpin,
    St::Ok: AsRef<[u8]>,
{
    /// Wraps `stream` in a reader that starts out with no buffered chunk, so
    /// the first read will poll the stream.
    pub(super) fn new(stream: St) -> Self {
        let state = ReadState::PendingChunk;
        Self { stream, state }
    }
}

impl<St> AsyncRead for IntoAsyncRead<St>
where
    St: TryStream<Error = Error> + Unpin,
    St::Ok: AsRef<[u8]>,
{
    /// Copies bytes from the currently buffered chunk into `buf`, fetching a
    /// new chunk from the stream first when none is buffered.
    ///
    /// Returns the number of bytes copied. `Ok(0)` is returned once the
    /// stream has ended (and on every call after that, including calls made
    /// after the stream produced an error). A stream error is passed through
    /// unchanged the first time it is seen.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize>> {
        loop {
            match &mut self.state {
                ReadState::Eof => return Poll::Ready(Ok(0)),
                ReadState::Ready { chunk, chunk_start } => {
                    let bytes = chunk.as_ref();
                    // Copy as much as fits: limited by the caller's buffer
                    // and by what is left of the chunk.
                    let copied = cmp::min(buf.len(), bytes.len() - *chunk_start);
                    let end = *chunk_start + copied;

                    buf[..copied].copy_from_slice(&bytes[*chunk_start..end]);
                    *chunk_start = end;

                    // Chunk fully drained: ask the stream for more next time.
                    if end == bytes.len() {
                        self.state = ReadState::PendingChunk;
                    }

                    return Poll::Ready(Ok(copied));
                }
                ReadState::PendingChunk => {
                    let next = ready!(self.stream.try_poll_next_unpin(cx));
                    match next {
                        None => {
                            self.state = ReadState::Eof;
                            return Poll::Ready(Ok(0));
                        }
                        Some(Err(err)) => {
                            self.state = ReadState::Eof;
                            return Poll::Ready(Err(err));
                        }
                        Some(Ok(chunk)) => {
                            // Skip empty chunks; returning 0 bytes would be
                            // mistaken for end-of-stream by the caller.
                            if chunk.as_ref().is_empty() {
                                continue;
                            }
                            self.state = ReadState::Ready {
                                chunk,
                                chunk_start: 0,
                            };
                        }
                    }
                }
            }
        }
    }
}

/// Pass-through `AsyncWrite` implementation for wrapped streams that are
/// themselves writers. Every method forwards directly to the inner stream;
/// the read-side buffering state is not touched.
impl<St> AsyncWrite for IntoAsyncRead<St>
where
    St: TryStream<Error = Error> + AsyncWrite + Unpin,
    St::Ok: AsRef<[u8]>,
{
    /// Forwards the write of `buf` to the wrapped stream.
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize>> {
        Pin::new(&mut self.stream).poll_write(cx, buf)
    }

    /// Forwards a vectored write to the wrapped stream.
    ///
    /// Without this override the trait's default implementation would be
    /// used instead of the inner writer's own `poll_write_vectored`, losing
    /// whatever vectored-write support the inner writer provides.
    fn poll_write_vectored(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[std::io::IoSlice<'_>],
    ) -> Poll<Result<usize>> {
        Pin::new(&mut self.stream).poll_write_vectored(cx, bufs)
    }

    /// Forwards the flush request to the wrapped stream.
    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<()>> {
        Pin::new(&mut self.stream).poll_flush(cx)
    }

    /// Forwards the close request to the wrapped stream.
    fn poll_close(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<()>> {
        Pin::new(&mut self.stream).poll_close(cx)
    }
}

impl<St> AsyncBufRead for IntoAsyncRead<St>
where
    St: TryStream<Error = Error> + Unpin,
    St::Ok: AsRef<[u8]>,
{
    /// Returns the unread remainder of the buffered chunk, polling the
    /// stream for a new chunk first when none is buffered.
    ///
    /// An empty slice is returned once the stream has ended (or after it
    /// produced an error). A stream error is passed through unchanged the
    /// first time it is seen.
    fn poll_fill_buf(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<&[u8]>> {
        // Keep polling until a non-empty chunk arrives or the stream ends.
        while let ReadState::PendingChunk = self.state {
            let next = ready!(self.stream.try_poll_next_unpin(cx));
            match next {
                None => {
                    self.state = ReadState::Eof;
                    return Poll::Ready(Ok(&[]));
                }
                Some(Err(err)) => {
                    self.state = ReadState::Eof;
                    return Poll::Ready(Err(err));
                }
                Some(Ok(chunk)) => {
                    // Empty chunks are dropped and the stream is polled again.
                    if !chunk.as_ref().is_empty() {
                        self.state = ReadState::Ready {
                            chunk,
                            chunk_start: 0,
                        };
                    }
                }
            }
        }

        // `into_ref().get_ref()` yields a shared reference that lives as long
        // as the pinned borrow, so the returned slice can borrow from it.
        match &self.into_ref().get_ref().state {
            ReadState::Ready { chunk, chunk_start } => {
                Poll::Ready(Ok(&chunk.as_ref()[*chunk_start..]))
            }
            // The loop above only exits in `Ready` or `Eof`; anything that is
            // not `Ready` yields an empty buffer.
            ReadState::PendingChunk | ReadState::Eof => Poll::Ready(Ok(&[])),
        }
    }

    /// Marks `amount` bytes of the buffered chunk as read.
    ///
    /// A zero `amount` is a no-op. Once the whole chunk has been consumed the
    /// reader switches back to fetching a new chunk. Consuming a non-zero
    /// amount while no chunk is buffered trips a debug assertion.
    fn consume(
        mut self: Pin<&mut Self>,
        amount: usize,
    ) {
        // https://github.com/rust-lang/futures-rs/pull/1556#discussion_r281644295
        if amount == 0 {
            return;
        }

        match &mut self.state {
            ReadState::Ready { chunk, chunk_start } => {
                *chunk_start += amount;
                let chunk_len = chunk.as_ref().len();
                debug_assert!(*chunk_start <= chunk_len);
                if *chunk_start >= chunk_len {
                    self.state = ReadState::PendingChunk;
                }
            }
            _ => {
                debug_assert!(false, "Attempted to consume from IntoAsyncRead without chunk");
            }
        }
    }
}