// http-body 0.4.6
//
// Trait representing an asynchronous, streaming, HTTP request or response body.
// Documentation
use std::{
    collections::VecDeque,
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

use super::Body;

use bytes::{Buf, Bytes};
use http::HeaderMap;
use pin_project_lite::pin_project;

pin_project! {
    /// Future that resolves into a [`Collected`].
    ///
    /// Polling drains the wrapped body's data frames first and its trailers
    /// afterwards, resolving to the accumulated [`Collected`] or to the first
    /// error the body reports.
    pub struct Collect<T>
    where
        T: Body,
    {
        // The body being drained. Pinned because `poll_data` and
        // `poll_trailers` are invoked on it through `as_mut()`.
        #[pin]
        body: T,
        // Accumulator for data and trailers. It is `Some` from construction
        // and is taken out when the future resolves successfully.
        collected: Option<Collected<T::Data>>,
        // Becomes `true` once `poll_data` has yielded `None`; from then on
        // only the trailers are polled.
        is_data_done: bool,
    }
}

impl<T: Body> Collect<T> {
    pub(crate) fn new(body: T) -> Self {
        Self {
            body,
            collected: Some(Collected::default()),
            is_data_done: false,
        }
    }
}

impl<T: Body> Future for Collect<T> {
    type Output = Result<Collected<T::Data>, T::Error>;

    /// Drives the wrapped body to completion.
    ///
    /// Data frames are polled until the body reports the end of its data;
    /// the trailers are polled afterwards. Returns `Poll::Pending` whenever
    /// the body does, and resolves with the first error the body yields.
    ///
    /// # Panics
    ///
    /// Panics if the accumulator has already been taken, i.e. when polled
    /// again after having resolved successfully.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        // Phase 1: pull data frames until the body signals the end of data.
        while !*this.is_data_done {
            let frame = match this.body.as_mut().poll_data(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(frame) => frame,
            };

            match frame {
                Some(Ok(data)) => this.collected.as_mut().unwrap().push_data(data),
                Some(Err(err)) => return Poll::Ready(Err(err)),
                None => *this.is_data_done = true,
            }
        }

        // Phase 2: the data is exhausted, so only the trailers remain.
        let trailers = match this.body.as_mut().poll_trailers(cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
            Poll::Ready(Ok(trailers)) => trailers,
        };

        if let Some(trailers) = trailers {
            this.collected.as_mut().unwrap().push_trailers(trailers);
        }

        Poll::Ready(Ok(this.collected.take().expect("polled after complete")))
    }
}

/// A collected body produced by [`Body::collect`] which collects all the DATA frames
/// and trailers.
///
/// The data is kept as the sequence of buffers that were pushed rather than
/// being merged eagerly; use `aggregate` or `to_bytes` to read it.
#[derive(Debug)]
pub struct Collected<B> {
    /// Data buffers in the order they were pushed.
    bufs: BufList<B>,
    /// Trailers, if any were pushed; `None` otherwise.
    trailers: Option<HeaderMap>,
}

impl<B: Buf> Collected<B> {
    /// Returns a reference to the buffered trailers, if there are any.
    ///
    /// Yields `None` when no trailers were collected.
    pub fn trailers(&self) -> Option<&HeaderMap> {
        self.trailers.as_ref()
    }

    /// Consumes this collected body and exposes its data as a single [`Buf`].
    ///
    /// The returned value is the internal buffer list itself.
    pub fn aggregate(self) -> impl Buf {
        self.bufs
    }

    /// Consumes this collected body and returns all of its data as one [`Bytes`].
    pub fn to_bytes(mut self) -> Bytes {
        let total = self.bufs.remaining();
        self.bufs.copy_to_bytes(total)
    }

    /// Appends a data frame, skipping frames that carry no bytes.
    fn push_data(&mut self, data: B) {
        // `BufList::push` asserts that the buffer is non-empty, so empty
        // frames must be filtered out here.
        if !data.has_remaining() {
            return;
        }
        self.bufs.push(data);
    }

    /// Records `trailers`, extending the ones already stored if present.
    fn push_trailers(&mut self, trailers: HeaderMap) {
        match self.trailers.as_mut() {
            Some(existing) => existing.extend(trailers),
            None => self.trailers = Some(trailers),
        }
    }
}

