futures-lite 2.3.0

Futures, streams, and async I/O combinators
Documentation
//! Tools and combinators for I/O.
//!
//! # Examples
//!
//! ```
//! use futures_lite::io::{self, AsyncReadExt};
//!
//! # spin_on::spin_on(async {
//! let input: &[u8] = b"hello";
//! let mut reader = io::BufReader::new(input);
//!
//! let mut contents = String::new();
//! reader.read_to_string(&mut contents).await?;
//! # std::io::Result::Ok(()) });
//! ```

#[doc(no_inline)]
pub use std::io::{Error, ErrorKind, Result, SeekFrom};

#[doc(no_inline)]
pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};

use std::borrow::{Borrow, BorrowMut};
use std::cmp;
use std::fmt;
use std::future::Future;
use std::io::{IoSlice, IoSliceMut};
use std::mem;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};

use futures_core::stream::Stream;
use pin_project_lite::pin_project;

use crate::future;
use crate::ready;

const DEFAULT_BUF_SIZE: usize = 8 * 1024;

/// Copies the entire contents of a reader into a writer.
///
/// This function will read data from `reader` and write it into `writer` in a streaming fashion
/// until `reader` returns EOF.
///
/// On success, returns the total number of bytes copied.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{self, BufReader, BufWriter};
///
/// # spin_on::spin_on(async {
/// let input: &[u8] = b"hello";
/// let reader = BufReader::new(input);
///
/// let mut output = Vec::new();
/// let writer = BufWriter::new(&mut output);
///
/// io::copy(reader, writer).await?;
/// # std::io::Result::Ok(()) });
/// ```
pub async fn copy<R, W>(reader: R, writer: W) -> Result<u64>
where
    R: AsyncRead,
    W: AsyncWrite,
{
    pin_project! {
        struct CopyFuture<R, W> {
            #[pin]
            reader: R,
            #[pin]
            writer: W,
            amt: u64,
        }
    }

    impl<R, W> Future for CopyFuture<R, W>
    where
        R: AsyncBufRead,
        W: AsyncWrite,
    {
        type Output = Result<u64>;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let mut this = self.project();
            loop {
                let buffer = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
                if buffer.is_empty() {
                    ready!(this.writer.as_mut().poll_flush(cx))?;
                    return Poll::Ready(Ok(*this.amt));
                }

                let i = ready!(this.writer.as_mut().poll_write(cx, buffer))?;
                if i == 0 {
                    return Poll::Ready(Err(ErrorKind::WriteZero.into()));
                }
                *this.amt += i as u64;
                this.reader.as_mut().consume(i);
            }
        }
    }

    let future = CopyFuture {
        reader: BufReader::new(reader),
        writer,
        amt: 0,
    };
    future.await
}

/// Asserts that a type implementing [`std::io`] traits can be used as an async type.
///
/// The underlying I/O handle should never block nor return the [`ErrorKind::WouldBlock`] error.
/// This is usually the case for in-memory buffered I/O.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AssertAsync, AsyncReadExt};
///
/// let reader: &[u8] = b"hello";
///
/// # spin_on::spin_on(async {
/// let mut async_reader = AssertAsync::new(reader);
/// let mut contents = String::new();
///
/// // This line works in async manner - note that there is await:
/// async_reader.read_to_string(&mut contents).await?;
/// # std::io::Result::Ok(()) });
/// ```
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct AssertAsync<T>(T);

impl<T> Unpin for AssertAsync<T> {}

impl<T> AssertAsync<T> {
    /// Wraps an I/O handle implementing [`std::io`] traits.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::AssertAsync;
    ///
    /// let reader: &[u8] = b"hello";
    ///
    /// let async_reader = AssertAsync::new(reader);
    /// ```
    #[inline(always)]
    pub fn new(io: T) -> Self {
        AssertAsync(io)
    }

    /// Gets a reference to the inner I/O handle.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::AssertAsync;
    ///
    /// let reader: &[u8] = b"hello";
    ///
    /// let async_reader = AssertAsync::new(reader);
    /// let r = async_reader.get_ref();
    /// ```
    #[inline(always)]
    pub fn get_ref(&self) -> &T {
        &self.0
    }

    /// Gets a mutable reference to the inner I/O handle.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::AssertAsync;
    ///
    /// let reader: &[u8] = b"hello";
    ///
    /// let mut async_reader = AssertAsync::new(reader);
    /// let r = async_reader.get_mut();
    /// ```
    #[inline(always)]
    pub fn get_mut(&mut self) -> &mut T {
        &mut self.0
    }

    /// Extracts the inner I/O handle.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::AssertAsync;
    ///
    /// let reader: &[u8] = b"hello";
    ///
    /// let async_reader = AssertAsync::new(reader);
    /// let inner = async_reader.into_inner();
    /// ```
    #[inline(always)]
    pub fn into_inner(self) -> T {
        self.0
    }
}

fn assert_async_wrapio<F, T>(mut f: F) -> Poll<std::io::Result<T>>
where
    F: FnMut() -> std::io::Result<T>,
{
    loop {
        match f() {
            Err(err) if err.kind() == ErrorKind::Interrupted => {}
            res => return Poll::Ready(res),
        }
    }
}

impl<T: std::io::Read> AsyncRead for AssertAsync<T> {
    #[inline]
    fn poll_read(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize>> {
        assert_async_wrapio(move || self.0.read(buf))
    }

    #[inline]
    fn poll_read_vectored(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
        bufs: &mut [IoSliceMut<'_>],
    ) -> Poll<Result<usize>> {
        assert_async_wrapio(move || self.0.read_vectored(bufs))
    }
}

impl<T: std::io::Write> AsyncWrite for AssertAsync<T> {
    #[inline]
    fn poll_write(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize>> {
        assert_async_wrapio(move || self.0.write(buf))
    }

    #[inline]
    fn poll_write_vectored(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
        bufs: &[IoSlice<'_>],
    ) -> Poll<Result<usize>> {
        assert_async_wrapio(move || self.0.write_vectored(bufs))
    }

    #[inline]
    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
        assert_async_wrapio(move || self.0.flush())
    }

    #[inline]
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        self.poll_flush(cx)
    }
}

impl<T: std::io::Seek> AsyncSeek for AssertAsync<T> {
    #[inline]
    fn poll_seek(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
        pos: SeekFrom,
    ) -> Poll<Result<u64>> {
        assert_async_wrapio(move || self.0.seek(pos))
    }
}

/// A wrapper around a type that implements `AsyncRead` or `AsyncWrite` that converts `Pending`
/// polls to `WouldBlock` errors.
///
/// This wrapper can be used as a compatibility layer between `AsyncRead` and `Read`, for types
/// that take `Read` as a parameter.
///
/// # Examples
///
/// ```
/// use std::io::Read;
/// use std::task::{Poll, Context};
///
/// fn poll_for_io(cx: &mut Context<'_>) -> Poll<usize> {
///     // Assume we have a library that's built around `Read` and `Write` traits.
///     use cooltls::Session;
///
///     // We want to use it with our writer that implements `AsyncWrite`.
///     let writer = Stream::new();
///
///     // First, we wrap our `Writer` with `AsyncAsSync` to convert `Pending` polls to `WouldBlock`.
///     use futures_lite::io::AsyncAsSync;
///     let writer = AsyncAsSync::new(cx, writer);
///
///     // Now, we can use it with `cooltls`.
///     let mut session = Session::new(writer);
///
///     // Match on the result of `read()` and translate it to poll.
///     match session.read(&mut [0; 1024]) {
///         Ok(n) => Poll::Ready(n),
///         Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => Poll::Pending,
///         Err(err) => panic!("unexpected error: {}", err),
///     }
/// }
///
/// // Usually, poll-based functions are best wrapped using `poll_fn`.
/// use futures_lite::future::poll_fn;
/// # futures_lite::future::block_on(async {
/// poll_fn(|cx| poll_for_io(cx)).await;
/// # });
/// # struct Stream;
/// # impl Stream {
/// #     fn new() -> Stream {
/// #         Stream
/// #     }
/// # }
/// # impl futures_lite::io::AsyncRead for Stream {
/// #     fn poll_read(self: std::pin::Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8]) -> Poll<std::io::Result<usize>> {
/// #         Poll::Ready(Ok(0))
/// #     }
/// # }
/// # mod cooltls {
/// #     pub struct Session<W> {
/// #         reader: W,
/// #     }
/// #     impl<W> Session<W> {
/// #         pub fn new(reader: W) -> Session<W> {
/// #             Session { reader }
/// #         }
/// #     }
/// #     impl<W: std::io::Read> std::io::Read for Session<W> {
/// #         fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
/// #             self.reader.read(buf)
/// #         }
/// #     }
/// # }
/// ```
#[derive(Debug)]
pub struct AsyncAsSync<'r, 'ctx, T> {
    /// The context we are using to poll the future.
    pub context: &'r mut Context<'ctx>,

    /// The actual reader/writer we are wrapping.
    pub inner: T,
}

impl<'r, 'ctx, T> AsyncAsSync<'r, 'ctx, T> {
    /// Wraps an I/O handle implementing [`AsyncRead`] or [`AsyncWrite`] traits.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::AsyncAsSync;
    /// use std::task::Context;
    /// use waker_fn::waker_fn;
    ///
    /// let reader: &[u8] = b"hello";
    /// let waker = waker_fn(|| {});
    /// let mut context = Context::from_waker(&waker);
    ///
    /// let async_reader = AsyncAsSync::new(&mut context, reader);
    /// ```
    #[inline]
    pub fn new(context: &'r mut Context<'ctx>, inner: T) -> Self {
        AsyncAsSync { context, inner }
    }

    /// Attempt to shutdown the I/O handle.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::AsyncAsSync;
    /// use std::task::Context;
    /// use waker_fn::waker_fn;
    ///
    /// let reader: Vec<u8> = b"hello".to_vec();
    /// let waker = waker_fn(|| {});
    /// let mut context = Context::from_waker(&waker);
    ///
    /// let mut async_reader = AsyncAsSync::new(&mut context, reader);
    /// async_reader.close().unwrap();
    /// ```
    #[inline]
    pub fn close(&mut self) -> Result<()>
    where
        T: AsyncWrite + Unpin,
    {
        self.poll_with(|io, cx| io.poll_close(cx))
    }

    /// Poll this `AsyncAsSync` for some function.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncAsSync, AsyncRead};
    /// use std::task::Context;
    /// use waker_fn::waker_fn;
    ///
    /// let reader: &[u8] = b"hello";
    /// let waker = waker_fn(|| {});
    /// let mut context = Context::from_waker(&waker);
    ///
    /// let mut async_reader = AsyncAsSync::new(&mut context, reader);
    /// let r = async_reader.poll_with(|io, cx| io.poll_read(cx, &mut [0; 1024]));
    /// assert_eq!(r.unwrap(), 5);
    /// ```
    #[inline]
    pub fn poll_with<R>(
        &mut self,
        f: impl FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll<Result<R>>,
    ) -> Result<R>
    where
        T: Unpin,
    {
        match f(Pin::new(&mut self.inner), self.context) {
            Poll::Ready(res) => res,
            Poll::Pending => Err(ErrorKind::WouldBlock.into()),
        }
    }
}

impl<T: AsyncRead + Unpin> std::io::Read for AsyncAsSync<'_, '_, T> {
    #[inline]
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        self.poll_with(|io, cx| io.poll_read(cx, buf))
    }

    #[inline]
    fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> Result<usize> {
        self.poll_with(|io, cx| io.poll_read_vectored(cx, bufs))
    }
}

impl<T: AsyncWrite + Unpin> std::io::Write for AsyncAsSync<'_, '_, T> {
    #[inline]
    fn write(&mut self, buf: &[u8]) -> Result<usize> {
        self.poll_with(|io, cx| io.poll_write(cx, buf))
    }

