use crate::bw::BandwidthMonitor;
use crate::uninit::UninitBuf;
use crate::AsyncRead;
use bytes::Bytes;
use futures_io::AsyncBufRead;
use futures_util::future::poll_fn;
use futures_util::ready;
use h2::RecvStream as H2RecvStream;
use hreq_h1::RecvStream as H1RecvStream;
use std::fmt;
use std::io;
use std::io::Read;
use std::pin::Pin;
use std::task::{Context, Poll};
#[cfg(feature = "gzip")]
use async_compression::futures::bufread::{GzipDecoder, GzipEncoder};
#[cfg(feature = "gzip")]
use futures_util::io::BufReader;
/// First argument handed to `UninitBuf::with_capacity` when a `BodyReader` is
/// created (presumably the initial buffer capacity in bytes; confirm in `UninitBuf`).
const START_BUF_SIZE: usize = 16_384;
/// Second argument handed to `UninitBuf::with_capacity` (presumably the upper
/// bound the buffer may grow to; confirm in `UninitBuf`). 2 MiB.
const MAX_BUF_SIZE: usize = 2 * 1024 * 1024;
/// Number of bytes a `BodyReader` tries to accumulate during a refill when it
/// was created with prebuffering enabled. 256 KiB.
const MAX_PREBUFFER: usize = 256 * 1024;
/// A `BodyReader`, optionally wrapped in a gzip encoder or decoder.
///
/// The `AsyncRead`/`AsyncBufRead` implementations delegate to the wrapped
/// reader. Reading from the `Deferred` variant panics.
#[allow(clippy::large_enum_variant)]
pub(crate) enum BodyCodec {
/// Holds the reader without any codec attached. Presumably a placeholder
/// until the encoding is known and `from_encoding` is used (confirm against
/// callers).
Deferred(Option<BodyReader>),
/// Bytes are passed through unchanged.
Pass(BodyReader),
/// Gzip-decodes the bytes produced by the reader.
#[cfg(feature = "gzip")]
GzipDecoder(BufReader<GzipDecoder<BodyReader>>),
/// Gzip-encodes the bytes produced by the reader.
#[cfg(feature = "gzip")]
GzipEncoder(BufReader<GzipEncoder<BodyReader>>),
}
impl BodyCodec {
/// Builds a `Deferred` codec around a fresh `BodyReader` for `bimp`.
///
/// `prebuffer` is forwarded to `BodyReader::new` and decides whether the
/// reader tries to prebuffer the body.
pub fn deferred(bimp: BodyImpl, prebuffer: bool) -> Self {
    BodyCodec::Deferred(Some(BodyReader::new(bimp, prebuffer)))
}
/// Strips any codec from `self` and re-wraps the inner reader as `Deferred`.
///
/// # Panics
///
/// Panics if `self` is already `Deferred`, because `into_inner` panics on
/// that variant.
#[cfg(feature = "server")]
pub fn into_deferred(self) -> Self {
    BodyCodec::Deferred(Some(self.into_inner()))
}
/// Unwraps the codec and returns the underlying `BodyReader`.
///
/// # Panics
///
/// Panics when called on the `Deferred` variant.
#[cfg(feature = "server")]
fn into_inner(self) -> BodyReader {
    match self {
        BodyCodec::Pass(reader) => reader,
        #[cfg(feature = "gzip")]
        BodyCodec::GzipDecoder(buffered) => {
            // Peel the BufReader first, then the gzip layer.
            let decoder = buffered.into_inner();
            decoder.into_inner()
        }
        #[cfg(feature = "gzip")]
        BodyCodec::GzipEncoder(buffered) => {
            // Peel the BufReader first, then the gzip layer.
            let encoder = buffered.into_inner();
            encoder.into_inner()
        }
        BodyCodec::Deferred(_) => panic!("into_inner() on Deferred"),
    }
}
/// Picks the codec for `reader` from a content-encoding name.
///
/// * `encoding` — the content-coding name, or `None` for no encoding.
/// * `is_incoming` — `true` selects the decoder for a recognised encoding,
///   `false` selects the encoder.
///
/// `None` yields `Pass`. With the `gzip` feature enabled, `"gzip"` (compared
/// ASCII case-insensitively, since content-coding names are case-insensitive)
/// yields the gzip decoder or encoder. Any other name, including `"gzip"`
/// when the feature is disabled, logs a warning and yields `Pass`.
pub fn from_encoding(reader: BodyReader, encoding: Option<&str>, is_incoming: bool) -> Self {
    trace!("Body codec from encoding: {:?}", encoding);
    match (encoding, is_incoming) {
        (None, _) => BodyCodec::Pass(reader),
        #[cfg(feature = "gzip")]
        (Some(name), true) if name.eq_ignore_ascii_case("gzip") => {
            BodyCodec::GzipDecoder(BufReader::new(GzipDecoder::new(reader)))
        }
        #[cfg(feature = "gzip")]
        (Some(name), false) if name.eq_ignore_ascii_case("gzip") => {
            BodyCodec::GzipEncoder(BufReader::new(GzipEncoder::new(reader)))
        }
        _ => {
            warn!("Unknown content-encoding: {:?}", encoding);
            BodyCodec::Pass(reader)
        }
    }
}
/// Gives mutable access to the innermost `BodyReader`.
///
/// Returns `None` only for a `Deferred` codec whose reader slot is empty.
fn reader_mut(&mut self) -> Option<&mut BodyReader> {
    match self {
        BodyCodec::Pass(reader) => Some(reader),
        BodyCodec::Deferred(slot) => slot.as_mut(),
        #[cfg(feature = "gzip")]
        BodyCodec::GzipDecoder(buffered) => {
            let decoder = buffered.get_mut();
            Some(decoder.get_mut())
        }
        #[cfg(feature = "gzip")]
        BodyCodec::GzipEncoder(buffered) => {
            let encoder = buffered.get_mut();
            Some(encoder.get_mut())
        }
    }
}
/// Reports whether this codec changes the number of bytes in the body.
///
/// `true` for the gzip encoder and decoder, `false` for `Deferred` and `Pass`.
pub fn affects_content_size(&self) -> bool {
    match self {
        BodyCodec::Deferred(_) | BodyCodec::Pass(_) => false,
        #[cfg(feature = "gzip")]
        BodyCodec::GzipDecoder(_) | BodyCodec::GzipEncoder(_) => true,
    }
}
/// Asks the inner reader to prebuffer the body.
///
/// Returns whatever `BodyReader::attempt_prebuffer` returns, or `Ok(None)`
/// when there is no inner reader to ask. I/O errors are propagated.
pub async fn attempt_prebuffer(&mut self) -> io::Result<Option<usize>> {
    match self.reader_mut() {
        Some(rdr) => rdr.attempt_prebuffer().await,
        None => Ok(None),
    }
}
}
/// Buffered reader over one of the body sources in `BodyImpl`.
///
/// Bytes are pulled from the source into `buffer` and handed out through the
/// `AsyncRead`/`AsyncBufRead` implementations.
pub struct BodyReader {
// The underlying source the bytes are read from.
imp: BodyImpl,
// Byte count a refill tries to reach while prebuffering; 0 means "any
// amount is enough". Reset to 0 once a refill completes.
prebuffer_to: usize,
// Bytes read from the source that have not all been handed out yet.
buffer: UninitBuf,
// Number of bytes at the front of `buffer` already handed to the caller.
consumed: usize,
// Remainder of an h2 data chunk that did not fit into the target buffer on
// the read that received it.
h2_leftover_bytes: Option<H2BytesReader>,
// Set once a read from the source produced 0 bytes.
is_finished: bool,
// Optional monitor that is told how many bytes were read from the source.
bw: Option<BandwidthMonitor>,
}
/// The concrete sources a `BodyReader` can pull bytes from.
///
/// NOTE(review): the `Request*` variants look like user-supplied outgoing
/// bodies and `Http1`/`Http2` like bodies received from a connection; confirm
/// against the callers that construct these.
pub(crate) enum BodyImpl {
/// No content; reading yields 0 bytes immediately.
RequestEmpty,
/// An asynchronous reader, polled via `poll_read`.
RequestAsyncRead(Box<dyn AsyncRead + Unpin + Send + Sync>),
/// A synchronous reader, read inline while being polled. A `WouldBlock`
/// error from it causes a panic.
RequestRead(Box<dyn io::Read + Send + Sync>),
/// An HTTP/1 receive stream, polled via `poll_read`.
Http1(H1RecvStream),
/// An HTTP/2 receive stream, polled chunk by chunk via `poll_data`.
Http2(H2RecvStream),
}
impl BodyReader {
/// Creates a reader over `imp` with an empty buffer.
///
/// When `prebuffer` is `true` the first refill aims for `MAX_PREBUFFER`
/// bytes; otherwise any non-empty read is enough.
fn new(imp: BodyImpl, prebuffer: bool) -> Self {
    let prebuffer_to = if prebuffer { MAX_PREBUFFER } else { 0 };
    let buffer = UninitBuf::with_capacity(START_BUF_SIZE, MAX_BUF_SIZE);
    BodyReader {
        imp,
        prebuffer_to,
        buffer,
        consumed: 0,
        h2_leftover_bytes: None,
        is_finished: false,
        bw: None,
    }
}
/// Replaces the bandwidth monitor that is notified about bytes read from the
/// underlying source. Passing `None` removes the current monitor.
pub(crate) fn set_bw_monitor(&mut self, bw: Option<BandwidthMonitor>) {
self.bw = bw;
}
async fn attempt_prebuffer(&mut self) -> io::Result<Option<usize>> {
poll_fn(|cx| Pin::new(&mut *self).poll_refill_buf(cx)).await?;
Ok(if self.is_finished {
Some(self.buffer.len())
} else {
None
})
.into()
}
/// Reads bytes from the underlying source (`self.imp`) into `buf`.
///
/// Returns the number of bytes written to `buf`. A result of `0` from the
/// source marks the reader as finished; every later call returns `Ok(0)`
/// without touching the source. The byte count of each read is reported to
/// the bandwidth monitor when one is set.
///
/// For HTTP/2 the data arrives in chunks that may be larger than `buf`; the
/// part that does not fit is kept in `h2_leftover_bytes` and served first on
/// the following calls.
///
/// # Panics
///
/// Panics if a `BodyImpl::RequestRead` reader fails with
/// `io::ErrorKind::WouldBlock`.
fn poll_read_underlying(
    &mut self,
    cx: &mut Context,
    buf: &mut [u8],
) -> Poll<io::Result<usize>> {
    if self.is_finished {
        return Ok(0).into();
    }
    // Serve what is left of a previously received h2 chunk before polling
    // the stream for more.
    if let Some(br) = &mut self.h2_leftover_bytes {
        let amt = br.read(buf)?;
        if br.len() == 0 {
            self.h2_leftover_bytes = None;
        }
        // Only the first slice of a chunk is counted further down, so the
        // remainder must be counted here or the monitor undercounts.
        if let Some(bw) = &self.bw {
            bw.append_read_bytes(amt);
        }
        return Ok(amt).into();
    }
    let amount = match &mut self.imp {
        BodyImpl::RequestEmpty => 0,
        BodyImpl::RequestAsyncRead(reader) => ready!(Pin::new(reader).poll_read(cx, buf))?,
        BodyImpl::RequestRead(reader) => match reader.read(buf) {
            Ok(v) => v,
            Err(e) => {
                if e.kind() == io::ErrorKind::WouldBlock {
                    panic!("Body::from_sync_read() failed with ErrorKind::WouldBlock. Use Body::from_async_read()");
                }
                return Err(e).into();
            }
        },
        BodyImpl::Http1(recv) => ready!(Pin::new(recv).poll_read(cx, buf))?,
        BodyImpl::Http2(recv) => loop {
            let data = match ready!(recv.poll_data(cx)) {
                Some(data) => data.map_err(|e| {
                    let other = format!("Other h2 error (poll_data): {}", e);
                    e.into_io()
                        .unwrap_or_else(|| io::Error::new(io::ErrorKind::Other, other))
                })?,
                // `None` is the real end of the stream.
                None => break 0,
            };
            recv.flow_control()
                .release_capacity(data.len())
                .map_err(|e| {
                    let other = format!("Other h2 error (release_capacity): {}", e);
                    e.into_io()
                        .unwrap_or_else(|| io::Error::new(io::ErrorKind::Other, other))
                })?;
            if data.is_empty() {
                // HTTP/2 allows zero-length DATA frames that do not end the
                // stream. Treating one as a 0-byte read would mark the body
                // finished and truncate it, so poll for the next chunk.
                continue;
            }
            let mut br = H2BytesReader(data, 0);
            let amt = br.read(buf)?;
            if br.len() > 0 {
                self.h2_leftover_bytes = Some(br);
            }
            break amt;
        },
    };
    if amount == 0 {
        self.is_finished = true;
    }
    if let Some(bw) = &self.bw {
        bw.append_read_bytes(amount);
    }
    Ok(amount).into()
}
/// Makes sure `buffer` holds unconsumed bytes, reading from the source if not.
///
/// Returns immediately when unconsumed bytes are still available. Otherwise
/// the buffer is cleared and filled until one of these holds:
///
/// * the source is finished,
/// * prebuffering is active (`prebuffer_to > 0`) and at least `prebuffer_to`
///   bytes are buffered,
/// * prebuffering is inactive and at least one byte is buffered.
///
/// Completing a refill resets `prebuffer_to` to 0, so later refills are
/// satisfied by any non-empty read.
fn poll_refill_buf(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
    if self.unconsumed_len() > 0 {
        return Ok(()).into();
    }
    // Everything was handed out: start over with an empty buffer.
    self.consumed = 0;
    self.buffer.clear();
    loop {
        let buffer_len = self.buffer.len();
        // `>=` rather than `==`: should a read ever overshoot the target, the
        // loop must still stop instead of reading on until EOF.
        let read_enough = if self.prebuffer_to > 0 {
            buffer_len >= self.prebuffer_to
        } else {
            buffer_len > 0
        };
        if self.is_finished || read_enough {
            self.prebuffer_to = 0;
            return Ok(()).into();
        }
        // The borrow checker will not allow `self.buffer` to be borrowed
        // mutably while the closure below borrows all of `self` mutably, hence
        // the raw pointer.
        let ptr = &mut self.buffer as *mut UninitBuf;
        // SAFETY (relied-upon invariant): `poll_read_underlying` only uses
        // `imp`, `h2_leftover_bytes`, `is_finished` and `bw`; it never touches
        // `self.buffer`, so the buffer is only accessed through `buf`.
        // NOTE(review): two live `&mut` still overlap here; keep the invariant
        // if `poll_read_underlying` changes.
        let buf = unsafe { &mut *ptr };
        ready!(buf.poll_delegate(|buf| self.poll_read_underlying(cx, buf)))?;
    }
}
/// Returns the buffered bytes that have not been handed to the caller yet,
/// i.e. everything in `buffer` from index `consumed` onwards.
fn unconsumed(&self) -> &[u8] {
&self.buffer[self.consumed..]
}
/// Returns how many buffered bytes have not been handed to the caller yet
/// (`buffer.len()` minus `consumed`).
fn unconsumed_len(&self) -> usize {
self.buffer.len() - self.consumed
}
}
impl AsyncBufRead for BodyReader {
    /// Refills the internal buffer if it is drained and returns the bytes that
    /// have not been consumed yet. An empty slice means the body is finished.
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<&[u8]>> {
        let reader = self.get_mut();
        ready!(reader.poll_refill_buf(cx))?;
        Poll::Ready(Ok(reader.unconsumed()))
    }

