mod convert;
mod error_type;
#[cfg(feature = "std")]
mod utils;
use crate::sse::{Event, SseStream};
pub use error_type::Error;
#[cfg(feature = "std")]
extern crate std;
use futures_lite::{ready, Stream, StreamExt};
use http_body::Frame;
use http_body_util::{BodyExt, StreamBody};
use mime::Mime;
#[cfg(feature = "std")]
use self::utils::IntoAsyncRead;
use bytestr::ByteStr;
use bytes::Bytes;
use futures_lite::{AsyncBufRead, AsyncBufReadExt};
use alloc::{boxed::Box, vec::Vec};
use core::fmt::Debug;
use core::mem::{replace, swap, take};
use core::pin::Pin;
use core::task::{Context, Poll};
/// Heap-pinned, type-erased buffered async reader; stored by `BodyInner::Reader`.
type BoxBufReader = Pin<Box<dyn AsyncBufRead + Send + Sync + 'static>>;
/// Heap-pinned, type-erased `http_body::Body` whose data type is [`Bytes`] and whose
/// error type is [`Error`]; stored by `BodyInner::HttpBody`.
type BoxHttpBody =
Pin<Box<dyn http_body::Body<Data = Bytes, Error = Error> + Send + Sync + 'static>>;
pub use http_body::Body as HttpBody;
/// A message body: a payload plus an optional MIME type.
///
/// The payload is held in the private `BodyInner` enum, which is either a single
/// in-memory buffer, a buffered async reader, a boxed `http_body::Body`, or the
/// "frozen" marker left behind once the body has been taken.
pub struct Body {
/// MIME type associated with the payload, if one has been set.
mime: Option<Mime>,
/// Storage for the payload itself.
inner: BodyInner,
}
impl Debug for Body {
    /// Writes the fixed text `Body`; neither the MIME type nor the payload is printed.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        write!(f, "Body")
    }
}
// Declares the `BodyFrozen` error through the project-local `impl_error!` macro, passing
// the type name and its message.
// NOTE(review): the macro is defined outside this file, so the exact items it generates are
// not visible here. Within this file the type is built with `BodyFrozen::new()` and is
// converted into `Error` by `?` (see `Body::as_bytes`).
impl_error!(
BodyFrozen,
"Body was frozen,it may have been consumed by `take()`"
);
/// Internal representation of a body's payload.
enum BodyInner {
/// The whole payload, already in memory.
Once(Bytes),
/// Payload read incrementally from a buffered async reader.
Reader {
/// Source of the payload bytes.
reader: BoxBufReader,
/// Caller-declared number of bytes still to be read, if known; the `Stream`
/// implementation decrements it as chunks are produced.
length: Option<usize>,
},
/// Payload produced frame by frame by a boxed `http_body::Body`.
HttpBody(BoxHttpBody),
/// Marker for a body whose payload has been taken or explicitly frozen.
Freeze,
}
impl Default for BodyInner {
fn default() -> Self {
Self::Once(Bytes::new())
}
}
impl Body {
/// Creates a body holding an empty in-memory buffer and no MIME type.
pub const fn empty() -> Self {
    Self {
        inner: BodyInner::Once(Bytes::new()),
        mime: None,
    }
}
/// Wraps an arbitrary `http_body::Body` without setting a MIME type.
///
/// Each data frame is converted into [`Bytes`] and each error into [`Error`] so the
/// body can be stored behind the crate's boxed body type.
pub fn new<B>(body: B) -> Self
where
    B: Send + Sync + http_body::Body + 'static,
    B::Data: Into<Bytes>,
    B::Error: Into<Error>,
{
    let adapted = body
        .map_frame(|frame| frame.map_data(|chunk| chunk.into()))
        .map_err(|error| error.into());
    Self {
        mime: None,
        inner: BodyInner::HttpBody(Box::pin(adapted)),
    }
}
/// Creates a body that is already frozen: it carries no payload and no MIME type.
pub const fn frozen() -> Self {
    Self {
        inner: BodyInner::Freeze,
        mime: None,
    }
}
pub fn from_reader(
reader: impl AsyncBufRead + Send + Sync + 'static,
length: impl Into<Option<usize>>,
) -> Self {
Self {
mime: None,
inner: BodyInner::Reader {
reader: Box::pin(reader),
length: length.into(),
},
}
}
/// Creates a body from a stream of fallible chunks, without setting a MIME type.
///
/// Every `Ok` item becomes one data frame (converted into [`Bytes`]); every `Err` item is
/// converted into [`Error`].
pub fn from_stream<T, E, S>(stream: S) -> Self
where
    T: Into<Bytes> + Send + 'static,
    E: Into<Error>,
    S: Stream<Item = Result<T, E>> + Send + Sync + 'static,
{
    let frames = stream.map(|item| match item {
        Ok(chunk) => Ok(Frame::data(chunk.into())),
        Err(error) => Err(error.into()),
    });
    Self {
        mime: None,
        inner: BodyInner::HttpBody(Box::pin(StreamBody::new(frames))),
    }
}
/// Creates an in-memory body from raw bytes, tagged with
/// `mime::APPLICATION_OCTET_STREAM`.
pub fn from_bytes(data: impl Into<Bytes>) -> Self {
    let payload: Bytes = data.into();
    Self {
        mime: Some(mime::APPLICATION_OCTET_STREAM),
        inner: BodyInner::Once(payload),
    }
}
pub fn from_text(str: impl Into<ByteStr>) -> Self {
Self {
mime: Some(mime::TEXT_PLAIN_UTF_8),
inner: BodyInner::Once(str.into().into()),
}
}
/// Opens the file at `path` and creates a body that streams its contents.
///
/// The MIME type is guessed from the file extension; it stays `None` when the path has no
/// extension, the extension is not valid UTF-8, or no MIME type is known for it.
///
/// The body's length is taken from the file metadata. If the size does not fit in `usize`
/// the length is left unknown instead of being silently truncated.
///
/// # Errors
///
/// Returns the I/O error raised while opening the file or reading its metadata.
#[cfg(all(feature = "fs", feature = "std"))]
pub async fn from_file(path: impl AsRef<std::path::Path>) -> Result<Self, std::io::Error> {
    let path = path.as_ref();
    let file = async_fs::File::open(path).await?;
    let byte_len = file.metadata().await?.len();
    // `as usize` would wrap on targets where `usize` is narrower than `u64`; an unknown
    // length is safer than a wrong one.
    let len: Option<usize> = <usize as core::convert::TryFrom<u64>>::try_from(byte_len).ok();
    let mime = path
        .extension()
        .and_then(|ext| ext.to_str())
        .and_then(|ext| Self::guess(ext.as_bytes()))
        .and_then(|raw| raw.parse().ok());
    Ok(Self {
        mime,
        ..Self::from_reader(futures_lite::io::BufReader::new(file), len)
    })
}
/// Serializes `value` as JSON into an in-memory body tagged with
/// `mime::APPLICATION_JSON`.
///
/// # Errors
///
/// Returns the `serde_json` error if serialization fails.
#[cfg(feature = "json")]
pub fn from_json<T: serde::Serialize>(value: T) -> Result<Self, serde_json::Error> {
    let encoded = serde_json::to_string(&value)?;
    let mut body = Self::from_bytes(encoded);
    body.mime = Some(mime::APPLICATION_JSON);
    Ok(body)
}
/// Looks up the first raw MIME string registered for a file extension.
///
/// Returns `None` when `extension` is not valid UTF-8 or `mime_guess` has no entry for it.
#[cfg(feature = "fs")]
fn guess(extension: &[u8]) -> Option<&'static str> {
    match core::str::from_utf8(extension) {
        Ok(ext) => mime_guess::from_ext(ext).first_raw(),
        Err(_) => None,
    }
}
/// Serializes `value` as a URL-encoded form into an in-memory body tagged with
/// `mime::APPLICATION_WWW_FORM_URLENCODED`.
///
/// # Errors
///
/// Returns the `serde_urlencoded` error if serialization fails.
#[cfg(feature = "form")]
pub fn from_form<T: serde::Serialize>(value: T) -> Result<Self, serde_urlencoded::ser::Error> {
    let encoded = serde_urlencoded::to_string(value)?;
    let mut body = Self::from_bytes(encoded);
    body.mime = Some(mime::APPLICATION_WWW_FORM_URLENCODED);
    Ok(body)
}
pub fn from_sse<S, E>(s: S) -> Self
where
S: Stream<Item = Result<Event, E>> + Send + Sync + 'static,
E: Into<Error> + Send + Sync + 'static,
{
Self {
mime: Some(mime::TEXT_EVENT_STREAM),
inner: BodyInner::HttpBody(Box::pin(
crate::sse::into_body(s)
.map_frame(|result| result.map_data(|data| data))
.map_err(|e| e.into()),
)),
}
}
/// Returns the MIME type attached to this body, if any.
pub fn mime(&self) -> Option<&Mime> {
    Option::as_ref(&self.mime)
}
pub fn with_mime(mut self, mime: Mime) -> Self {
self.mime = Some(mime);
self
}
/// Returns the payload length in bytes when it is known.
///
/// In-memory bodies report their buffer length and reader-backed bodies report the
/// length recorded for the reader. Boxed `http_body` bodies and frozen bodies report `None`.
pub const fn len(&self) -> Option<usize> {
    match &self.inner {
        BodyInner::Once(bytes) => Some(bytes.len()),
        BodyInner::Reader { length, .. } => *length,
        BodyInner::HttpBody(_) | BodyInner::Freeze => None,
    }
}
/// Reports whether the payload is empty, or `None` when [`Body::len`] is unknown.
pub const fn is_empty(&self) -> Option<bool> {
    match self.len() {
        Some(len) => Some(len == 0),
        None => None,
    }
}
pub async fn into_bytes(self) -> Result<Bytes, Error> {
match self.inner {
BodyInner::Once(bytes) => Ok(bytes),
BodyInner::Reader { mut reader, length } => {
let mut vec = Vec::with_capacity(length.unwrap_or_default());
loop {
let data = reader.fill_buf().await?;
if data.is_empty() {
break;
} else {
let len = data.len();
vec.extend_from_slice(data);
reader.as_mut().consume(len);
}
}
Ok(vec.into())
}
BodyInner::HttpBody(body) => {
let mut body = body.into_data_stream();
let first = body.try_next().await?.unwrap_or_default();
let second = body.try_next().await?;
if let Some(second) = second {
let remain_size_hint = body.size_hint();
let mut vec = Vec::with_capacity(
first.len()
+ second.len()
+ remain_size_hint.1.unwrap_or(remain_size_hint.0),
);
vec.extend_from_slice(&first);
vec.extend_from_slice(&second);
while let Some(data) = body.try_next().await? {
vec.extend_from_slice(&data);
}
Ok(vec.into())
} else {
Ok(first)
}
}
BodyInner::Freeze => Err(Error::BodyFrozen),
}
}
/// Consumes the body and returns its payload as UTF-8 text.
///
/// # Errors
///
/// Returns any error from [`Body::into_bytes`], or a conversion error when the payload
/// is not valid UTF-8.
pub async fn into_string(self) -> Result<ByteStr, Error> {
    let raw = self.into_bytes().await?;
    let text = ByteStr::from_utf8(raw)?;
    Ok(text)
}
/// Consumes the body and exposes it as a buffered async reader by wrapping it in the
/// module-private `IntoAsyncRead` adapter.
#[cfg(feature = "std")]
pub fn into_reader(self) -> impl AsyncBufRead + Send {
IntoAsyncRead::new(self)
}
/// Consumes the body and wraps it in an [`SseStream`].
pub fn into_sse(self) -> SseStream {
SseStream::new(self)
}
/// Buffers the whole payload in memory (if it is not already) and returns it as a slice.
///
/// After a successful call the body is an in-memory body, so later calls are cheap and
/// return the same data. The MIME type is preserved.
///
/// While a streaming payload is being collected the body is temporarily frozen. If
/// collection fails, or the returned future is dropped before it completes, the body
/// stays frozen.
///
/// # Errors
///
/// Returns an error if the body is frozen or if reading the payload fails.
pub async fn as_bytes(&mut self) -> Result<&[u8], Error> {
    // Already buffered: nothing to collect, and no reason to freeze the body.
    if !matches!(self.inner, BodyInner::Once(_)) {
        let mut pending = self.take()?;
        // `take` swaps the *entire* body, MIME type included, for a frozen one.
        // Put the MIME type straight back so buffering does not discard it.
        self.mime = pending.mime.take();
        self.inner = BodyInner::Once(pending.into_bytes().await?);
    }
    match self.inner {
        BodyInner::Once(ref bytes) => Ok(bytes),
        _ => unreachable!(),
    }
}
/// Buffers the payload via [`Body::as_bytes`] and returns it as a UTF-8 string slice.
///
/// # Errors
///
/// Returns any error from [`Body::as_bytes`], or a conversion error when the payload is
/// not valid UTF-8.
pub async fn as_str(&mut self) -> Result<&str, Error> {
    let raw = self.as_bytes().await?;
    let text = core::str::from_utf8(raw)?;
    Ok(text)
}
/// Buffers the payload via [`Body::as_bytes`] and deserializes it from JSON.
///
/// `T` may borrow from the buffered payload, which is why the body is borrowed for `'a`.
///
/// # Errors
///
/// Returns any error from [`Body::as_bytes`], or the converted `serde_json` error.
#[cfg(feature = "json")]
pub async fn into_json<'a, T>(&'a mut self) -> Result<T, Error>
where
    T: serde::Deserialize<'a>,
{
    let raw = self.as_bytes().await?;
    let value: T = serde_json::from_slice(raw)?;
    Ok(value)
}
/// Buffers the payload via [`Body::as_bytes`] and deserializes it from URL-encoded form
/// data.
///
/// `T` may borrow from the buffered payload, which is why the body is borrowed for `'a`.
///
/// # Errors
///
/// Returns any error from [`Body::as_bytes`], or the converted `serde_urlencoded` error.
#[cfg(feature = "form")]
pub async fn into_form<'a, T>(&'a mut self) -> Result<T, Error>
where
    T: serde::Deserialize<'a>,
{
    let raw = self.as_bytes().await?;
    let value: T = serde_urlencoded::from_bytes(raw)?;
    Ok(value)
}
/// Stores `body` in place of the current body and returns the previous one.
pub fn replace(&mut self, body: Body) -> Body {
    core::mem::replace(self, body)
}
/// Exchanges this body with `body`.
///
/// # Errors
///
/// Returns [`BodyFrozen`] (and swaps nothing) when this body is frozen. Only `self` is
/// checked; `body` is not.
pub fn swap(&mut self, body: &mut Body) -> Result<(), BodyFrozen> {
    if self.is_frozen() {
        return Err(BodyFrozen::new());
    }
    core::mem::swap(self, body);
    Ok(())
}
/// Moves the body out, leaving a frozen body (with no MIME type) in its place.
///
/// # Errors
///
/// Returns [`BodyFrozen`] when the body is already frozen.
pub fn take(&mut self) -> Result<Self, BodyFrozen> {
    if self.is_frozen() {
        return Err(BodyFrozen::new());
    }
    Ok(core::mem::replace(self, Self::frozen()))
}
/// Returns `true` when the body is in the frozen state.
pub const fn is_frozen(&self) -> bool {
    match &self.inner {
        BodyInner::Freeze => true,
        BodyInner::Once(_) | BodyInner::Reader { .. } | BodyInner::HttpBody(_) => false,
    }
}
/// Discards the current payload and MIME type, leaving the body frozen.
pub fn freeze(&mut self) {
    *self = Self::frozen();
}
}
impl Default for Body {
fn default() -> Self {
Self::empty()
}
}
impl Stream for Body {
type Item = Result<Bytes, Error>;
/// Yields the next chunk of the payload.
///
/// * In-memory body: yields the whole buffer once (nothing at all when it is empty).
/// * Reader-backed body: yields a copy of whatever the reader currently has buffered,
///   and ends when the reader reports an empty buffer.
/// * Boxed `http_body` body: yields the next *data* frame; frames that carry no data
///   (such as trailers) are skipped instead of being surfaced as empty chunks.
/// * Frozen body: yields `Error::BodyFrozen` on every poll and never terminates.
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
    match &mut self.inner {
        BodyInner::Once(bytes) => {
            if bytes.is_empty() {
                Poll::Ready(None)
            } else {
                // Leaves an empty buffer behind, so the next poll ends the stream.
                Poll::Ready(Some(Ok(take(bytes))))
            }
        }
        BodyInner::Reader { reader, length } => {
            let data = ready!(reader.as_mut().poll_fill_buf(cx))?;
            if data.is_empty() {
                return Poll::Ready(None);
            }
            let data = Bytes::copy_from_slice(data);
            reader.as_mut().consume(data.len());
            if let Some(known_length) = length {
                // Keep the recorded length in step with what is still unread.
                *known_length = known_length.saturating_sub(data.len());
            }
            Poll::Ready(Some(Ok(data)))
        }
        BodyInner::HttpBody(stream) => loop {
            match ready!(stream.as_mut().poll_frame(cx)) {
                Some(Ok(frame)) => {
                    if let Ok(data) = frame.into_data() {
                        return Poll::Ready(Some(Ok(data)));
                    }
                    // Not a data frame: poll again for the next one.
                }
                Some(Err(error)) => return Poll::Ready(Some(Err(error))),
                None => return Poll::Ready(None),
            }
        },
        BodyInner::Freeze => Poll::Ready(Some(Err(Error::BodyFrozen))),
    }
}
/// Bounds on the number of *chunks* (stream items) still to be yielded.
///
/// `Stream::size_hint` counts items, not bytes; use `Body::len` for the byte length.
fn size_hint(&self) -> (usize, Option<usize>) {
    match &self.inner {
        BodyInner::Once(bytes) => {
            // The whole buffer is yielded as a single item, or not at all when empty.
            let items = usize::from(!bytes.is_empty());
            (items, Some(items))
        }
        // Every yielded chunk holds at least one byte, so the remaining byte count caps
        // the number of chunks (assuming the declared length is accurate).
        BodyInner::Reader { length, .. } => (0, *length),
        // The number of frames of a boxed body is unknown, and a frozen body keeps
        // yielding errors, so neither has a meaningful bound.
        BodyInner::HttpBody(_) | BodyInner::Freeze => (0, None),
    }
}
}
impl http_body::Body for Body {
type Data = Bytes;
type Error = Error;
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
self.poll_next(cx)
.map(|opt| opt.map(|result| result.map(http_body::Frame::data)))
.map_err(Error::from)
}
}
// Unit tests for `Body`: construction, conversion, streaming, freezing and MIME handling.
#[cfg(test)]
mod tests {
use super::*;
use alloc::string::ToString;
use alloc::vec;
use futures_lite::{stream, StreamExt};
// Length, emptiness and frozen state of empty and byte-backed bodies.
#[tokio::test]
async fn basic_body_operations() {
let empty = Body::empty();
assert_eq!(empty.len(), Some(0));
assert_eq!(empty.is_empty(), Some(true));
assert!(!empty.is_frozen());
let text_body = Body::from_bytes("Hello, World!");
assert_eq!(text_body.len(), Some(13));
assert_eq!(text_body.is_empty(), Some(false));
let result = text_body.into_bytes().await.unwrap();
assert_eq!(result.as_ref(), b"Hello, World!");
}
// `take` hands out the payload and leaves a frozen body that can no longer be read.
#[tokio::test]
async fn body_freeze_and_take() {
let mut body = Body::from_bytes("test data");
assert!(!body.is_frozen());
// Called through the type path rather than with method syntax.
let taken = Body::take(&mut body).unwrap();
assert!(body.is_frozen());
let data = taken.into_bytes().await.unwrap();
assert_eq!(data.as_ref(), b"test data");
let result = body.into_bytes().await;
assert!(result.is_err());
}
// `From` conversions for `Vec<u8>`, `&str`, `String` and `&[u8]`.
#[tokio::test]
async fn body_conversions() {
let vec_data = vec![1, 2, 3, 4, 5];
let body = Body::from(vec_data.clone());
let result = body.into_bytes().await.unwrap();
assert_eq!(result.as_ref(), vec_data.as_slice());
let str_data = "string conversion test";
let body = Body::from(str_data);
let result = body.into_string().await.unwrap();
assert_eq!(result.as_str(), str_data);
let string_data = "owned string test".to_string();
let expected = string_data.clone();
let body = Body::from(string_data);
let result = body.into_string().await.unwrap();
assert_eq!(result.as_str(), expected);
let slice_data: &[u8] = &[6, 7, 8, 9, 10];
let body = Body::from(slice_data);
let result = body.into_bytes().await.unwrap();
assert_eq!(result.as_ref(), slice_data);
}
// A byte-backed body streams its payload as exactly one chunk.
#[tokio::test]
async fn body_stream_yields_bytes() {
let body = Body::from_bytes("streaming test data");
let mut chunks = Vec::new();
let mut stream = body;
while let Some(chunk_result) = stream.next().await {
let chunk = chunk_result.unwrap();
chunks.push(chunk);
}
assert_eq!(chunks.len(), 1);
assert_eq!(chunks[0].as_ref(), b"streaming test data");
}
// JSON serialization followed by deserialization returns the original value.
#[cfg(feature = "json")]
#[tokio::test]
async fn json_roundtrip() {
use alloc::string::{String, ToString};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, PartialEq, Debug)]
struct TestData {
message: String,
count: u32,
}
let data = TestData {
message: "JSON test".to_string(),
count: 42,
};
let body = Body::from_json(&data).unwrap();
let json_str = body.into_string().await.unwrap();
assert!(json_str.contains("JSON test"));
assert!(json_str.contains("42"));
let mut body = Body::from_json(&data).unwrap();
let parsed: TestData = body.into_json().await.unwrap();
assert_eq!(parsed, data);
}
// Form serialization followed by deserialization returns the original value.
#[cfg(feature = "form")]
#[tokio::test]
async fn form_roundtrip() {
use alloc::string::String;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, PartialEq, Debug)]
struct FormData {
name: String,
age: u32,
}
let data = FormData {
name: "Alice".to_string(),
age: 30,
};
let body = Body::from_form(&data).unwrap();
let form_str = body.into_string().await.unwrap();
assert!(form_str.contains("name=Alice"));
assert!(form_str.contains("age=30"));
let mut body = Body::from_form(&data).unwrap();
let parsed: FormData = body.into_form().await.unwrap();
assert_eq!(parsed, data);
}
// Collecting a reader-backed body terminates and returns every byte.
#[tokio::test]
async fn reader_does_not_hang() {
use futures_lite::io::{BufReader, Cursor};
let data = "This test ensures the reader doesn't create infinite loops";
let cursor = Cursor::new(data.as_bytes().to_vec());
let reader = BufReader::new(cursor);
let body = Body::from_reader(reader, data.len());
let result = body.into_bytes().await.unwrap();
assert_eq!(result.as_ref(), data.as_bytes());
}
// `from_sse` tags the body with the event-stream MIME type.
#[tokio::test]
async fn sse_body_creation_sets_mime() {
let events = stream::iter(vec![
Ok::<_, Box<dyn core::error::Error + Send + Sync>>(
crate::sse::Event::from_data("test data").with_id("1"),
),
Ok(crate::sse::Event::from_data("more data").with_id("2")),
]);
let body = Body::from_sse(events);
assert_eq!(
body.mime().as_ref().map(|m| m.as_ref()),
Some("text/event-stream")
);
}
// `as_bytes` can be called repeatedly; `as_str` rejects invalid UTF-8.
#[tokio::test]
async fn body_as_str_and_bytes() {
let mut body = Body::from_bytes("test string");
let bytes_ref = body.as_bytes().await.unwrap();
assert_eq!(bytes_ref, b"test string");
let bytes_ref2 = body.as_bytes().await.unwrap();
assert_eq!(bytes_ref2, b"test string");
let mut body2 = Body::from_bytes("test string");
let str_ref = body2.as_str().await.unwrap();
assert_eq!(str_ref, "test string");
let mut invalid_body = Body::from_bytes(vec![0xFF, 0xFE, 0xFD]);
let result = invalid_body.as_str().await;
assert!(result.is_err());
}
// `replace` returns the old body; `swap` exchanges two bodies and refuses a frozen `self`.
#[tokio::test]
async fn body_replace_and_swap() {
let mut body = Body::from_bytes("original");
let old_body = body.replace(Body::from_bytes("replacement"));
let new_data = body.into_bytes().await.unwrap();
let old_data = old_body.into_bytes().await.unwrap();
assert_eq!(new_data.as_ref(), b"replacement");
assert_eq!(old_data.as_ref(), b"original");
let mut body1 = Body::from_bytes("first");
let mut body2 = Body::from_bytes("second");
Body::swap(&mut body1, &mut body2).unwrap();
let data1 = body1.into_bytes().await.unwrap();
let data2 = body2.into_bytes().await.unwrap();
assert_eq!(data1.as_ref(), b"second");
assert_eq!(data2.as_ref(), b"first");
let mut frozen_body = Body::frozen();
let mut normal_body = Body::from_bytes("test");
let result = Body::swap(&mut frozen_body, &mut normal_body);
assert!(result.is_err());
}
// `freeze` makes the payload unreadable.
#[tokio::test]
async fn body_freeze() {
let mut body = Body::from_bytes("test");
assert!(!body.is_frozen());
body.freeze();
assert!(body.is_frozen());
let result = body.into_bytes().await;
assert!(result.is_err());
}
// MIME types set by the individual constructors.
#[tokio::test]
async fn mime_types() {
let empty = Body::empty();
assert!(empty.mime().is_none());
#[cfg(feature = "json")]
{
use serde::Serialize;
#[derive(Serialize)]
struct Data {
val: i32,
}
let body = Body::from_json(&Data { val: 1 }).unwrap();
assert_eq!(body.mime().unwrap().as_ref(), "application/json");
}
#[cfg(feature = "form")]
{
use serde::Serialize;
#[derive(Serialize)]
struct Data {
val: i32,
}
let body = Body::from_form(&Data { val: 1 }).unwrap();
assert_eq!(
body.mime().unwrap().as_ref(),
"application/x-www-form-urlencoded"
);
}
}
// `from_file` guesses the MIME type from the file extension.
#[cfg(all(feature = "fs", feature = "std"))]
#[tokio::test]
async fn file_body_with_mime() {
use std::io::Write;
// NOTE(review): the file name in the shared temp directory is fixed, and the file is
// only removed when the assertion below passes.
let dir = std::env::temp_dir();
let file_path = dir.join("test_mime.html");
let mut file = std::fs::File::create(&file_path).unwrap();
file.write_all(b"<html></html>").unwrap();
let body = Body::from_file(&file_path).await.unwrap();
assert_eq!(body.mime().unwrap().as_ref(), "text/html");
let _ = std::fs::remove_file(file_path);
}
}