    #[inline]
    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
        self.poll_with(|io, cx| io.poll_write_vectored(cx, bufs))
    }

    #[inline]
    fn flush(&mut self) -> Result<()> {
        self.poll_with(|io, cx| io.poll_flush(cx))
    }
}

impl<T: AsyncSeek + Unpin> std::io::Seek for AsyncAsSync<'_, '_, T> {
    #[inline]
    fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
        self.poll_with(|io, cx| io.poll_seek(cx, pos))
    }
}

impl<T> AsRef<T> for AsyncAsSync<'_, '_, T> {
    #[inline]
    fn as_ref(&self) -> &T {
        &self.inner
    }
}

impl<T> AsMut<T> for AsyncAsSync<'_, '_, T> {
    #[inline]
    fn as_mut(&mut self) -> &mut T {
        &mut self.inner
    }
}

impl<T> Borrow<T> for AsyncAsSync<'_, '_, T> {
    #[inline]
    fn borrow(&self) -> &T {
        &self.inner
    }
}

impl<T> BorrowMut<T> for AsyncAsSync<'_, '_, T> {
    #[inline]
    fn borrow_mut(&mut self) -> &mut T {
        &mut self.inner
    }
}

/// Blocks on all async I/O operations and implements [`std::io`] traits.
///
/// Sometimes async I/O needs to be used in a blocking manner. If calling [`future::block_on()`]
/// manually all the time becomes too tedious, use this type for more convenient blocking on async
/// I/O operations.
///
/// This type implements traits [`Read`][`std::io::Read`], [`Write`][`std::io::Write`], or
/// [`Seek`][`std::io::Seek`] if the inner type implements [`AsyncRead`], [`AsyncWrite`], or
/// [`AsyncSeek`], respectively.
///
/// If writing data through the [`Write`][`std::io::Write`] trait, make sure to flush before
/// dropping the [`BlockOn`] handle or some buffered data might get lost.
///
/// # Examples
///
/// ```
/// use futures_lite::io::BlockOn;
/// use futures_lite::pin;
/// use std::io::Read;
///
/// let reader: &[u8] = b"hello";
/// pin!(reader);
///
/// let mut blocking_reader = BlockOn::new(reader);
/// let mut contents = String::new();
///
/// // This line blocks - note that there is no await:
/// blocking_reader.read_to_string(&mut contents)?;
/// # std::io::Result::Ok(())
/// ```
#[derive(Debug)]
pub struct BlockOn<T>(T);

impl<T> BlockOn<T> {
    /// Wraps an async I/O handle into a blocking interface.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::BlockOn;
    /// use futures_lite::pin;
    ///
    /// let reader: &[u8] = b"hello";
    /// pin!(reader);
    ///
    /// let blocking_reader = BlockOn::new(reader);
    /// ```
    pub fn new(io: T) -> BlockOn<T> {
        BlockOn(io)
    }

    /// Gets a reference to the async I/O handle.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::BlockOn;
    /// use futures_lite::pin;
    ///
    /// let reader: &[u8] = b"hello";
    /// pin!(reader);
    ///
    /// let blocking_reader = BlockOn::new(reader);
    /// let r = blocking_reader.get_ref();
    /// ```
    pub fn get_ref(&self) -> &T {
        &self.0
    }

    /// Gets a mutable reference to the async I/O handle.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::BlockOn;
    /// use futures_lite::pin;
    ///
    /// let reader: &[u8] = b"hello";
    /// pin!(reader);
    ///
    /// let mut blocking_reader = BlockOn::new(reader);
    /// let r = blocking_reader.get_mut();
    /// ```
    pub fn get_mut(&mut self) -> &mut T {
        &mut self.0
    }

    /// Extracts the inner async I/O handle.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::BlockOn;
    /// use futures_lite::pin;
    ///
    /// let reader: &[u8] = b"hello";
    /// pin!(reader);
    ///
    /// let blocking_reader = BlockOn::new(reader);
    /// let inner = blocking_reader.into_inner();
    /// ```
    pub fn into_inner(self) -> T {
        self.0
    }
}

impl<T: AsyncRead + Unpin> std::io::Read for BlockOn<T> {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        future::block_on(self.0.read(buf))
    }
}

impl<T: AsyncBufRead + Unpin> std::io::BufRead for BlockOn<T> {
    fn fill_buf(&mut self) -> Result<&[u8]> {
        future::block_on(self.0.fill_buf())
    }

    fn consume(&mut self, amt: usize) {
        Pin::new(&mut self.0).consume(amt)
    }
}

impl<T: AsyncWrite + Unpin> std::io::Write for BlockOn<T> {
    fn write(&mut self, buf: &[u8]) -> Result<usize> {
        future::block_on(self.0.write(buf))
    }

    fn flush(&mut self) -> Result<()> {
        future::block_on(self.0.flush())
    }
}

impl<T: AsyncSeek + Unpin> std::io::Seek for BlockOn<T> {
    fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
        future::block_on(self.0.seek(pos))
    }
}

pin_project! {
    /// Adds buffering to a reader.
    ///
    /// It can be excessively inefficient to work directly with an [`AsyncRead`] instance. A
    /// [`BufReader`] performs large, infrequent reads on the underlying [`AsyncRead`] and
    /// maintains an in-memory buffer of the incoming byte stream.
    ///
    /// [`BufReader`] can improve the speed of programs that make *small* and *repeated* reads to
    /// the same file or networking socket. It does not help when reading very large amounts at
    /// once, or reading just once or a few times. It also provides no advantage when reading from
    /// a source that is already in memory, like a `Vec<u8>`.
    ///
    /// When a [`BufReader`] is dropped, the contents of its buffer are discarded. Creating
    /// multiple instances of [`BufReader`] on the same reader can cause data loss.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
    ///
    /// # spin_on::spin_on(async {
    /// let input: &[u8] = b"hello";
    /// let mut reader = BufReader::new(input);
    ///
    /// let mut line = String::new();
    /// reader.read_line(&mut line).await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub struct BufReader<R> {
        #[pin]
        inner: R,
        buf: Box<[u8]>,
        pos: usize,
        cap: usize,
    }
}

impl<R: AsyncRead> BufReader<R> {
    /// Creates a buffered reader with the default buffer capacity.
    ///
    /// The default capacity is currently 8 KB, but that may change in the future.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::BufReader;
    ///
    /// let input: &[u8] = b"hello";
    /// let reader = BufReader::new(input);
    /// ```
    pub fn new(inner: R) -> BufReader<R> {
        BufReader::with_capacity(DEFAULT_BUF_SIZE, inner)
    }

    /// Creates a buffered reader with the specified capacity.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::BufReader;
    ///
    /// let input: &[u8] = b"hello";
    /// let reader = BufReader::with_capacity(1024, input);
    /// ```
    pub fn with_capacity(capacity: usize, inner: R) -> BufReader<R> {
        BufReader {
            inner,
            buf: vec![0; capacity].into_boxed_slice(),
            pos: 0,
            cap: 0,
        }
    }
}

impl<R> BufReader<R> {
    /// Gets a reference to the underlying reader.
    ///
    /// It is not advisable to directly read from the underlying reader.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::BufReader;
    ///
    /// let input: &[u8] = b"hello";
    /// let reader = BufReader::new(input);
    ///
    /// let r = reader.get_ref();
    /// ```
    pub fn get_ref(&self) -> &R {
        &self.inner
    }

    /// Gets a mutable reference to the underlying reader.
    ///
    /// It is not advisable to directly read from the underlying reader.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::BufReader;
    ///
    /// let input: &[u8] = b"hello";
    /// let mut reader = BufReader::new(input);
    ///
    /// let r = reader.get_mut();
    /// ```
    pub fn get_mut(&mut self) -> &mut R {
        &mut self.inner
    }