    /// Marks `amt` more buffered bytes as consumed.
    ///
    /// # Panics
    ///
    /// Panics if more bytes are consumed than the buffer holds.
    fn consume(self: Pin<&mut Self>, amt: usize) {
        let reader = self.get_mut();
        reader.consumed += amt;
        let buffered = reader.buffer.len();
        assert!(reader.consumed <= buffered);
    }
}
impl AsyncRead for BodyReader {
    /// Copies buffered body bytes into `buf`, refilling the internal buffer
    /// from the source first when it is drained. Returns `Ok(0)` once the
    /// source is finished and nothing is left in the buffer.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let this = self.get_mut();
        let drained = this.unconsumed_len() == 0;
        if drained && this.is_finished {
            return Poll::Ready(Ok(0));
        }
        if drained {
            ready!(this.poll_refill_buf(cx))?;
        }
        // `&[u8]` implements `Read`; this copies as much as fits into `buf`.
        let mut pending = this.unconsumed();
        let amount = pending.read(buf)?;
        Pin::new(this).consume(amount);
        Poll::Ready(Ok(amount))
    }
}
impl AsyncRead for BodyCodec {
    /// Reads through whichever reader the codec wraps.
    ///
    /// # Panics
    ///
    /// Panics when called on the `Deferred` variant.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        match self.get_mut() {
            BodyCodec::Pass(plain) => Pin::new(plain).poll_read(cx, buf),
            #[cfg(feature = "gzip")]
            BodyCodec::GzipDecoder(decoder) => Pin::new(decoder).poll_read(cx, buf),
            #[cfg(feature = "gzip")]
            BodyCodec::GzipEncoder(encoder) => Pin::new(encoder).poll_read(cx, buf),
            BodyCodec::Deferred(_) => panic!("poll_read on BodyCodec::Deferred"),
        }
    }
}
impl AsyncBufRead for BodyCodec {
    /// Fills and returns the buffer of whichever reader the codec wraps.
    ///
    /// # Panics
    ///
    /// Panics when called on the `Deferred` variant.
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<&[u8]>> {
        let this = self.get_mut();
        match this {
            BodyCodec::Pass(plain) => Pin::new(plain).poll_fill_buf(cx),
            #[cfg(feature = "gzip")]
            BodyCodec::GzipDecoder(decoder) => Pin::new(decoder).poll_fill_buf(cx),
            #[cfg(feature = "gzip")]
            BodyCodec::GzipEncoder(encoder) => Pin::new(encoder).poll_fill_buf(cx),
            BodyCodec::Deferred(_) => panic!("poll_fill_buf on Deferred"),
        }
    }