impl<B> Default for Collected<B> {
    fn default() -> Self {
        Self {
            bufs: BufList::default(),
            trailers: None,
        }
    }
}

// Declares `Collected<B>` as `Unpin` for every `B`, including buffer types
// that are not `Unpin` themselves.
// NOTE(review): presumably fine because nothing in this file pin-projects
// into the stored buffers — confirm before relying on it.
impl<B> Unpin for Collected<B> {}

/// A queue of buffers that can itself be read as one [`Buf`].
#[derive(Debug)]
struct BufList<T> {
    /// Queued buffers; new ones go to the back and reads consume from the front.
    bufs: VecDeque<T>,
}

impl<T: Buf> BufList<T> {
    /// Appends `buf` to the back of the queue.
    ///
    /// The buffer is expected to still have bytes remaining. This is checked
    /// with `debug_assert!`, so it is only enforced when debug assertions
    /// are enabled.
    #[inline]
    pub(crate) fn push(&mut self, buf: T) {
        debug_assert!(buf.has_remaining());
        self.bufs.push_back(buf);
    }
}

impl<T: Buf> Buf for BufList<T> {
    /// Total number of unread bytes across all queued buffers.
    #[inline]
    fn remaining(&self) -> usize {
        self.bufs.iter().map(Buf::remaining).sum()
    }

    /// Current chunk of the front buffer, or an empty slice when the queue
    /// holds no buffers.
    #[inline]
    fn chunk(&self) -> &[u8] {
        match self.bufs.front() {
            Some(front) => front.chunk(),
            None => &[],
        }
    }

    /// Consumes `cnt` bytes, dropping every buffer that gets fully drained.
    ///
    /// # Panics
    ///
    /// Panics (by indexing an empty queue) if `cnt` exceeds the number of
    /// bytes remaining.
    #[inline]
    fn advance(&mut self, mut cnt: usize) {
        while cnt > 0 {
            let front = &mut self.bufs[0];
            let available = front.remaining();

            if available > cnt {
                // The front buffer outlasts the request: consume part of it
                // and keep it queued.
                front.advance(cnt);
                return;
            }

            // The front buffer is used up entirely; discard it and continue
            // with whatever is still owed.
            front.advance(available);
            cnt -= available;
            self.bufs.pop_front();
        }
    }

    /// Fills `dst` with slices from the queued buffers, front to back, and
    /// returns how many entries of `dst` were written.
    #[inline]
    fn chunks_vectored<'t>(&'t self, dst: &mut [std::io::IoSlice<'t>]) -> usize {
        let capacity = dst.len();
        if capacity == 0 {
            return 0;
        }

        let mut filled = 0;
        let mut queued = self.bufs.iter();
        // Stop as soon as `dst` is full or the queue runs out of buffers.
        while filled != capacity {
            match queued.next() {
                Some(buf) => filled += buf.chunks_vectored(&mut dst[filled..]),
                None => break,
            }
        }
        filled
    }

    /// Removes the next `len` bytes and returns them as a [`Bytes`].
    ///
    /// # Panics
    ///
    /// Panics if `len` is greater than the number of bytes remaining.
    #[inline]
    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
        use bytes::{BufMut, BytesMut};

        // The front buffer may have an optimized `copy_to_bytes`; delegate to
        // it whenever it can satisfy the whole request on its own.
        if let Some(front) = self.bufs.front_mut() {
            match front.remaining().cmp(&len) {
                std::cmp::Ordering::Equal => {
                    let bytes = front.copy_to_bytes(len);
                    self.bufs.pop_front();
                    return bytes;
                }
                std::cmp::Ordering::Greater => return front.copy_to_bytes(len),
                std::cmp::Ordering::Less => {}
            }
        }

        // Otherwise the request spans several buffers (or the queue is
        // empty): gather the bytes into a fresh allocation.
        assert!(len <= self.remaining(), "`len` greater than remaining");
        let mut gathered = BytesMut::with_capacity(len);
        gathered.put(self.take(len));
        gathered.freeze()
    }
}

impl<T> Default for BufList<T> {
    fn default() -> Self {
        BufList {
            bufs: VecDeque::new(),
        }
    }
}