    /// Gets a pinned mutable reference to the underlying reader.
    ///
    /// It is not advisable to directly read from the underlying reader.
    fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
        self.project().inner
    }

    /// Returns a reference to the internal buffer.
    ///
    /// This method will not attempt to fill the buffer if it is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::BufReader;
    ///
    /// let input: &[u8] = b"hello";
    /// let reader = BufReader::new(input);
    ///
    /// // The internal buffer is empty until the first read request.
    /// assert_eq!(reader.buffer(), &[]);
    /// ```
    pub fn buffer(&self) -> &[u8] {
        &self.buf[self.pos..self.cap]
    }

    /// Unwraps the buffered reader, returning the underlying reader.
    ///
    /// Note that any leftover data in the internal buffer will be lost.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::BufReader;
    ///
    /// let input: &[u8] = b"hello";
    /// let reader = BufReader::new(input);
    ///
    /// assert_eq!(reader.into_inner(), input);
    /// ```
    pub fn into_inner(self) -> R {
        self.inner
    }

    /// Invalidates all data in the internal buffer.
    #[inline]
    fn discard_buffer(self: Pin<&mut Self>) {
        let this = self.project();
        *this.pos = 0;
        *this.cap = 0;
    }
}

impl<R: AsyncRead> AsyncRead for BufReader<R> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize>> {
        // If we don't have any buffered data and we're doing a massive read
        // (larger than our internal buffer), bypass our internal buffer
        // entirely.
        if self.pos == self.cap && buf.len() >= self.buf.len() {
            let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
            self.discard_buffer();
            return Poll::Ready(res);
        }
        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
        let nread = std::io::Read::read(&mut rem, buf)?;
        self.consume(nread);
        Poll::Ready(Ok(nread))
    }

    fn poll_read_vectored(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &mut [IoSliceMut<'_>],
    ) -> Poll<Result<usize>> {
        let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
        if self.pos == self.cap && total_len >= self.buf.len() {
            let res = ready!(self.as_mut().get_pin_mut().poll_read_vectored(cx, bufs));
            self.discard_buffer();
            return Poll::Ready(res);
        }
        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
        let nread = std::io::Read::read_vectored(&mut rem, bufs)?;
        self.consume(nread);
        Poll::Ready(Ok(nread))
    }
}

impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
    fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, cx: &mut Context<'_>) -> Poll<Result<&'a [u8]>> {
        let mut this = self.project();

        // If we've reached the end of our internal buffer then we need to fetch
        // some more data from the underlying reader.
        // Branch using `>=` instead of the more correct `==`
        // to tell the compiler that the pos..cap slice is always valid.
        if *this.pos >= *this.cap {
            debug_assert!(*this.pos == *this.cap);
            *this.cap = ready!(this.inner.as_mut().poll_read(cx, this.buf))?;
            *this.pos = 0;
        }
        Poll::Ready(Ok(&this.buf[*this.pos..*this.cap]))
    }

    fn consume(self: Pin<&mut Self>, amt: usize) {
        let this = self.project();
        *this.pos = cmp::min(*this.pos + amt, *this.cap);
    }
}

impl<R: fmt::Debug> fmt::Debug for BufReader<R> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("BufReader")
            .field("reader", &self.inner)
            .field(
                "buffer",
                &format_args!("{}/{}", self.cap - self.pos, self.buf.len()),
            )
            .finish()
    }
}

impl<R: AsyncSeek> AsyncSeek for BufReader<R> {
    /// Seeks to an offset, in bytes, in the underlying reader.
    ///
    /// The position used for seeking with [`SeekFrom::Current`] is the position the underlying
    /// reader would be at if the [`BufReader`] had no internal buffer.
    ///
    /// Seeking always discards the internal buffer, even if the seek position would otherwise fall
    /// within it. This guarantees that calling [`into_inner()`][`BufReader::into_inner()`]
    /// immediately after a seek yields the underlying reader at the same position.
    ///
    /// See [`AsyncSeek`] for more details.
    ///
    /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` where `n` minus the
    /// internal buffer length overflows an `i64`, two seeks will be performed instead of one. If
    /// the second seek returns `Err`, the underlying reader will be left at the same position it
    /// would have if you called [`seek()`][`AsyncSeekExt::seek()`] with `SeekFrom::Current(0)`.
    fn poll_seek(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        pos: SeekFrom,
    ) -> Poll<Result<u64>> {
        let result: u64;
        if let SeekFrom::Current(n) = pos {
            let remainder = (self.cap - self.pos) as i64;
            // it should be safe to assume that remainder fits within an i64 as the alternative
            // means we managed to allocate 8 exbibytes and that's absurd.
            // But it's not out of the realm of possibility for some weird underlying reader to
            // support seeking by i64::min_value() so we need to handle underflow when subtracting
            // remainder.
            if let Some(offset) = n.checked_sub(remainder) {
                result = ready!(self
                    .as_mut()
                    .get_pin_mut()
                    .poll_seek(cx, SeekFrom::Current(offset)))?;
            } else {
                // seek backwards by our remainder, and then by the offset
                ready!(self
                    .as_mut()
                    .get_pin_mut()
                    .poll_seek(cx, SeekFrom::Current(-remainder)))?;
                self.as_mut().discard_buffer();
                result = ready!(self
                    .as_mut()
                    .get_pin_mut()
                    .poll_seek(cx, SeekFrom::Current(n)))?;
            }
        } else {
            // Seeking with Start/End doesn't care about our buffer length.
            result = ready!(self.as_mut().get_pin_mut().poll_seek(cx, pos))?;
        }
        self.discard_buffer();
        Poll::Ready(Ok(result))
    }
}

impl<R: AsyncWrite> AsyncWrite for BufReader<R> {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize>> {
        self.as_mut().get_pin_mut().poll_write(cx, buf)
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        self.as_mut().get_pin_mut().poll_flush(cx)
    }

    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        self.as_mut().get_pin_mut().poll_close(cx)
    }
}

pin_project! {
    /// Adds buffering to a writer.
    ///
    /// It can be excessively inefficient to work directly with something that implements
    /// [`AsyncWrite`]. For example, every call to [`write()`][`AsyncWriteExt::write()`] on a TCP
    /// stream results in a system call. A [`BufWriter`] keeps an in-memory buffer of data and
    /// writes it to the underlying writer in large, infrequent batches.
    ///
    /// [`BufWriter`] can improve the speed of programs that make *small* and *repeated* writes to
    /// the same file or networking socket. It does not help when writing very large amounts at
    /// once, or writing just once or a few times. It also provides no advantage when writing to a
    /// destination that is in memory, like a `Vec<u8>`.
    ///
    /// Unlike [`std::io::BufWriter`], this type does not write out the contents of its buffer when
    /// it is dropped. Therefore, it is important that users explicitly flush the buffer before
    /// dropping the [`BufWriter`].
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
    ///
    /// # spin_on::spin_on(async {
    /// let mut output = Vec::new();
    /// let mut writer = BufWriter::new(&mut output);
    ///
    /// writer.write_all(b"hello").await?;
    /// writer.flush().await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub struct BufWriter<W> {
        #[pin]
        inner: W,
        buf: Vec<u8>,
        written: usize,
    }
}

impl<W: AsyncWrite> BufWriter<W> {
    /// Creates a buffered writer with the default buffer capacity.
    ///
    /// The default capacity is currently 8 KB, but that may change in the future.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::BufWriter;
    ///
    /// let mut output = Vec::new();
    /// let writer = BufWriter::new(&mut output);
    /// ```
    pub fn new(inner: W) -> BufWriter<W> {
        BufWriter::with_capacity(DEFAULT_BUF_SIZE, inner)
    }

    /// Creates a buffered writer with the specified buffer capacity.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::BufWriter;
    ///
    /// let mut output = Vec::new();
    /// let writer = BufWriter::with_capacity(100, &mut output);
    /// ```
    pub fn with_capacity(capacity: usize, inner: W) -> BufWriter<W> {
        BufWriter {
            inner,
            buf: Vec::with_capacity(capacity),
            written: 0,
        }
    }

    /// Gets a reference to the underlying writer.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::BufWriter;
    ///
    /// let mut output = Vec::new();
    /// let writer = BufWriter::new(&mut output);
    ///
    /// let r = writer.get_ref();
    /// ```
    pub fn get_ref(&self) -> &W {
        &self.inner
    }

    /// Gets a mutable reference to the underlying writer.
    ///
    /// It is not advisable to directly write to the underlying writer.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::BufWriter;
    ///
    /// let mut output = Vec::new();
    /// let mut writer = BufWriter::new(&mut output);
    ///
    /// let r = writer.get_mut();
    /// ```
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.inner
    }

    /// Gets a pinned mutable reference to the underlying writer.
    ///
    /// It is not not advisable to directly write to the underlying writer.
    fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> {
        self.project().inner
    }

    /// Unwraps the buffered writer, returning the underlying writer.
    ///
    /// Note that any leftover data in the internal buffer will be lost. If you don't want to lose
    /// that data, flush the buffered writer before unwrapping it.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
    ///
    /// # spin_on::spin_on(async {
    /// let mut output = vec![1, 2, 3];
    /// let mut writer = BufWriter::new(&mut output);
    ///
    /// writer.write_all(&[4]).await?;
    /// writer.flush().await?;
    /// assert_eq!(writer.into_inner(), &[1, 2, 3, 4]);
    /// # std::io::Result::Ok(()) });
    /// ```
    pub fn into_inner(self) -> W {
        self.inner
    }

    /// Returns a reference to the internal buffer.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::BufWriter;
    ///
    /// let mut output = Vec::new();
    /// let writer = BufWriter::new(&mut output);
    ///
    /// // The internal buffer is empty until the first write request.
    /// assert_eq!(writer.buffer(), &[]);
    /// ```
    pub fn buffer(&self) -> &[u8] {
        &self.buf
    }

    /// Flush the buffer.
    fn poll_flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        let mut this = self.project();
        let len = this.buf.len();
        let mut ret = Ok(());

        while *this.written < len {
            match this
                .inner
                .as_mut()
                .poll_write(cx, &this.buf[*this.written..])
            {
                Poll::Ready(Ok(0)) => {
                    ret = Err(Error::new(
                        ErrorKind::WriteZero,
                        "Failed to write buffered data",
                    ));
                    break;
                }
                Poll::Ready(Ok(n)) => *this.written += n,
                Poll::Ready(Err(ref e)) if e.kind() == ErrorKind::Interrupted => {}
                Poll::Ready(Err(e)) => {
                    ret = Err(e);
                    break;
                }
                Poll::Pending => return Poll::Pending,
            }
        }

        if *this.written > 0 {
            this.buf.drain(..*this.written);
        }
        *this.written = 0;

        Poll::Ready(ret)
    }
}

