use std::fmt;
use std::fs::File;
use std::future::Future;
#[cfg(feature = "multipart")]
use std::io::Cursor;
use std::io::{self, Read};
use std::mem::{self, MaybeUninit};
use std::ptr;
use bytes::Bytes;
use futures_channel::mpsc;
use crate::async_impl;
#[derive(Debug)]
pub struct Body {
kind: Kind,
}
impl Body {
pub fn new<R: Read + Send + 'static>(reader: R) -> Body {
Body {
kind: Kind::Reader(Box::from(reader), None),
}
}
pub fn sized<R: Read + Send + 'static>(reader: R, len: u64) -> Body {
Body {
kind: Kind::Reader(Box::from(reader), Some(len)),
}
}
pub fn as_bytes(&self) -> Option<&[u8]> {
match self.kind {
Kind::Reader(_, _) => None,
Kind::Bytes(ref bytes) => Some(bytes.as_ref()),
}
}
pub fn buffer(&mut self) -> Result<&[u8], crate::Error> {
match self.kind {
Kind::Reader(ref mut reader, maybe_len) => {
let mut bytes = if let Some(len) = maybe_len {
Vec::with_capacity(len as usize)
} else {
Vec::new()
};
io::copy(reader, &mut bytes).map_err(crate::error::builder)?;
self.kind = Kind::Bytes(bytes.into());
self.buffer()
}
Kind::Bytes(ref bytes) => Ok(bytes.as_ref()),
}
}
#[cfg(feature = "multipart")]
pub(crate) fn len(&self) -> Option<u64> {
match self.kind {
Kind::Reader(_, len) => len,
Kind::Bytes(ref bytes) => Some(bytes.len() as u64),
}
}
#[cfg(feature = "multipart")]
pub(crate) fn into_reader(self) -> Reader {
match self.kind {
Kind::Reader(r, _) => Reader::Reader(r),
Kind::Bytes(b) => Reader::Bytes(Cursor::new(b)),
}
}
pub(crate) fn into_async(self) -> (Option<Sender>, async_impl::Body, Option<u64>) {
match self.kind {
Kind::Reader(read, len) => {
let (tx, rx) = mpsc::channel(0);
let tx = Sender {
body: (read, len),
tx,
};
(Some(tx), async_impl::Body::stream(rx), len)
}
Kind::Bytes(chunk) => {
let len = chunk.len() as u64;
(None, async_impl::Body::reusable(chunk), Some(len))
}
}
}
pub(crate) fn try_clone(&self) -> Option<Body> {
self.kind.try_clone().map(|kind| Body { kind })
}
}
enum Kind {
Reader(Box<dyn Read + Send>, Option<u64>),
Bytes(Bytes),
}
impl Kind {
fn try_clone(&self) -> Option<Kind> {
match self {
Kind::Reader(..) => None,
Kind::Bytes(v) => Some(Kind::Bytes(v.clone())),
}
}
}
impl From<Vec<u8>> for Body {
#[inline]
fn from(v: Vec<u8>) -> Body {
Body {
kind: Kind::Bytes(v.into()),
}
}
}
impl From<String> for Body {
#[inline]
fn from(s: String) -> Body {
s.into_bytes().into()
}
}
impl From<&'static [u8]> for Body {
#[inline]
fn from(s: &'static [u8]) -> Body {
Body {
kind: Kind::Bytes(Bytes::from_static(s)),
}
}
}
impl From<&'static str> for Body {
#[inline]
fn from(s: &'static str) -> Body {
s.as_bytes().into()
}
}
impl From<File> for Body {
#[inline]
fn from(f: File) -> Body {
let len = f.metadata().map(|m| m.len()).ok();
Body {
kind: Kind::Reader(Box::new(f), len),
}
}
}
impl From<Bytes> for Body {
#[inline]
fn from(b: Bytes) -> Body {
Body {
kind: Kind::Bytes(b),
}
}
}
impl fmt::Debug for Kind {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Kind::Reader(_, ref v) => f
.debug_struct("Reader")
.field("length", &DebugLength(v))
.finish(),
Kind::Bytes(ref v) => fmt::Debug::fmt(v, f),
}
}
}
struct DebugLength<'a>(&'a Option<u64>);
impl<'a> fmt::Debug for DebugLength<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self.0 {
Some(ref len) => fmt::Debug::fmt(len, f),
None => f.write_str("Unknown"),
}
}
}
#[cfg(feature = "multipart")]
pub(crate) enum Reader {
Reader(Box<dyn Read + Send>),
Bytes(Cursor<Bytes>),
}
#[cfg(feature = "multipart")]
impl Read for Reader {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match *self {
Reader::Reader(ref mut rdr) => rdr.read(buf),
Reader::Bytes(ref mut rdr) => rdr.read(buf),
}
}
}
pub(crate) struct Sender {
body: (Box<dyn Read + Send>, Option<u64>),
tx: mpsc::Sender<Result<Bytes, Abort>>,
}
#[derive(Debug)]
struct Abort;
impl fmt::Display for Abort {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("abort request body")
}
}
impl std::error::Error for Abort {}
async fn send_future(sender: Sender) -> Result<(), crate::Error> {
use bytes::{BufMut, BytesMut};
use futures_util::SinkExt;
use std::cmp;
let con_len = sender.body.1;
let cap = cmp::min(sender.body.1.unwrap_or(8192), 8192);
let mut written = 0;
let mut buf = BytesMut::zeroed(cap as usize);
buf.clear();
let mut body = sender.body.0;
let mut tx = Some(sender.tx);
loop {
if Some(written) == con_len {
return Ok(());
}
if buf.is_empty() {
if buf.capacity() == buf.len() {
buf.reserve(8192);
let uninit = buf.spare_capacity_mut();
let uninit_len = uninit.len();
unsafe {
ptr::write_bytes(uninit.as_mut_ptr().cast::<u8>(), 0, uninit_len);
}
}
let bytes = unsafe {
mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(buf.spare_capacity_mut())
};
match body.read(bytes) {
Ok(0) => {
return Ok(());
}
Ok(n) => unsafe {
buf.advance_mut(n);
},
Err(e) => {
let _ = tx
.take()
.expect("tx only taken on error")
.clone()
.try_send(Err(Abort));
return Err(crate::error::body(e));
}
}
}
let buf_len = buf.len() as u64;
tx.as_mut()
.expect("tx only taken on error")
.send(Ok(buf.split().freeze()))
.await
.map_err(crate::error::body)?;
written += buf_len;
}
}
impl Sender {
pub(crate) fn send(self) -> impl Future<Output = Result<(), crate::Error>> {
send_future(self)
}
}
#[cfg(test)]
pub(crate) fn read_to_string(mut body: Body) -> io::Result<String> {
let mut s = String::new();
match body.kind {
Kind::Reader(ref mut reader, _) => reader.read_to_string(&mut s),
Kind::Bytes(ref mut bytes) => (&**bytes).read_to_string(&mut s),
}
.map(|_| s)
}