pub(crate) mod handle;
pub(crate) mod streaming;
use self::handle::BodyHandle;
use crate::{
convert::{Borrowable, ToHeaderName, ToHeaderValue},
experimental::BodyExt,
Error,
};
use http::header::HeaderMap;
use std::fmt::Debug;
use std::io::{BufRead, BufReader, BufWriter, Read, Write};
use std::mem::ManuallyDrop;
pub use streaming::StreamingBody;
use super::{INITIAL_HEADER_NAME_BUF_SIZE, INITIAL_HEADER_VALUE_BUF_SIZE};
pub struct Body {
reader: BufReader<BodyHandleWrapper>,
writer: BufWriter<BodyHandle>,
}
impl Debug for Body {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "<opaque Body>")
}
}
impl Body {
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
BodyHandle::new().into()
}
pub(super) fn get_handle(&self) -> &BodyHandle {
self.writer.get_ref()
}
fn handle(&mut self) -> &mut BodyHandle {
self.writer.get_mut()
}
pub fn into_handle(mut self) -> BodyHandle {
self.put_back_read_buf();
self.writer
.into_inner()
.expect("fastly_http_body::write failed")
}
fn put_back_read_buf(&mut self) {
let read_buf = self.reader.buffer();
if !read_buf.is_empty() {
let mut body_handle =
ManuallyDrop::new(unsafe { BodyHandle::from_u32(self.writer.get_ref().as_u32()) });
let nwritten = body_handle.write_front(read_buf);
self.reader.consume(nwritten)
};
}
#[doc = include_str!("../../docs/snippets/buffers-body.md")]
pub fn into_bytes(self) -> Vec<u8> {
self.into_handle().into_bytes()
}
#[doc = include_str!("../../docs/snippets/buffers-body.md")]
#[doc = include_str!("../../docs/snippets/panics-body-utf8.md")]
pub fn into_string(self) -> String {
self.into_handle().into_string()
}
#[doc = include_str!("../../docs/snippets/body-append-constant-time.md")]
pub fn append(&mut self, other: Body) {
self.writer.flush().expect("fastly_http_body::write failed");
self.handle().append(other.into_handle())
}
pub fn read_chunks(
&mut self,
chunk_size: usize,
) -> impl Iterator<Item = Result<Vec<u8>, std::io::Error>> + '_ {
std::iter::from_fn(move || {
let mut chunk = vec![0; chunk_size];
match self.read(&mut chunk) {
Ok(0) => None,
Ok(nread) => {
chunk.truncate(nread);
Some(Ok(chunk))
}
Err(e) => Some(Err(e)),
}
})
}
pub fn get_prefix_mut(&mut self, length: usize) -> Prefix<'_> {
self.try_get_prefix_mut(length).expect("body read failed")
}
pub fn try_get_prefix_mut(&mut self, length: usize) -> std::io::Result<Prefix<'_>> {
let mut buf = vec![];
let nread = self
.take(length.try_into().unwrap())
.read_to_end(&mut buf)?;
buf.truncate(nread);
Ok(Prefix::new(buf, self))
}
pub fn get_prefix_str_mut(&mut self, length: usize) -> PrefixString<'_> {
self.try_get_prefix_str_mut(length)
.expect("UTF-8 error in body prefix")
}
pub fn try_get_prefix_str_mut(
&mut self,
length: usize,
) -> Result<PrefixString<'_>, std::str::Utf8Error> {
let mut buf = vec![];
let nread = self
.take(length.try_into().unwrap())
.read_to_end(&mut buf)
.expect("body read failed");
buf.truncate(nread);
match String::from_utf8(buf) {
Ok(string) => Ok(PrefixString::new(string, self)),
Err(e) => {
let err = e.utf8_error();
let mut bytes = e.into_bytes();
let (excess_bytes, result) = match err.error_len() {
None => {
let end_bytes = bytes.split_off(err.valid_up_to());
let string = String::from_utf8(bytes)
.expect("expected only valid UTF-8 after splitting off bad codepoint");
(end_bytes, Ok(string))
}
Some(_) => {
(bytes, Err(err))
}
};
if !excess_bytes.is_empty() {
self.put_back_read_buf();
self.writer.get_mut().write_front(&excess_bytes);
}
result.map(move |string| PrefixString::new(string, self))
}
}
}
pub fn known_length(&mut self) -> Option<u64> {
self.handle().known_length()
}
}
impl Read for Body {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.writer.flush()?;
self.reader.read(buf)
}
fn read_vectored(&mut self, bufs: &mut [std::io::IoSliceMut]) -> std::io::Result<usize> {
self.writer.flush()?;
self.reader.read_vectored(bufs)
}
}
impl BufRead for Body {
fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
self.writer.flush()?;
self.reader.fill_buf()
}
fn consume(&mut self, amt: usize) {
self.reader.consume(amt)
}
}
impl Write for Body {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.writer.write(buf)
}
fn write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize> {
self.writer.write_vectored(bufs)
}
fn flush(&mut self) -> std::io::Result<()> {
self.writer.flush()
}
}
impl From<BodyHandle> for Body {
fn from(handle: BodyHandle) -> Self {
let handle2 = unsafe { BodyHandle::from_u32(handle.as_u32()) };
Self {
reader: BufReader::new(BodyHandleWrapper::new(handle)),
writer: BufWriter::new(handle2),
}
}
}
impl From<&str> for Body {
fn from(s: &str) -> Self {
BodyHandle::from(s).into()
}
}
impl From<String> for Body {
fn from(s: String) -> Self {
BodyHandle::from(s).into()
}
}
impl From<&[u8]> for Body {
fn from(s: &[u8]) -> Self {
BodyHandle::from(s).into()
}
}
impl From<Vec<u8>> for Body {
fn from(s: Vec<u8>) -> Self {
BodyHandle::from(s).into()
}
}
pub struct Prefix<'a> {
buf: Option<Vec<u8>>,
body: &'a mut Body,
}
impl std::fmt::Debug for Prefix<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Prefix")
.field("buf", self.buf.as_ref().unwrap())
.finish()
}
}
impl<'a> Prefix<'a> {
fn new(buf: Vec<u8>, body: &'a mut Body) -> Self {
Self {
buf: Some(buf),
body,
}
}
pub fn take(mut self) -> Vec<u8> {
self.buf.take().unwrap()
}
}
impl std::ops::Deref for Prefix<'_> {
type Target = Vec<u8>;
fn deref(&self) -> &Self::Target {
self.buf.as_ref().unwrap()
}
}
impl std::ops::DerefMut for Prefix<'_> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.buf.as_mut().unwrap()
}
}
impl Drop for Prefix<'_> {
fn drop(&mut self) {
if let Some(buf) = &self.buf {
self.body.put_back_read_buf();
self.body.writer.get_mut().write_front(buf);
}
}
}
pub struct PrefixString<'a> {
buf: Option<String>,
body: &'a mut Body,
}
impl std::fmt::Debug for PrefixString<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PrefixString")
.field("buf", self.buf.as_ref().unwrap())
.finish()
}
}
impl<'a> PrefixString<'a> {
fn new(buf: String, body: &'a mut Body) -> Self {
Self {
buf: Some(buf),
body,
}
}
pub fn take(mut self) -> String {
self.buf.take().unwrap()
}
}
impl std::ops::Deref for PrefixString<'_> {
type Target = String;
fn deref(&self) -> &Self::Target {
self.buf.as_ref().unwrap()
}
}
impl std::ops::DerefMut for PrefixString<'_> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.buf.as_mut().unwrap()
}
}
impl Drop for PrefixString<'_> {
fn drop(&mut self) {
if let Some(buf) = &self.buf {
self.body.put_back_read_buf();
self.body.writer.get_mut().write_front(buf.as_bytes());
}
}
}
#[repr(transparent)]
struct BodyHandleWrapper {
handle: ManuallyDrop<BodyHandle>,
}
impl BodyHandleWrapper {
fn new(handle: BodyHandle) -> Self {
Self {
handle: ManuallyDrop::new(handle),
}
}
}
impl Write for BodyHandleWrapper {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.handle.write(buf)
}
fn write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize> {
self.handle.write_vectored(bufs)
}
fn flush(&mut self) -> std::io::Result<()> {
self.handle.flush()
}
}
impl Read for BodyHandleWrapper {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.handle.read(buf)
}
}
impl BodyExt for Body {
fn append_trailer(&mut self, name: impl ToHeaderName, value: impl ToHeaderValue) {
self.handle().append_trailer(
name.into_borrowable().as_ref(),
value.into_borrowable().as_ref(),
);
}
fn get_trailers(&mut self) -> Result<HeaderMap, Error> {
let mut trailers = HeaderMap::new();
let handle = self.handle();
for name in handle.get_trailer_names_impl(INITIAL_HEADER_NAME_BUF_SIZE, None)? {
let name = name?;
for value in
handle.get_trailer_values_impl(&name, INITIAL_HEADER_VALUE_BUF_SIZE, None)?
{
let value = value?;
trailers.append(&name, value);
}
}
Ok(trailers)
}
}