impl<W: fmt::Debug> fmt::Debug for BufWriter<W> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("BufWriter")
            .field("writer", &self.inner)
            .field("buf", &self.buf)
            .finish()
    }
}

impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize>> {
        if self.buf.len() + buf.len() > self.buf.capacity() {
            ready!(self.as_mut().poll_flush_buf(cx))?;
        }
        if buf.len() >= self.buf.capacity() {
            self.get_pin_mut().poll_write(cx, buf)
        } else {
            Pin::new(&mut *self.project().buf).poll_write(cx, buf)
        }
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        ready!(self.as_mut().poll_flush_buf(cx))?;
        self.get_pin_mut().poll_flush(cx)
    }

    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        ready!(self.as_mut().poll_flush_buf(cx))?;
        self.get_pin_mut().poll_close(cx)
    }
}

impl<W: AsyncWrite + AsyncSeek> AsyncSeek for BufWriter<W> {
    /// Seek to the offset, in bytes, in the underlying writer.
    ///
    /// Seeking always writes out the internal buffer before seeking.
    fn poll_seek(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        pos: SeekFrom,
    ) -> Poll<Result<u64>> {
        ready!(self.as_mut().poll_flush_buf(cx))?;
        self.get_pin_mut().poll_seek(cx, pos)
    }
}

/// Gives an in-memory buffer a cursor for reading and writing.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, Cursor, SeekFrom};
///
/// # spin_on::spin_on(async {
/// let mut bytes = b"hello".to_vec();
/// let mut cursor = Cursor::new(&mut bytes);
///
/// // Overwrite 'h' with 'H'.
/// cursor.write_all(b"H").await?;
///
/// // Move the cursor one byte forward.
/// cursor.seek(SeekFrom::Current(1)).await?;
///
/// // Read a byte.
/// let mut byte = [0];
/// cursor.read_exact(&mut byte).await?;
/// assert_eq!(&byte, b"l");
///
/// // Check the final buffer.
/// assert_eq!(bytes, b"Hello");
/// # std::io::Result::Ok(()) });
/// ```
#[derive(Clone, Debug, Default)]
pub struct Cursor<T> {
    inner: std::io::Cursor<T>,
}

impl<T> Cursor<T> {
    /// Creates a cursor for an in-memory buffer.
    ///
    /// Cursor's initial position is 0 even if the underlying buffer is not empty. Writing using
    /// [`Cursor`] will overwrite the existing contents unless the cursor is moved to the end of
    /// the buffer using [`set_position()`][Cursor::set_position()`] or
    /// [`seek()`][`AsyncSeekExt::seek()`].
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::Cursor;
    ///
    /// let cursor = Cursor::new(Vec::<u8>::new());
    /// ```
    pub fn new(inner: T) -> Cursor<T> {
        Cursor {
            inner: std::io::Cursor::new(inner),
        }
    }

    /// Gets a reference to the underlying buffer.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::Cursor;
    ///
    /// let cursor = Cursor::new(Vec::<u8>::new());
    /// let r = cursor.get_ref();
    /// ```
    pub fn get_ref(&self) -> &T {
        self.inner.get_ref()
    }

    /// Gets a mutable reference to the underlying buffer.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::Cursor;
    ///
    /// let mut cursor = Cursor::new(Vec::<u8>::new());
    /// let r = cursor.get_mut();
    /// ```
    pub fn get_mut(&mut self) -> &mut T {
        self.inner.get_mut()
    }

    /// Unwraps the cursor, returning the underlying buffer.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::Cursor;
    ///
    /// let cursor = Cursor::new(vec![1, 2, 3]);
    /// assert_eq!(cursor.into_inner(), [1, 2, 3]);
    /// ```
    pub fn into_inner(self) -> T {
        self.inner.into_inner()
    }

    /// Returns the current position of this cursor.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncSeekExt, Cursor, SeekFrom};
    ///
    /// # spin_on::spin_on(async {
    /// let mut cursor = Cursor::new(b"hello");
    /// assert_eq!(cursor.position(), 0);
    ///
    /// cursor.seek(SeekFrom::Start(2)).await?;
    /// assert_eq!(cursor.position(), 2);
    /// # std::io::Result::Ok(()) });
    /// ```
    pub fn position(&self) -> u64 {
        self.inner.position()
    }

    /// Sets the position of this cursor.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::Cursor;
    ///
    /// let mut cursor = Cursor::new(b"hello");
    /// assert_eq!(cursor.position(), 0);
    ///
    /// cursor.set_position(2);
    /// assert_eq!(cursor.position(), 2);
    /// ```
    pub fn set_position(&mut self, pos: u64) {
        self.inner.set_position(pos)
    }
}

impl<T> AsyncSeek for Cursor<T>
where
    T: AsRef<[u8]> + Unpin,
{
    fn poll_seek(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
        pos: SeekFrom,
    ) -> Poll<Result<u64>> {
        Poll::Ready(std::io::Seek::seek(&mut self.inner, pos))
    }
}

impl<T> AsyncRead for Cursor<T>
where
    T: AsRef<[u8]> + Unpin,
{
    fn poll_read(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize>> {
        Poll::Ready(std::io::Read::read(&mut self.inner, buf))
    }

    fn poll_read_vectored(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
        bufs: &mut [IoSliceMut<'_>],
    ) -> Poll<Result<usize>> {
        Poll::Ready(std::io::Read::read_vectored(&mut self.inner, bufs))
    }
}

impl<T> AsyncBufRead for Cursor<T>
where
    T: AsRef<[u8]> + Unpin,
{
    fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<&[u8]>> {
        Poll::Ready(std::io::BufRead::fill_buf(&mut self.get_mut().inner))
    }

    fn consume(mut self: Pin<&mut Self>, amt: usize) {
        std::io::BufRead::consume(&mut self.inner, amt)
    }
}

impl AsyncWrite for Cursor<&mut [u8]> {
    fn poll_write(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize>> {
        Poll::Ready(std::io::Write::write(&mut self.inner, buf))
    }

    fn poll_write_vectored(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
        bufs: &[IoSlice<'_>],
    ) -> Poll<Result<usize>> {
        Poll::Ready(std::io::Write::write_vectored(&mut self.inner, bufs))
    }

    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
        Poll::Ready(std::io::Write::flush(&mut self.inner))
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        self.poll_flush(cx)
    }
}

impl AsyncWrite for Cursor<&mut Vec<u8>> {
    fn poll_write(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize>> {
        Poll::Ready(std::io::Write::write(&mut self.inner, buf))
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        self.poll_flush(cx)
    }

    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
        Poll::Ready(std::io::Write::flush(&mut self.inner))
    }
}

impl AsyncWrite for Cursor<Vec<u8>> {
    fn poll_write(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize>> {
        Poll::Ready(std::io::Write::write(&mut self.inner, buf))
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        self.poll_flush(cx)
    }

    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
        Poll::Ready(std::io::Write::flush(&mut self.inner))
    }
}

/// Creates an empty reader.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{self, AsyncReadExt};
///
/// # spin_on::spin_on(async {
/// let mut reader = io::empty();
///
/// let mut contents = Vec::new();
/// reader.read_to_end(&mut contents).await?;
/// assert!(contents.is_empty());
/// # std::io::Result::Ok(()) });
/// ```
pub fn empty() -> Empty {
    Empty { _private: () }
}

/// Reader for the [`empty()`] function.
pub struct Empty {
    _private: (),
}

impl fmt::Debug for Empty {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("Empty { .. }")
    }
}

impl AsyncRead for Empty {
    #[inline]
    fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8]) -> Poll<Result<usize>> {
        Poll::Ready(Ok(0))
    }
}

impl AsyncBufRead for Empty {
    #[inline]
    fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, _: &mut Context<'_>) -> Poll<Result<&'a [u8]>> {
        Poll::Ready(Ok(&[]))
    }

    #[inline]
    fn consume(self: Pin<&mut Self>, _: usize) {}
}

/// Creates an infinite reader that reads the same byte repeatedly.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{self, AsyncReadExt};
///
/// # spin_on::spin_on(async {
/// let mut reader = io::repeat(b'a');
///
/// let mut contents = vec![0; 5];
/// reader.read_exact(&mut contents).await?;
/// assert_eq!(contents, b"aaaaa");
/// # std::io::Result::Ok(()) });
/// ```
pub fn repeat(byte: u8) -> Repeat {
    Repeat { byte }
}

/// Reader for the [`repeat()`] function.
#[derive(Debug)]
pub struct Repeat {
    byte: u8,
}

impl AsyncRead for Repeat {
    #[inline]
    fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
        for b in &mut *buf {
            *b = self.byte;
        }
        Poll::Ready(Ok(buf.len()))
    }
}

/// Creates a writer that consumes and drops all data.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{self, AsyncWriteExt};
///
/// # spin_on::spin_on(async {
/// let mut writer = io::sink();
/// writer.write_all(b"hello").await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn sink() -> Sink {
    Sink { _private: () }
}

/// Writer for the [`sink()`] function.
#[derive(Debug)]
pub struct Sink {
    _private: (),
}

impl AsyncWrite for Sink {
    #[inline]
    fn poll_write(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
        Poll::Ready(Ok(buf.len()))
    }

    #[inline]
    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
        Poll::Ready(Ok(()))
    }

    #[inline]
    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
        Poll::Ready(Ok(()))
    }
}

/// Extension trait for [`AsyncBufRead`].
pub trait AsyncBufReadExt: AsyncBufRead {
    /// Returns the contents of the internal buffer, filling it with more data if empty.
    ///
    /// If the stream has reached EOF, an empty buffer will be returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
    /// use std::pin::Pin;
    ///
    /// # spin_on::spin_on(async {
    /// let input: &[u8] = b"hello world";
    /// let mut reader = BufReader::with_capacity(5, input);
    ///
    /// assert_eq!(reader.fill_buf().await?, b"hello");
    /// reader.consume(2);
    /// assert_eq!(reader.fill_buf().await?, b"llo");
    /// reader.consume(3);
    /// assert_eq!(reader.fill_buf().await?, b" worl");
    /// # std::io::Result::Ok(()) });
    /// ```
    fn fill_buf(&mut self) -> FillBuf<'_, Self>
    where
        Self: Unpin,
    {
        FillBuf { reader: Some(self) }
    }

