use super::super::super::std::{error::*, immutable::*};
use {
http::*,
http_body::*,
std::{cmp::*, io, pin::*, task::*},
tokio::io::*,
};
/// Capacity threshold in bytes (8 KiB) that `validate_remainder_capacity` checks the
/// remainder buffer against before leftover frame data is appended to it.
const REMAINDER_INITIAL_CAPACITY: usize = 8 * 1_024;
/// Wraps a body of type `BodyT` so that it can be read through [`AsyncRead`]
/// (see the `AsyncRead` implementation for `BodyReader`).
///
/// Bytes that were received but not yet delivered to a reader are kept in
/// `remainder`; trailers frames encountered while reading are collected in `trailers`.
pub struct BodyReader<BodyT> {
// The wrapped body, boxed and pinned so that its frames can be polled.
body: Pin<Box<BodyT>>,
/// Bytes waiting to be delivered to the next read. Seeded from the optional
/// "first bytes" at construction and refilled with whatever part of a data
/// frame did not fit into the caller's read buffer.
pub remainder: BytesMut,
/// Header maps taken from trailers frames, in the order they were received.
pub trailers: Vec<HeaderMap>,
}
impl<BodyT> BodyReader<BodyT> {
    /// Creates a reader over `body` with an empty remainder.
    pub fn new(body: BodyT) -> Self {
        Self::new_with_first_bytes(body, None)
    }

    /// Creates a reader over `body` whose remainder is seeded with `first_bytes`.
    ///
    /// The seeded bytes are delivered to readers before anything is pulled from
    /// `body`. Passing `None` starts with an empty, unallocated remainder.
    pub fn new_with_first_bytes(body: BodyT, first_bytes: Option<ImmutableBytes>) -> Self {
        let remainder = match first_bytes {
            Some(first_bytes) => first_bytes.into(),
            None => BytesMut::with_capacity(0),
        };
        Self { body: Box::pin(body), remainder, trailers: Default::default() }
    }

    /// Takes the reader apart, returning the body, the undelivered remainder
    /// bytes, and the trailers collected so far.
    ///
    /// Requires `BodyT: Unpin` because the body is moved out of its pinned box.
    pub fn into_inner(self) -> (BodyT, BytesMut, Vec<HeaderMap>)
    where
        BodyT: Unpin,
    {
        (*Pin::into_inner(self.body), self.remainder, self.trailers)
    }

    /// Ensures the remainder buffer has a total capacity of at least
    /// `REMAINDER_INITIAL_CAPACITY` bytes.
    fn validate_remainder_capacity(&mut self) {
        if self.remainder.capacity() < REMAINDER_INITIAL_CAPACITY {
            // `reserve` counts *additional* bytes beyond the current length, not
            // beyond the current capacity, so the shortfall is measured from
            // `len()`. The subtraction cannot underflow: len <= capacity, and
            // capacity is below the threshold in this branch.
            let additional = REMAINDER_INITIAL_CAPACITY - self.remainder.len();
            self.remainder.reserve(additional);
        }
    }
}
impl<BodyT> AsyncRead for BodyReader<BodyT>
where
BodyT: Body,
BodyT::Error: Into<CapturedError>, {
fn poll_read(mut self: Pin<&mut Self>, context: &mut Context, buffer: &mut ReadBuf) -> Poll<io::Result<()>> {
if self.remainder.has_remaining() {
let size = min(buffer.remaining_mut(), self.remainder.remaining());
if size != 0 {
let bytes = self.remainder.copy_to_bytes(size);
buffer.put(bytes);
if !buffer.has_remaining_mut() {
return Poll::Ready(Ok(()));
}
}
}
Poll::Ready(match ready!(self.body.as_mut().poll_frame(context)) {
Some(result) => {
let frame = result.map_err(io::Error::other)?;
match frame.into_data() {
Ok(mut data) => {
let size = min(buffer.remaining_mut(), data.remaining());
if size != 0 {
let bytes = data.copy_to_bytes(size);
buffer.put(bytes);
}
if data.has_remaining() {
self.validate_remainder_capacity();
self.remainder.put(data);
}
Ok(())
}
Err(frame) => {
match frame.into_trailers() {
Ok(trailers) => {
tracing::debug!("trailers frame");
self.trailers.push(trailers);
}
Err(_frame) => {
tracing::warn!("frame is not data and not trailers");
}
}
Ok(())
}
}
}
None => Ok(()),
})
}
}
/// Conversion of an owned value into a [`BodyReader`] over `BodyT`.
pub trait IntoBodyReader<BodyT>: Sized {
    /// Converts `self` into a [`BodyReader`], seeding its remainder with
    /// `first_bytes` when provided.
    fn into_reader_with_first_bytes(self, first_bytes: Option<ImmutableBytes>) -> BodyReader<BodyT>;

    /// Converts `self` into a [`BodyReader`] without any initial bytes.
    fn into_reader(self) -> BodyReader<BodyT> {
        let no_first_bytes = None;
        Self::into_reader_with_first_bytes(self, no_first_bytes)
    }
}
/// Blanket implementation: any body whose error converts into `CapturedError`
/// can be turned into a [`BodyReader`] over itself.
impl<BodyT> IntoBodyReader<BodyT> for BodyT
where
    BodyT: Body,
    BodyT::Error: Into<CapturedError>,
{
    /// Wraps `self` in a [`BodyReader`], passing `first_bytes` through to
    /// `BodyReader::new_with_first_bytes`.
    fn into_reader_with_first_bytes(self, first_bytes: Option<ImmutableBytes>) -> BodyReader<BodyT> {
        let body = self;
        BodyReader::<BodyT>::new_with_first_bytes(body, first_bytes)
    }
}