    /// Marks `amt` bytes as consumed on whichever reader the codec wraps.
    ///
    /// # Panics
    ///
    /// Panics when called on the `Deferred` variant.
    fn consume(self: Pin<&mut Self>, amt: usize) {
        let this = self.get_mut();
        match this {
            BodyCodec::Pass(plain) => Pin::new(plain).consume(amt),
            #[cfg(feature = "gzip")]
            BodyCodec::GzipDecoder(decoder) => Pin::new(decoder).consume(amt),
            #[cfg(feature = "gzip")]
            BodyCodec::GzipEncoder(encoder) => Pin::new(encoder).consume(amt),
            BodyCodec::Deferred(_) => panic!("consume on Deferred"),
        }
    }
}
impl fmt::Debug for BodyCodec {
    /// Writes a short label naming the active variant; the wrapped reader is
    /// not printed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let label = match self {
            BodyCodec::Deferred(_) => "defer",
            BodyCodec::Pass(_) => "pass",
            #[cfg(feature = "gzip")]
            BodyCodec::GzipDecoder(_) => "gzip_dec",
            #[cfg(feature = "gzip")]
            BodyCodec::GzipEncoder(_) => "gzip_enc",
        };
        f.write_str(label)
    }
}
impl fmt::Debug for BodyReader {
    /// Prints only the debug label of the underlying body source.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        fmt::Debug::fmt(&self.imp, f)
    }
}
impl fmt::Debug for BodyImpl {
    /// Writes a short label naming the kind of body source; the source itself
    /// is not printed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let label = match self {
            BodyImpl::RequestEmpty => "empty",
            BodyImpl::RequestAsyncRead(_) => "async",
            BodyImpl::RequestRead(_) => "sync",
            BodyImpl::Http1(_) => "http1",
            BodyImpl::Http2(_) => "http2",
        };
        f.write_str(label)
    }
}
/// A received chunk of bytes (field `0`) together with a read cursor
/// (field `1`): the number of bytes of the chunk already read out.
struct H2BytesReader(Bytes, usize);
impl H2BytesReader {
fn len(&self) -> usize {
self.0.len() - self.1
}
}
impl Read for H2BytesReader {
    /// Copies as many unread bytes as fit into `buf` and advances the cursor.
    /// Returns `Ok(0)` when the chunk has been read out completely.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        match self.len() {
            0 => Ok(0),
            _ => {
                let mut rest: &[u8] = &self.0[self.1..];
                let copied = rest.read(buf)?;
                self.1 += copied;
                Ok(copied)
            }
        }
    }
}