    /// Consumes `amt` buffered bytes.
    ///
    /// This method does not perform any I/O, it simply consumes some amount of bytes from the
    /// internal buffer.
    ///
    /// The `amt` must be <= the number of bytes in the buffer returned by
    /// [`fill_buf()`][`AsyncBufReadExt::fill_buf()`].
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
    /// use std::pin::Pin;
    ///
    /// # spin_on::spin_on(async {
    /// let input: &[u8] = b"hello";
    /// let mut reader = BufReader::with_capacity(4, input);
    ///
    /// assert_eq!(reader.fill_buf().await?, b"hell");
    /// reader.consume(2);
    /// assert_eq!(reader.fill_buf().await?, b"ll");
    /// # std::io::Result::Ok(()) });
    /// ```
    fn consume(&mut self, amt: usize)
    where
        Self: Unpin,
    {
        AsyncBufRead::consume(Pin::new(self), amt);
    }

    /// Reads all bytes and appends them into `buf` until the delimiter `byte` or EOF is found.
    ///
    /// This method will read bytes from the underlying stream until the delimiter or EOF is
    /// found. All bytes up to and including the delimiter (if found) will be appended to `buf`.
    ///
    /// If successful, returns the total number of bytes read.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
    ///
    /// # spin_on::spin_on(async {
    /// let input: &[u8] = b"hello";
    /// let mut reader = BufReader::new(input);
    ///
    /// let mut buf = Vec::new();
    /// let n = reader.read_until(b'\n', &mut buf).await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntilFuture<'_, Self>
    where
        Self: Unpin,
    {
        ReadUntilFuture {
            reader: self,
            byte,
            buf,
            read: 0,
        }
    }

    /// Reads all bytes and appends them into `buf` until a newline (the 0xA byte) or EOF is found.
    ///
    /// This method will read bytes from the underlying stream until the newline delimiter (the
    /// 0xA byte) or EOF is found. All bytes up to, and including, the newline delimiter (if found)
    /// will be appended to `buf`.
    ///
    /// If successful, returns the total number of bytes read.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
    ///
    /// # spin_on::spin_on(async {
    /// let input: &[u8] = b"hello";
    /// let mut reader = BufReader::new(input);
    ///
    /// let mut line = String::new();
    /// let n = reader.read_line(&mut line).await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLineFuture<'_, Self>
    where
        Self: Unpin,
    {
        ReadLineFuture {
            reader: self,
            buf,
            bytes: Vec::new(),
            read: 0,
        }
    }

    /// Returns a stream over the lines of this byte stream.
    ///
    /// The stream returned from this method yields items of type
    /// [`io::Result`][`super::io::Result`]`<`[`String`]`>`.
    /// Each string returned will *not* have a newline byte (the 0xA byte) or CRLF (0xD, 0xA bytes)
    /// at the end.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
    /// use futures_lite::stream::StreamExt;
    ///
    /// # spin_on::spin_on(async {
    /// let input: &[u8] = b"hello\nworld\n";
    /// let mut reader = BufReader::new(input);
    /// let mut lines = reader.lines();
    ///
    /// while let Some(line) = lines.next().await {
    ///     println!("{}", line?);
    /// }
    /// # std::io::Result::Ok(()) });
    /// ```
    fn lines(self) -> Lines<Self>
    where
        Self: Unpin + Sized,
    {
        Lines {
            reader: self,
            buf: String::new(),
            bytes: Vec::new(),
            read: 0,
        }
    }

    /// Returns a stream over the contents of this reader split on the specified `byte`.
    ///
    /// The stream returned from this method yields items of type
    /// [`io::Result`][`super::io::Result`]`<`[`Vec<u8>`][`Vec`]`>`.
    /// Each vector returned will *not* have the delimiter byte at the end.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncBufReadExt, Cursor};
    /// use futures_lite::stream::StreamExt;
    ///
    /// # spin_on::spin_on(async {
    /// let cursor = Cursor::new(b"lorem-ipsum-dolor");
    /// let items: Vec<Vec<u8>> = cursor.split(b'-').try_collect().await?;
    ///
    /// assert_eq!(items[0], b"lorem");
    /// assert_eq!(items[1], b"ipsum");
    /// assert_eq!(items[2], b"dolor");
    /// # std::io::Result::Ok(()) });
    /// ```
    fn split(self, byte: u8) -> Split<Self>
    where
        Self: Sized,
    {
        Split {
            reader: self,
            buf: Vec::new(),
            delim: byte,
            read: 0,
        }
    }
}

impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}

/// Future for the [`AsyncBufReadExt::fill_buf()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct FillBuf<'a, R: ?Sized> {
    reader: Option<&'a mut R>,
}

impl<R: ?Sized> Unpin for FillBuf<'_, R> {}

impl<'a, R> Future for FillBuf<'a, R>
where
    R: AsyncBufRead + Unpin + ?Sized,
{
    type Output = Result<&'a [u8]>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        let reader = this
            .reader
            .take()
            .expect("polled `FillBuf` after completion");

        match Pin::new(&mut *reader).poll_fill_buf(cx) {
            Poll::Ready(Ok(_)) => match Pin::new(reader).poll_fill_buf(cx) {
                Poll::Ready(Ok(slice)) => Poll::Ready(Ok(slice)),
                poll => panic!("`poll_fill_buf()` was ready but now it isn't: {:?}", poll),
            },
            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
            Poll::Pending => {
                this.reader = Some(reader);
                Poll::Pending
            }
        }
    }
}

/// Future for the [`AsyncBufReadExt::read_until()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadUntilFuture<'a, R: Unpin + ?Sized> {
    reader: &'a mut R,
    byte: u8,
    buf: &'a mut Vec<u8>,
    read: usize,
}

impl<R: Unpin + ?Sized> Unpin for ReadUntilFuture<'_, R> {}

impl<R: AsyncBufRead + Unpin + ?Sized> Future for ReadUntilFuture<'_, R> {
    type Output = Result<usize>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let Self {
            reader,
            byte,
            buf,
            read,
        } = &mut *self;
        read_until_internal(Pin::new(reader), cx, *byte, buf, read)
    }
}

fn read_until_internal<R: AsyncBufReadExt + ?Sized>(
    mut reader: Pin<&mut R>,
    cx: &mut Context<'_>,
    byte: u8,
    buf: &mut Vec<u8>,
    read: &mut usize,
) -> Poll<Result<usize>> {
    loop {
        let (done, used) = {
            let available = ready!(reader.as_mut().poll_fill_buf(cx))?;

            if let Some(i) = memchr(byte, available) {
                buf.extend_from_slice(&available[..=i]);
                (true, i + 1)
            } else {
                buf.extend_from_slice(available);
                (false, available.len())
            }
        };

        reader.as_mut().consume(used);
        *read += used;

        if done || used == 0 {
            return Poll::Ready(Ok(mem::replace(read, 0)));
        }
    }
}

/// Future for the [`AsyncBufReadExt::read_line()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadLineFuture<'a, R: Unpin + ?Sized> {
    reader: &'a mut R,
    buf: &'a mut String,
    bytes: Vec<u8>,
    read: usize,
}

impl<R: Unpin + ?Sized> Unpin for ReadLineFuture<'_, R> {}

impl<R: AsyncBufRead + Unpin + ?Sized> Future for ReadLineFuture<'_, R> {
    type Output = Result<usize>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let Self {
            reader,
            buf,
            bytes,
            read,
        } = &mut *self;
        read_line_internal(Pin::new(reader), cx, buf, bytes, read)
    }
}

pin_project! {
    /// Stream for the [`AsyncBufReadExt::lines()`] method.
    #[derive(Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Lines<R> {
        #[pin]
        reader: R,
        buf: String,
        bytes: Vec<u8>,
        read: usize,
    }
}

impl<R: AsyncBufRead> Stream for Lines<R> {
    type Item = Result<String>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();

        let n = ready!(read_line_internal(
            this.reader,
            cx,
            this.buf,
            this.bytes,
            this.read
        ))?;
        if n == 0 && this.buf.is_empty() {
            return Poll::Ready(None);
        }

        if this.buf.ends_with('\n') {
            this.buf.pop();
            if this.buf.ends_with('\r') {
                this.buf.pop();
            }
        }
        Poll::Ready(Some(Ok(mem::take(this.buf))))
    }
}

fn read_line_internal<R: AsyncBufRead + ?Sized>(
    reader: Pin<&mut R>,
    cx: &mut Context<'_>,
    buf: &mut String,
    bytes: &mut Vec<u8>,
    read: &mut usize,
) -> Poll<Result<usize>> {
    let ret = ready!(read_until_internal(reader, cx, b'\n', bytes, read));

    match String::from_utf8(mem::take(bytes)) {
        Ok(s) => {
            debug_assert!(buf.is_empty());
            debug_assert_eq!(*read, 0);
            *buf = s;
            Poll::Ready(ret)
        }
        Err(_) => Poll::Ready(ret.and_then(|_| {
            Err(Error::new(
                ErrorKind::InvalidData,
                "stream did not contain valid UTF-8",
            ))
        })),
    }
}

pin_project! {
    /// Stream for the [`AsyncBufReadExt::split()`] method.
    #[derive(Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Split<R> {
        #[pin]
        reader: R,
        buf: Vec<u8>,
        read: usize,
        delim: u8,
    }
}

impl<R: AsyncBufRead> Stream for Split<R> {
    type Item = Result<Vec<u8>>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();

        let n = ready!(read_until_internal(
            this.reader,
            cx,
            *this.delim,
            this.buf,
            this.read
        ))?;
        if n == 0 && this.buf.is_empty() {
            return Poll::Ready(None);
        }

