use crate::body_codec::{BodyCodec, BodyImpl};
use crate::bw::BandwidthMonitor;
use crate::charset::CharCodec;
use crate::head_ext::HeaderMapExt;
use crate::params::HReqParams;
use crate::uninit::UninitBuf;
use crate::AsyncRead;
use crate::AsyncRuntime;
use crate::Error;
use encoding_rs::Encoding;
use futures_util::future::poll_fn;
use futures_util::io::AsyncReadExt;
use futures_util::ready;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::fmt;
use std::future::Future;
use std::io;
use std::io::Cursor;
use std::io::Read;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
const CT_TEXT: &str = "text/plain; charset=utf-8";
const CT_BIN: &str = "application/octet-stream";
const CT_JSON: &str = "application/json; charset=utf-8";
pub struct Body {
codec: BodyCodec,
length: Option<u64>, content_typ: Option<&'static str>,
override_source_enc: Option<&'static Encoding>,
has_read: bool,
char_codec: Option<CharCodec>,
deadline_fut: Option<Pin<Box<dyn Future<Output = io::Error> + Send + Sync>>>,
unfinished_recs: Option<Arc<()>>,
prebuffered: Option<Cursor<Vec<u8>>>,
bw: Option<BandwidthMonitor>,
}
impl Body {
pub fn empty() -> Self {
Self::new(BodyImpl::RequestEmpty, Some(0), true).ctype(CT_TEXT)
}
#[allow(clippy::should_implement_trait)]
pub fn from_str(text: &str) -> Self {
Self::from_string(text.to_owned()).ctype(CT_TEXT)
}
pub fn from_string(text: String) -> Self {
let mut new = Self::from_vec(text.into_bytes()).ctype(CT_TEXT);
new.override_source_enc = Some(encoding_rs::UTF_8);
new
}
pub fn from_bytes(bytes: &[u8]) -> Self {
Self::from_vec(bytes.to_owned()).ctype(CT_BIN)
}
pub fn from_vec(bytes: Vec<u8>) -> Self {
let len = bytes.len() as u64;
Self::from_sync_read(io::Cursor::new(bytes), Some(len)).ctype(CT_BIN)
}
pub fn from_file(file: std::fs::File) -> Self {
let length = file.metadata().ok().map(|m| m.len());
let reader = AsyncRuntime::file_to_reader(file);
Body::from_async_read(reader, length).ctype(CT_BIN)
}
pub fn from_json<B: Serialize + ?Sized>(json: &B) -> Self {
let vec = serde_json::to_vec(json).expect("Failed to encode JSON");
Self::from_vec(vec).ctype(CT_JSON)
}
pub fn from_async_read<R>(reader: R, length: Option<u64>) -> Self
where
R: AsyncRead + Unpin + Send + Sync + 'static,
{
let boxed = Box::new(reader);
Self::new(BodyImpl::RequestAsyncRead(boxed), length, true).ctype(CT_BIN)
}
pub fn from_sync_read<R>(reader: R, length: Option<u64>) -> Self
where
R: io::Read + Send + Sync + 'static,
{
let boxed = Box::new(reader);
Self::new(BodyImpl::RequestRead(boxed), length, true).ctype(CT_BIN)
}
pub(crate) fn new(bimpl: BodyImpl, length: Option<u64>, prebuffer: bool) -> Self {
let codec = BodyCodec::deferred(bimpl, prebuffer);
Body {
codec,
length,
content_typ: None,
override_source_enc: None,
has_read: false,
char_codec: None,
deadline_fut: None,
unfinished_recs: None,
prebuffered: None,
bw: None,
}
}
fn ctype(mut self, c: &'static str) -> Self {
self.content_typ = Some(c);
self
}
pub(crate) fn set_unfinished_recs(&mut self, unfin: Arc<()>) {
self.unfinished_recs = Some(unfin);
}
pub(crate) fn set_bw_monitor(&mut self, bw: Option<BandwidthMonitor>) {
self.bw = bw;
}
pub(crate) fn is_definitely_no_body(&self) -> bool {
self.length.map(|l| l == 0).unwrap_or(false)
}
pub(crate) fn is_definitely_a_body(&self) -> bool {
self.length.map(|l| l > 0).unwrap_or(true)
}
pub(crate) fn content_encoded_length(&self) -> Option<u64> {
if self.prebuffered.is_some() {
self.length
} else if self.codec.affects_content_size() || self.char_codec.is_some() {
None
} else {
self.length
}
}
pub(crate) fn content_type(&self) -> Option<&str> {
self.content_typ
}
pub(crate) fn is_configurable(&self) -> bool {
!self.has_read
}
#[cfg(feature = "server")]
pub(crate) fn unconfigure(self) -> Self {
Body {
codec: self.codec.into_deferred(),
char_codec: None,
..self
}
}
#[allow(clippy::collapsible_if)]
pub(crate) fn configure(
&mut self,
params: &HReqParams,
headers: &http::header::HeaderMap,
is_incoming: bool,
) {
if self.has_read {
panic!("configure after body started reading");
}
self.deadline_fut = Some(params.deadline().delay_fut());
let mut new_codec = None;
if let BodyCodec::Deferred(reader) = &mut self.codec {
if let Some(mut reader) = reader.take() {
reader.set_bw_monitor(self.bw.clone());
let use_enc =
!is_incoming && params.content_encode || is_incoming && params.content_decode;
new_codec = if use_enc {
let encoding = headers.get_str("content-encoding");
Some(BodyCodec::from_encoding(reader, encoding, is_incoming))
} else {
Some(BodyCodec::Pass(reader))
};
}
}
if let Some(new_codec) = new_codec {
self.codec = new_codec;
}
let charset_config = if is_incoming {
¶ms.charset_rx
} else {
¶ms.charset_tx
};
if let Some((from, to)) =
charset_config.resolve(is_incoming, headers, self.override_source_enc)
{
if from == to {
trace!("Charset codec pass through: {:?}", from);
} else {
self.char_codec = Some(CharCodec::new(from, to));
trace!(
"Charset codec ({}): {:?}",
if is_incoming { "incoming" } else { "outgoing" },
self.char_codec
);
}
}
}
pub(crate) async fn attempt_prebuffer(&mut self) -> Result<(), Error> {
if let Some(amt) = self.codec.attempt_prebuffer().await? {
let mut buffer_into = Vec::with_capacity(amt);
self.read_to_end(&mut buffer_into).await?;
trace!("Fully prebuffered: {}", buffer_into.len());
self.length = Some(buffer_into.len() as u64);
self.prebuffered = Some(Cursor::new(buffer_into));
}
Ok(())
}
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn set_codec_pass(&mut self) {
if let BodyCodec::Deferred(reader) = &mut self.codec {
if let Some(reader) = reader.take() {
let new_codec = BodyCodec::Pass(reader);
self.codec = new_codec;
}
}
}
pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
Ok(poll_fn(|cx| Pin::new(&mut *self).poll_read(cx, buf)).await?)
}
pub async fn read_to_vec(&mut self) -> Result<Vec<u8>, Error> {
let mut vec = Vec::with_capacity(8192);
self.read_to_end(&mut vec).await?;
trace!("read_to_vec returning len: {}", vec.len());
Ok(vec)
}
pub async fn read_to_string(&mut self) -> Result<String, Error> {
if let Some(char_codec) = &mut self.char_codec {
char_codec.remove_encoder();
}
let vec = self.read_to_vec().await?;
Ok(String::from_utf8(vec).expect("Incoming body is not valid utf-8"))
}
pub async fn read_to_json<T: DeserializeOwned>(&mut self) -> Result<T, Error> {
let s = self.read_to_string().await?;
Ok(serde_json::from_str(&s)?)
}
pub async fn read_and_discard(&mut self) -> Result<(), Error> {
const START_BUF_SIZE: usize = 16_384;
const MAX_BUF_SIZE: usize = 2 * 1024 * 1024;
let mut buf = UninitBuf::with_capacity(START_BUF_SIZE, MAX_BUF_SIZE);
loop {
buf.clear();
let read = buf.read_from_async(self).await?;
if read == 0 {
break;
}
}
Ok(())
}
}
impl From<()> for Body {
fn from(_: ()) -> Self {
Body::empty()
}
}
impl<'a> From<&'a str> for Body {
fn from(s: &'a str) -> Self {
Body::from_str(s)
}
}
impl<'a> From<&'a String> for Body {
fn from(s: &'a String) -> Self {
Body::from_string(s.clone())
}
}
impl From<String> for Body {
fn from(s: String) -> Self {
Body::from_string(s)
}
}
impl<'a> From<&'a [u8]> for Body {
fn from(bytes: &'a [u8]) -> Self {
Body::from_bytes(bytes)
}
}
impl From<Vec<u8>> for Body {
fn from(bytes: Vec<u8>) -> Self {
Body::from_vec(bytes)
}
}
impl<'a> From<&'a Vec<u8>> for Body {
fn from(bytes: &'a Vec<u8>) -> Self {
Body::from_vec(bytes.clone())
}
}
impl From<std::fs::File> for Body {
fn from(file: std::fs::File) -> Self {
Body::from_file(file)
}
}
impl AsyncRead for Body {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let this = self.get_mut();
if !this.has_read {
this.has_read = true;
}
let deadl = this.deadline_fut.as_mut();
if let Some(deadl) = deadl {
if let Poll::Ready(err) = deadl.as_mut().poll(cx) {
return Poll::Ready(Err(err));
}
}
let amount = if let Some(prebuf) = &mut this.prebuffered {
prebuf.read(buf)?
} else {
ready!(if let Some(char_codec) = &mut this.char_codec {
char_codec.poll_codec(cx, &mut this.codec, buf)
} else {
Pin::new(&mut this.codec).poll_read(cx, buf)
})?
};
if amount == 0 {
this.unfinished_recs.take();
}
Ok(amount).into()
}
}
impl fmt::Debug for Body {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Body {{ codec: {:?}", self.codec)?;
if let Some(char_codec) = &self.char_codec {
write!(f, ", char_codec: {:?}", char_codec)?;
}
write!(f, ", len: ")?;
match self.content_encoded_length() {
Some(v) => write!(f, "{}", v),
None => write!(f, "unknown"),
}?;
if self.prebuffered.is_some() {
write!(f, ", prebuffered")?;
}
write!(f, " }}")
}
}