        if this.buf[this.buf.len() - 1] == *this.delim {
            this.buf.pop();
        }
        Poll::Ready(Some(Ok(mem::take(this.buf))))
    }
}

/// Extension trait for [`AsyncRead`].
pub trait AsyncReadExt: AsyncRead {
    /// Reads some bytes from the byte stream.
    ///
    /// On success, returns the total number of bytes read.
    ///
    /// If the return value is `Ok(n)`, then it must be guaranteed that
    /// `0 <= n <= buf.len()`. A nonzero `n` value indicates that the buffer has been
    /// filled with `n` bytes of data. If `n` is `0`, then it can indicate one of two
    /// scenarios:
    ///
    /// 1. This reader has reached its "end of file" and will likely no longer be able to
    ///    produce bytes. Note that this does not mean that the reader will always no
    ///    longer be able to produce bytes.
    /// 2. The buffer specified was 0 bytes in length.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncReadExt, BufReader};
    ///
    /// # spin_on::spin_on(async {
    /// let input: &[u8] = b"hello";
    /// let mut reader = BufReader::new(input);
    ///
    /// let mut buf = vec![0; 1024];
    /// let n = reader.read(&mut buf).await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadFuture<'a, Self>
    where
        Self: Unpin,
    {
        ReadFuture { reader: self, buf }
    }

    /// Like [`read()`][`AsyncReadExt::read()`], except it reads into a slice of buffers.
    ///
    /// Data is copied to fill each buffer in order, with the final buffer possibly being
    /// only partially filled. This method must behave same as a single call to
    /// [`read()`][`AsyncReadExt::read()`] with the buffers concatenated would.
    fn read_vectored<'a>(
        &'a mut self,
        bufs: &'a mut [IoSliceMut<'a>],
    ) -> ReadVectoredFuture<'a, Self>
    where
        Self: Unpin,
    {
        ReadVectoredFuture { reader: self, bufs }
    }

    /// Reads the entire contents and appends them to a [`Vec`].
    ///
    /// On success, returns the total number of bytes read.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncReadExt, Cursor};
    ///
    /// # spin_on::spin_on(async {
    /// let mut reader = Cursor::new(vec![1, 2, 3]);
    /// let mut contents = Vec::new();
    ///
    /// let n = reader.read_to_end(&mut contents).await?;
    /// assert_eq!(n, 3);
    /// assert_eq!(contents, [1, 2, 3]);
    /// # std::io::Result::Ok(()) });
    /// ```
    fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEndFuture<'a, Self>
    where
        Self: Unpin,
    {
        let start_len = buf.len();
        ReadToEndFuture {
            reader: self,
            buf,
            start_len,
        }
    }

    /// Reads the entire contents and appends them to a [`String`].
    ///
    /// On success, returns the total number of bytes read.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncReadExt, Cursor};
    ///
    /// # spin_on::spin_on(async {
    /// let mut reader = Cursor::new(&b"hello");
    /// let mut contents = String::new();
    ///
    /// let n = reader.read_to_string(&mut contents).await?;
    /// assert_eq!(n, 5);
    /// assert_eq!(contents, "hello");
    /// # std::io::Result::Ok(()) });
    /// ```
    fn read_to_string<'a>(&'a mut self, buf: &'a mut String) -> ReadToStringFuture<'a, Self>
    where
        Self: Unpin,
    {
        ReadToStringFuture {
            reader: self,
            buf,
            bytes: Vec::new(),
            start_len: 0,
        }
    }

    /// Reads the exact number of bytes required to fill `buf`.
    ///
    /// On success, returns the total number of bytes read.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncReadExt, Cursor};
    ///
    /// # spin_on::spin_on(async {
    /// let mut reader = Cursor::new(&b"hello");
    /// let mut contents = vec![0; 3];
    ///
    /// reader.read_exact(&mut contents).await?;
    /// assert_eq!(contents, b"hel");
    /// # std::io::Result::Ok(()) });
    /// ```
    fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExactFuture<'a, Self>
    where
        Self: Unpin,
    {
        ReadExactFuture { reader: self, buf }
    }

    /// Creates an adapter which will read at most `limit` bytes from it.
    ///
    /// This method returns a new instance of [`AsyncRead`] which will read at most
    /// `limit` bytes, after which it will always return `Ok(0)` indicating EOF.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncReadExt, Cursor};
    ///
    /// # spin_on::spin_on(async {
    /// let mut reader = Cursor::new(&b"hello");
    /// let mut contents = String::new();
    ///
    /// let n = reader.take(3).read_to_string(&mut contents).await?;
    /// assert_eq!(n, 3);
    /// assert_eq!(contents, "hel");
    /// # std::io::Result::Ok(()) });
    /// ```
    fn take(self, limit: u64) -> Take<Self>
    where
        Self: Sized,
    {
        Take { inner: self, limit }
    }

    /// Converts this [`AsyncRead`] into a [`Stream`] of bytes.
    ///
    /// The returned type implements [`Stream`] where `Item` is `io::Result<u8>`.
    ///
    /// ```
    /// use futures_lite::io::{AsyncReadExt, Cursor};
    /// use futures_lite::stream::StreamExt;
    ///
    /// # spin_on::spin_on(async {
    /// let reader = Cursor::new(&b"hello");
    /// let mut bytes = reader.bytes();
    ///
    /// while let Some(byte) = bytes.next().await {
    ///     println!("byte: {}", byte?);
    /// }
    /// # std::io::Result::Ok(()) });
    /// ```
    fn bytes(self) -> Bytes<Self>
    where
        Self: Sized,
    {
        Bytes { inner: self }
    }

    /// Creates an adapter which will chain this stream with another.
    ///
    /// The returned [`AsyncRead`] instance will first read all bytes from this reader
    /// until EOF is found, and then continue with `next`.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncReadExt, Cursor};
    ///
    /// # spin_on::spin_on(async {
    /// let r1 = Cursor::new(&b"hello");
    /// let r2 = Cursor::new(&b"world");
    /// let mut reader = r1.chain(r2);
    ///
    /// let mut contents = String::new();
    /// reader.read_to_string(&mut contents).await?;
    /// assert_eq!(contents, "helloworld");
    /// # std::io::Result::Ok(()) });
    /// ```
    fn chain<R: AsyncRead>(self, next: R) -> Chain<Self, R>
    where
        Self: Sized,
    {
        Chain {
            first: self,
            second: next,
            done_first: false,
        }
    }

    /// Boxes the reader and changes its type to `dyn AsyncRead + Send + 'a`.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::AsyncReadExt;
    ///
    /// let reader = [1, 2, 3].boxed_reader();
    /// ```
    #[cfg(feature = "alloc")]
    fn boxed_reader<'a>(self) -> Pin<Box<dyn AsyncRead + Send + 'a>>
    where
        Self: Sized + Send + 'a,
    {
        Box::pin(self)
    }
}

impl<R: AsyncRead + ?Sized> AsyncReadExt for R {}

/// Future for the [`AsyncReadExt::read()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadFuture<'a, R: Unpin + ?Sized> {
    reader: &'a mut R,
    buf: &'a mut [u8],
}

impl<R: Unpin + ?Sized> Unpin for ReadFuture<'_, R> {}

impl<R: AsyncRead + Unpin + ?Sized> Future for ReadFuture<'_, R> {
    type Output = Result<usize>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let Self { reader, buf } = &mut *self;
        Pin::new(reader).poll_read(cx, buf)
    }
}

/// Future for the [`AsyncReadExt::read_vectored()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadVectoredFuture<'a, R: Unpin + ?Sized> {
    reader: &'a mut R,
    bufs: &'a mut [IoSliceMut<'a>],
}

impl<R: Unpin + ?Sized> Unpin for ReadVectoredFuture<'_, R> {}

impl<R: AsyncRead + Unpin + ?Sized> Future for ReadVectoredFuture<'_, R> {
    type Output = Result<usize>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let Self { reader, bufs } = &mut *self;
        Pin::new(reader).poll_read_vectored(cx, bufs)
    }
}

/// Future for the [`AsyncReadExt::read_to_end()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadToEndFuture<'a, R: Unpin + ?Sized> {
    reader: &'a mut R,
    buf: &'a mut Vec<u8>,
    start_len: usize,
}

impl<R: Unpin + ?Sized> Unpin for ReadToEndFuture<'_, R> {}

impl<R: AsyncRead + Unpin + ?Sized> Future for ReadToEndFuture<'_, R> {
    type Output = Result<usize>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let Self {
            reader,
            buf,
            start_len,
        } = &mut *self;
        read_to_end_internal(Pin::new(reader), cx, buf, *start_len)
    }
}

/// Future for the [`AsyncReadExt::read_to_string()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadToStringFuture<'a, R: Unpin + ?Sized> {
    reader: &'a mut R,
    buf: &'a mut String,
    bytes: Vec<u8>,
    start_len: usize,
}

impl<R: Unpin + ?Sized> Unpin for ReadToStringFuture<'_, R> {}

impl<R: AsyncRead + Unpin + ?Sized> Future for ReadToStringFuture<'_, R> {
    type Output = Result<usize>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let Self {
            reader,
            buf,
            bytes,
            start_len,
        } = &mut *self;
        let reader = Pin::new(reader);

        let ret = ready!(read_to_end_internal(reader, cx, bytes, *start_len));

        match String::from_utf8(mem::take(bytes)) {
            Ok(s) => {
                debug_assert!(buf.is_empty());
                **buf = s;
                Poll::Ready(ret)
            }
            Err(_) => Poll::Ready(ret.and_then(|_| {
                Err(Error::new(
                    ErrorKind::InvalidData,
                    "stream did not contain valid UTF-8",
                ))
            })),
        }
    }
}

// This uses an adaptive system to extend the vector when it fills. We want to
// avoid paying to allocate and zero a huge chunk of memory if the reader only
// has 4 bytes while still making large reads if the reader does have a ton
// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
// time is 4,500 times (!) slower than this if the reader has a very small
// amount of data to return.
//
// Because we're extending the buffer with uninitialized data for trusted
// readers, we need to make sure to truncate that if any of this panics.
fn read_to_end_internal<R: AsyncRead + ?Sized>(
    mut rd: Pin<&mut R>,
    cx: &mut Context<'_>,
    buf: &mut Vec<u8>,
    start_len: usize,
) -> Poll<Result<usize>> {
    struct Guard<'a> {
        buf: &'a mut Vec<u8>,
        len: usize,
    }

    impl Drop for Guard<'_> {
        fn drop(&mut self) {
            self.buf.resize(self.len, 0);
        }
    }

    let mut g = Guard {
        len: buf.len(),
        buf,
    };
    let ret;
    loop {
        if g.len == g.buf.len() {
            g.buf.reserve(32);
            let capacity = g.buf.capacity();
            g.buf.resize(capacity, 0);
        }

        match ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) {
            Ok(0) => {
                ret = Poll::Ready(Ok(g.len - start_len));
                break;
            }
            Ok(n) => g.len += n,
            Err(e) => {
                ret = Poll::Ready(Err(e));
                break;
            }
        }
    }

    ret
}

/// Future for the [`AsyncReadExt::read_exact()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadExactFuture<'a, R: Unpin + ?Sized> {
    reader: &'a mut R,
    buf: &'a mut [u8],
}

impl<R: Unpin + ?Sized> Unpin for ReadExactFuture<'_, R> {}

impl<R: AsyncRead + Unpin + ?Sized> Future for ReadExactFuture<'_, R> {
    type Output = Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let Self { reader, buf } = &mut *self;

        while !buf.is_empty() {
            let n = ready!(Pin::new(&mut *reader).poll_read(cx, buf))?;
            let (_, rest) = mem::take(buf).split_at_mut(n);
            *buf = rest;

            if n == 0 {
                return Poll::Ready(Err(ErrorKind::UnexpectedEof.into()));
            }
        }

        Poll::Ready(Ok(()))
    }
}

pin_project! {
    /// Reader for the [`AsyncReadExt::take()`] method.
    #[derive(Debug)]
    pub struct Take<R> {
        #[pin]
        inner: R,
        limit: u64,
    }
}

impl<R> Take<R> {
    /// Returns the number of bytes before this adapter will return EOF.
    ///
    /// Note that EOF may be reached sooner if the underlying reader is shorter than the limit.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncReadExt, Cursor};
    ///
    /// let reader = Cursor::new("hello");
    ///
    /// let reader = reader.take(3);
    /// assert_eq!(reader.limit(), 3);
    /// ```
    pub fn limit(&self) -> u64 {
        self.limit
    }

    /// Puts a limit on the number of bytes.
    ///
    /// Changing the limit is equivalent to creating a new adapter with [`AsyncReadExt::take()`].
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncReadExt, Cursor};
    ///
    /// let reader = Cursor::new("hello");
    ///
    /// let mut reader = reader.take(10);
    /// assert_eq!(reader.limit(), 10);
    ///
    /// reader.set_limit(3);
    /// assert_eq!(reader.limit(), 3);
    /// ```
    pub fn set_limit(&mut self, limit: u64) {
        self.limit = limit;
    }

    /// Gets a reference to the underlying reader.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncReadExt, Cursor};
    ///
    /// let reader = Cursor::new("hello");
    ///
    /// let reader = reader.take(3);
    /// let r = reader.get_ref();
    /// ```
    pub fn get_ref(&self) -> &R {
        &self.inner
    }

    /// Gets a mutable reference to the underlying reader.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncReadExt, Cursor};
    ///
    /// let reader = Cursor::new("hello");
    ///
    /// let mut reader = reader.take(3);
    /// let r = reader.get_mut();
    /// ```
    pub fn get_mut(&mut self) -> &mut R {
        &mut self.inner
    }

    /// Unwraps the adapter, returning the underlying reader.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncReadExt, Cursor};
    ///
    /// let reader = Cursor::new("hello");
    ///
    /// let reader = reader.take(3);
    /// let reader = reader.into_inner();
    /// ```
    pub fn into_inner(self) -> R {
        self.inner
    }
}

impl<R: AsyncRead> AsyncRead for Take<R> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize>> {
        let this = self.project();
        take_read_internal(this.inner, cx, buf, this.limit)
    }
}

fn take_read_internal<R: AsyncRead + ?Sized>(
    mut rd: Pin<&mut R>,
    cx: &mut Context<'_>,
    buf: &mut [u8],
    limit: &mut u64,
) -> Poll<Result<usize>> {
    // Don't call into inner reader at all at EOF because it may still block
    if *limit == 0 {
        return Poll::Ready(Ok(0));
    }

    let max = cmp::min(buf.len() as u64, *limit) as usize;

    match ready!(rd.as_mut().poll_read(cx, &mut buf[..max])) {
        Ok(n) => {
            *limit -= n as u64;
            Poll::Ready(Ok(n))
        }
        Err(e) => Poll::Ready(Err(e)),
    }
}

impl<R: AsyncBufRead> AsyncBufRead for Take<R> {
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
        let this = self.project();

        if *this.limit == 0 {
            return Poll::Ready(Ok(&[]));
        }

        match ready!(this.inner.poll_fill_buf(cx)) {
            Ok(buf) => {
                let cap = cmp::min(buf.len() as u64, *this.limit) as usize;
                Poll::Ready(Ok(&buf[..cap]))
            }
            Err(e) => Poll::Ready(Err(e)),
        }
    }

    fn consume(self: Pin<&mut Self>, amt: usize) {
        let this = self.project();
        // Don't let callers reset the limit by passing an overlarge value
        let amt = cmp::min(amt as u64, *this.limit) as usize;
        *this.limit -= amt as u64;

        this.inner.consume(amt);
    }
}

pin_project! {
    /// Reader for the [`AsyncReadExt::bytes()`] method.
    #[derive(Debug)]
    pub struct Bytes<R> {
        #[pin]
        inner: R,
    }
}

impl<R: AsyncRead + Unpin> Stream for Bytes<R> {
    type Item = Result<u8>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut byte = 0;

        let rd = Pin::new(&mut self.inner);

        match ready!(rd.poll_read(cx, std::slice::from_mut(&mut byte))) {
            Ok(0) => Poll::Ready(None),
            Ok(..) => Poll::Ready(Some(Ok(byte))),
            Err(ref e) if e.kind() == ErrorKind::Interrupted => Poll::Pending,
            Err(e) => Poll::Ready(Some(Err(e))),
        }
    }
}

impl<R: AsyncRead> AsyncRead for Bytes<R> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize>> {
        self.project().inner.poll_read(cx, buf)
    }

    fn poll_read_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &mut [IoSliceMut<'_>],
    ) -> Poll<Result<usize>> {
        self.project().inner.poll_read_vectored(cx, bufs)
    }
}

pin_project! {
    /// Reader for the [`AsyncReadExt::chain()`] method.
    pub struct Chain<R1, R2> {
        #[pin]
        first: R1,
        #[pin]
        second: R2,
        done_first: bool,
    }
}

impl<R1, R2> Chain<R1, R2> {
    /// Gets references to the underlying readers.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncReadExt, Cursor};
    ///
    /// let r1 = Cursor::new(b"hello");
    /// let r2 = Cursor::new(b"world");
    ///
    /// let reader = r1.chain(r2);
    /// let (r1, r2) = reader.get_ref();
    /// ```
    pub fn get_ref(&self) -> (&R1, &R2) {
        (&self.first, &self.second)
    }

    /// Gets mutable references to the underlying readers.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncReadExt, Cursor};
    ///
    /// let r1 = Cursor::new(b"hello");
    /// let r2 = Cursor::new(b"world");
    ///
    /// let mut reader = r1.chain(r2);
    /// let (r1, r2) = reader.get_mut();
    /// ```
    pub fn get_mut(&mut self) -> (&mut R1, &mut R2) {
        (&mut self.first, &mut self.second)
    }

    /// Unwraps the adapter, returning the underlying readers.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncReadExt, Cursor};
    ///
    /// let r1 = Cursor::new(b"hello");
    /// let r2 = Cursor::new(b"world");
    ///
    /// let reader = r1.chain(r2);
    /// let (r1, r2) = reader.into_inner();
    /// ```
    pub fn into_inner(self) -> (R1, R2) {
        (self.first, self.second)
    }
}

impl<R1: fmt::Debug, R2: fmt::Debug> fmt::Debug for Chain<R1, R2> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Chain")
            .field("r1", &self.first)
            .field("r2", &self.second)
            .finish()
    }
}

impl<R1: AsyncRead, R2: AsyncRead> AsyncRead for Chain<R1, R2> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize>> {
        let this = self.project();
        if !*this.done_first {
            match ready!(this.first.poll_read(cx, buf)) {
                Ok(0) if !buf.is_empty() => *this.done_first = true,
                Ok(n) => return Poll::Ready(Ok(n)),
                Err(err) => return Poll::Ready(Err(err)),
            }
        }

        this.second.poll_read(cx, buf)
    }

    fn poll_read_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &mut [IoSliceMut<'_>],
    ) -> Poll<Result<usize>> {
        let this = self.project();
        if !*this.done_first {
            match ready!(this.first.poll_read_vectored(cx, bufs)) {
                Ok(0) if !bufs.is_empty() => *this.done_first = true,
                Ok(n) => return Poll::Ready(Ok(n)),
                Err(err) => return Poll::Ready(Err(err)),
            }
        }

        this.second.poll_read_vectored(cx, bufs)
    }
}

impl<R1: AsyncBufRead, R2: AsyncBufRead> AsyncBufRead for Chain<R1, R2> {
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
        let this = self.project();
        if !*this.done_first {
            match ready!(this.first.poll_fill_buf(cx)) {
                Ok([]) => *this.done_first = true,
                Ok(buf) => return Poll::Ready(Ok(buf)),
                Err(err) => return Poll::Ready(Err(err)),
            }
        }

        this.second.poll_fill_buf(cx)
    }

    fn consume(self: Pin<&mut Self>, amt: usize) {
        let this = self.project();
        if !*this.done_first {
            this.first.consume(amt)
        } else {
            this.second.consume(amt)
        }
    }
}

/// Extension trait for [`AsyncSeek`].
pub trait AsyncSeekExt: AsyncSeek {
    /// Seeks to a new position in a byte stream.
    ///
    /// Returns the new position in the byte stream.
    ///
    /// A seek beyond the end of stream is allowed, but behavior is defined by the implementation.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncSeekExt, Cursor, SeekFrom};
    ///
    /// # spin_on::spin_on(async {
    /// let mut cursor = Cursor::new("hello");
    ///
    /// // Move the cursor to the end.
    /// cursor.seek(SeekFrom::End(0)).await?;
    ///
    /// // Check the current position.
    /// assert_eq!(cursor.seek(SeekFrom::Current(0)).await?, 5);
    /// # std::io::Result::Ok(()) });
    /// ```
    fn seek(&mut self, pos: SeekFrom) -> SeekFuture<'_, Self>
    where
        Self: Unpin,
    {
        SeekFuture { seeker: self, pos }
    }
}

impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}

/// Future for the [`AsyncSeekExt::seek()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct SeekFuture<'a, S: Unpin + ?Sized> {
    seeker: &'a mut S,
    pos: SeekFrom,
}

impl<S: Unpin + ?Sized> Unpin for SeekFuture<'_, S> {}

impl<S: AsyncSeek + Unpin + ?Sized> Future for SeekFuture<'_, S> {
    type Output = Result<u64>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let pos = self.pos;
        Pin::new(&mut *self.seeker).poll_seek(cx, pos)
    }
}

/// Extension trait for [`AsyncWrite`].
pub trait AsyncWriteExt: AsyncWrite {
    /// Writes some bytes into the byte stream.
    ///
    /// Returns the number of bytes written from the start of the buffer.
    ///
    /// If the return value is `Ok(n)` then it must be guaranteed that
    /// `0 <= n <= buf.len()`. A return value of `0` typically means that the underlying
    /// object is no longer able to accept bytes and will likely not be able to in the
    /// future as well, or that the provided buffer is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
    ///
    /// # spin_on::spin_on(async {
    /// let mut output = Vec::new();
    /// let mut writer = BufWriter::new(&mut output);
    ///
    /// let n = writer.write(b"hello").await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    fn write<'a>(&'a mut self, buf: &'a [u8]) -> WriteFuture<'a, Self>
    where
        Self: Unpin,
    {
        WriteFuture { writer: self, buf }
    }

    /// Like [`write()`][`AsyncWriteExt::write()`], except that it writes a slice of buffers.
    ///
    /// Data is copied from each buffer in order, with the final buffer possibly being only
    /// partially consumed. This method must behave same as a call to
    /// [`write()`][`AsyncWriteExt::write()`] with the buffers concatenated would.
    fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectoredFuture<'a, Self>
    where
        Self: Unpin,
    {
        WriteVectoredFuture { writer: self, bufs }
    }

    /// Writes an entire buffer into the byte stream.
    ///
    /// This method will keep calling [`write()`][`AsyncWriteExt::write()`] until there is no more
    /// data to be written or an error occurs. It will not return before the entire buffer is
    /// successfully written or an error occurs.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
    ///
    /// # spin_on::spin_on(async {
    /// let mut output = Vec::new();
    /// let mut writer = BufWriter::new(&mut output);
    ///
    /// let n = writer.write_all(b"hello").await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAllFuture<'a, Self>
    where
        Self: Unpin,
    {
        WriteAllFuture { writer: self, buf }
    }

    /// Flushes the stream to ensure that all buffered contents reach their destination.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
    ///
    /// # spin_on::spin_on(async {
    /// let mut output = Vec::new();
    /// let mut writer = BufWriter::new(&mut output);
    ///
    /// writer.write_all(b"hello").await?;
    /// writer.flush().await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    fn flush(&mut self) -> FlushFuture<'_, Self>
    where
        Self: Unpin,
    {
        FlushFuture { writer: self }
    }

    /// Closes the writer.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
    ///
    /// # spin_on::spin_on(async {
    /// let mut output = Vec::new();
    /// let mut writer = BufWriter::new(&mut output);
    ///
    /// writer.close().await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    fn close(&mut self) -> CloseFuture<'_, Self>
    where
        Self: Unpin,
    {
        CloseFuture { writer: self }
    }

    /// Boxes the writer and changes its type to `dyn AsyncWrite + Send + 'a`.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::AsyncWriteExt;
    ///
    /// let writer = Vec::<u8>::new().boxed_writer();
    /// ```
    #[cfg(feature = "alloc")]
    fn boxed_writer<'a>(self) -> Pin<Box<dyn AsyncWrite + Send + 'a>>
    where
        Self: Sized + Send + 'a,
    {
        Box::pin(self)
    }
}

impl<W: AsyncWrite + ?Sized> AsyncWriteExt for W {}

/// Future for the [`AsyncWriteExt::write()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WriteFuture<'a, W: Unpin + ?Sized> {
    writer: &'a mut W,
    buf: &'a [u8],
}

impl<W: Unpin + ?Sized> Unpin for WriteFuture<'_, W> {}

impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteFuture<'_, W> {
    type Output = Result<usize>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let buf = self.buf;
        Pin::new(&mut *self.writer).poll_write(cx, buf)
    }
}

/// Future for the [`AsyncWriteExt::write_vectored()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WriteVectoredFuture<'a, W: Unpin + ?Sized> {
    writer: &'a mut W,
    bufs: &'a [IoSlice<'a>],
}

impl<W: Unpin + ?Sized> Unpin for WriteVectoredFuture<'_, W> {}

impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteVectoredFuture<'_, W> {
    type Output = Result<usize>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let bufs = self.bufs;
        Pin::new(&mut *self.writer).poll_write_vectored(cx, bufs)
    }
}

/// Future for the [`AsyncWriteExt::write_all()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WriteAllFuture<'a, W: Unpin + ?Sized> {
    writer: &'a mut W,
    buf: &'a [u8],
}

impl<W: Unpin + ?Sized> Unpin for WriteAllFuture<'_, W> {}

impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteAllFuture<'_, W> {
    type Output = Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let Self { writer, buf } = &mut *self;

        while !buf.is_empty() {
            let n = ready!(Pin::new(&mut **writer).poll_write(cx, buf))?;
            let (_, rest) = mem::take(buf).split_at(n);
            *buf = rest;

            if n == 0 {
                return Poll::Ready(Err(ErrorKind::WriteZero.into()));
            }
        }

        Poll::Ready(Ok(()))
    }
}

/// Future for the [`AsyncWriteExt::flush()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct FlushFuture<'a, W: Unpin + ?Sized> {
    writer: &'a mut W,
}

impl<W: Unpin + ?Sized> Unpin for FlushFuture<'_, W> {}

impl<W: AsyncWrite + Unpin + ?Sized> Future for FlushFuture<'_, W> {
    type Output = Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut *self.writer).poll_flush(cx)
    }
}

/// Future for the [`AsyncWriteExt::close()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct CloseFuture<'a, W: Unpin + ?Sized> {
    writer: &'a mut W,
}

impl<W: Unpin + ?Sized> Unpin for CloseFuture<'_, W> {}

impl<W: AsyncWrite + Unpin + ?Sized> Future for CloseFuture<'_, W> {
    type Output = Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut *self.writer).poll_close(cx)
    }
}

/// Type alias for `Pin<Box<dyn AsyncRead + Send + 'static>>`.
///
/// # Examples
///
/// ```
/// use futures_lite::io::AsyncReadExt;
///
/// let reader = [1, 2, 3].boxed_reader();
/// ```
#[cfg(feature = "alloc")]
pub type BoxedReader = Pin<Box<dyn AsyncRead + Send + 'static>>;

/// Type alias for `Pin<Box<dyn AsyncWrite + Send + 'static>>`.
///
/// # Examples
///
/// ```
/// use futures_lite::io::AsyncWriteExt;
///
/// let writer = Vec::<u8>::new().boxed_writer();
/// ```
#[cfg(feature = "alloc")]
pub type BoxedWriter = Pin<Box<dyn AsyncWrite + Send + 'static>>;

/// Splits a stream into [`AsyncRead`] and [`AsyncWrite`] halves.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{self, Cursor};
///
/// # spin_on::spin_on(async {
/// let stream = Cursor::new(vec![]);
/// let (mut reader, mut writer) = io::split(stream);
/// # std::io::Result::Ok(()) });
/// ```
pub fn split<T>(stream: T) -> (ReadHalf<T>, WriteHalf<T>)
where
    T: AsyncRead + AsyncWrite + Unpin,
{
    let inner = Arc::new(Mutex::new(stream));
    (ReadHalf(inner.clone()), WriteHalf(inner))
}

/// The read half returned by [`split()`].
#[derive(Debug)]
pub struct ReadHalf<T>(Arc<Mutex<T>>);

/// The write half returned by [`split()`].
#[derive(Debug)]
pub struct WriteHalf<T>(Arc<Mutex<T>>);

impl<T: AsyncRead + Unpin> AsyncRead for ReadHalf<T> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize>> {
        let mut inner = self.0.lock().unwrap();
        Pin::new(&mut *inner).poll_read(cx, buf)
    }

    fn poll_read_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &mut [IoSliceMut<'_>],
    ) -> Poll<Result<usize>> {
        let mut inner = self.0.lock().unwrap();
        Pin::new(&mut *inner).poll_read_vectored(cx, bufs)
    }
}

impl<T: AsyncWrite + Unpin> AsyncWrite for WriteHalf<T> {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
        let mut inner = self.0.lock().unwrap();
        Pin::new(&mut *inner).poll_write(cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        let mut inner = self.0.lock().unwrap();
        Pin::new(&mut *inner).poll_flush(cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        let mut inner = self.0.lock().unwrap();
        Pin::new(&mut *inner).poll_close(cx)
    }
}

#[cfg(feature = "memchr")]
use memchr::memchr;

/// Unoptimized memchr fallback.
#[cfg(not(feature = "memchr"))]
fn memchr(needle: u8, haystack: &[u8]) -> Option<usize> {
    haystack.iter().position(|&b| b == needle)
}