use std::fmt::{Debug, Formatter};
use std::io::Read;
use std::pin::Pin;
use std::task::Poll::Ready;
use std::task::{Context, Poll};
use bytesbuf::BytesView;
use futures::{Stream, TryStreamExt};
use http_body::{Body, Frame, SizeHint};
use http_body_util::BodyExt;
use pin_project::pin_project;
use thread_aware::ThreadAware;
use crate::constants::DEFAULT_RESPONSE_BUFFER_LIMIT_BYTES;
use crate::error_labels::{LABEL_BODY_CONSUMED, LABEL_BODY_NOT_BUFFERED, LABEL_BODY_SIZE_LIMIT, LABEL_BODY_UTF8_INVALID};
use crate::{HttpError, Result};
mod builder;
pub(crate) mod options;
pub use builder::HttpBodyBuilder;
pub use options::HttpBodyOptions;
pub(crate) mod timeout_body;
/// An HTTP message body.
///
/// Internally the body is one of the `Kind` variants: empty, a buffered byte view, or a boxed
/// streaming [`Body`] implementation. The type itself implements [`Body`], so it can be polled
/// frame by frame, or consumed through the `into_*` helpers.
#[derive(Debug, ThreadAware)]
#[pin_project]
#[must_use]
pub struct HttpBody {
/// Current representation of the body content.
// NOTE(review): `thread_aware(skip)` presumably excludes this field from the derived
// `ThreadAware` handling — confirm against the `thread_aware` crate documentation.
#[pin]
#[thread_aware(skip)]
kind: Kind,
/// Builder used to create replacement bodies, e.g. when buffering or cloning.
builder: HttpBodyBuilder,
}
impl HttpBody {
    /// Creates a body from its internal representation and the builder associated with it.
    const fn new(kind: Kind, builder: HttpBodyBuilder) -> Self {
        Self { kind, builder }
    }

    /// Buffers the whole body and returns its content as a single [`BytesView`].
    ///
    /// # Errors
    ///
    /// Returns any error produced by [`HttpBody::into_buffered`].
    ///
    /// # Panics
    ///
    /// Panics only if a successfully buffered body is not backed by bytes, which would be an
    /// internal invariant violation.
    pub async fn into_bytes(self) -> Result<BytesView> {
        self.into_buffered()
            .await?
            .into_bytes_no_buffering()
            .map_or_else(|| unreachable!("once body is buffered, it must be a view over a byte sequence"), Ok)
    }

    /// Returns the content without polling anything.
    ///
    /// Yields `Some` for a buffered body that still holds its bytes and for an empty body (as an
    /// empty view); yields `None` for a streaming body or for bytes that were already taken.
    pub(crate) fn into_bytes_no_buffering(self) -> Option<BytesView> {
        match self.kind {
            Kind::Bytes(Some(bytes)) => Some(bytes),
            Kind::Empty => Some(BytesView::default()),
            // Listed explicitly so that adding a new variant forces a decision here.
            Kind::Bytes(None) | Kind::Body(..) => None,
        }
    }

    /// Buffers the whole body and decodes it as UTF-8 text.
    ///
    /// # Errors
    ///
    /// Returns any error produced by [`HttpBody::into_bytes`], or a validation error labeled
    /// `LABEL_BODY_UTF8_INVALID` when the content is not valid UTF-8.
    pub async fn into_text(self) -> Result<String> {
        // Buffer first: the string is then sized from bytes that were actually collected (and, for
        // streaming bodies, already checked against the buffer limit) instead of from a size hint
        // reported by the inner body, which is not bounded by that limit.
        let mut bytes = self.into_bytes().await?;
        let mut text = String::with_capacity(bytes.len());
        bytes
            .read_to_string(&mut text)
            .map_err(|e| HttpError::validation_with_label(format!("body contains invalid UTF-8: {e}"), LABEL_BODY_UTF8_INVALID))?;
        Ok(text)
    }

    /// Converts the body into a fully buffered one.
    ///
    /// Buffered and empty bodies are rebuilt as-is through the builder. A streaming body is
    /// collected up to the `buffer_limit` of its options; when that is `None`,
    /// `collect_with_limit` falls back to `DEFAULT_RESPONSE_BUFFER_LIMIT_BYTES`.
    ///
    /// # Errors
    ///
    /// Returns a validation error labeled `LABEL_BODY_CONSUMED` when the bytes were already
    /// taken, and propagates any error from collecting the stream, including exceeding the
    /// size limit.
    pub async fn into_buffered(self) -> Result<Self> {
        let Self { kind, builder } = self;
        match kind {
            Kind::Bytes(Some(data)) => Ok(builder.bytes(data)),
            Kind::Bytes(None) => Err(HttpError::validation_with_label(
                "body cannot be buffered because it is already consumed",
                LABEL_BODY_CONSUMED,
            )),
            Kind::Empty => Ok(builder.empty()),
            Kind::Body(b, options) => {
                let limit = options.buffer_limit;
                let data = collect_with_limit(b.into_data_stream(), limit).await?;
                Ok(builder.bytes(data))
            }
        }
    }

    /// Buffers the body and reads it as an owned JSON value of type `T`.
    ///
    /// # Errors
    ///
    /// Returns any error produced by [`HttpBody::into_json`] or by reading the value.
    #[cfg(any(feature = "json", test))]
    pub async fn into_json_owned<T: serde_core::de::DeserializeOwned>(self) -> Result<T> {
        let json = self.into_json().await?.read_owned()?;
        Ok(json)
    }

    /// Buffers the body and wraps the bytes in a `Json<T>`.
    ///
    /// # Errors
    ///
    /// Returns any error produced by [`HttpBody::into_bytes`].
    #[cfg(any(feature = "json", test))]
    pub async fn into_json<'a, T: serde_core::de::Deserialize<'a>>(self) -> Result<crate::json::Json<T>> {
        Ok(crate::json::Json::<T>::new(self.into_bytes().await?))
    }

    /// Returns the content length when it is known exactly.
    ///
    /// Buffered bodies report their byte count, empty or already-taken bodies report `0`, and
    /// streaming bodies report the exact value of their size hint, if any.
    #[must_use]
    pub fn content_length(&self) -> Option<u64> {
        match &self.kind {
            Kind::Bytes(Some(bytes)) => Some(bytes.len() as u64),
            Kind::Bytes(None) | Kind::Empty => Some(0),
            Kind::Body(b, _) => b.size_hint().exact(),
        }
    }

    /// Returns `true` when the content length is known to be zero.
    ///
    /// A streaming body without an exact size hint is reported as non-empty.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.content_length() == Some(0)
    }

    /// Clones the body when that is possible without consuming it.
    ///
    /// Returns `None` for streaming bodies and for bodies whose bytes were already taken.
    #[must_use]
    pub fn try_clone(&self) -> Option<Self> {
        match &self.kind {
            Kind::Bytes(Some(bytes)) => Some(self.builder.bytes(bytes.clone())),
            Kind::Empty => Some(self.builder.empty()),
            Kind::Body(..) | Kind::Bytes(None) => None,
        }
    }

    /// Converts the body into a stream of data chunks.
    pub fn into_stream(self) -> impl Stream<Item = Result<BytesView>> {
        self.into_data_stream()
    }
}
/// Extracts the bytes of an already buffered (or empty) body without polling it.
///
/// Streaming bodies and bodies whose bytes were already taken are rejected with a validation
/// error labeled `LABEL_BODY_NOT_BUFFERED`.
impl TryFrom<HttpBody> for BytesView {
    type Error = HttpError;

    fn try_from(value: HttpBody) -> std::result::Result<Self, Self::Error> {
        // The crate-internal helper already maps buffered content to `Some(bytes)`, an empty
        // body to `Some(default)`, and everything else to `None`.
        value.into_bytes_no_buffering().ok_or_else(|| {
            HttpError::validation_with_label(
                "body cannot be converted to byte sequence because it is not buffered",
                LABEL_BODY_NOT_BUFFERED,
            )
        })
    }
}
/// Frame-level access to the body content.
impl Body for HttpBody {
    type Data = BytesView;
    type Error = HttpError;

    /// Yields the next frame.
    ///
    /// A buffered body produces at most one data frame: the bytes are taken on the first poll,
    /// and an empty view produces no frame at all. Streaming bodies delegate to the inner body.
    fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Frame<Self::Data>>>> {
        let projected = self.project();
        match projected.kind.project() {
            BodyInnerProj::Body(inner, _) => inner.as_mut().poll_frame(cx),
            BodyInnerProj::Empty => Ready(None),
            BodyInnerProj::Bytes(slot) => match slot.take() {
                Some(view) if !view.is_empty() => Ready(Some(Ok(Frame::data(view)))),
                // Either already taken or nothing to send.
                _ => Ready(None),
            },
        }
    }

    /// Reports an exact size for buffered, taken, and empty bodies; streaming bodies report
    /// whatever the inner body reports.
    fn size_hint(&self) -> SizeHint {
        match &self.kind {
            Kind::Body(inner, _) => inner.size_hint(),
            Kind::Bytes(Some(view)) => SizeHint::with_exact(view.len() as u64),
            Kind::Empty | Kind::Bytes(None) => SizeHint::with_exact(0),
        }
    }

    /// Returns `true` when no further data frame will be produced.
    fn is_end_stream(&self) -> bool {
        match &self.kind {
            Kind::Empty | Kind::Bytes(None) => true,
            Kind::Bytes(Some(view)) => view.is_empty(),
            Kind::Body(inner, _) => inner.is_end_stream(),
        }
    }
}
/// Internal representation of the content of an `HttpBody`.
#[expect(
clippy::large_enum_variant,
reason = "BytesView is intentionally large, though future optimizations may decrease size"
)]
#[pin_project(project = BodyInnerProj)]
enum Kind {
/// Buffered content. Becomes `None` once `poll_frame` has taken the bytes.
Bytes(Option<BytesView>),
/// A body without any content.
Empty,
/// A boxed streaming body together with its options (`buffer_limit` is read when buffering).
Body(Pin<Box<dyn Body<Data = BytesView, Error = HttpError> + Send>>, HttpBodyOptions),
}
/// Prints only the variant name; the payload is never included in the output.
impl Debug for Kind {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let variant = match self {
            Self::Bytes(_) => "Bytes",
            Self::Empty => "Empty",
            Self::Body(..) => "Body",
        };
        f.debug_struct(variant).finish()
    }
}
/// Drains `data` into a single [`BytesView`], failing as soon as the accumulated size exceeds
/// the limit.
///
/// When `limit` is `None`, `DEFAULT_RESPONSE_BUFFER_LIMIT_BYTES` is used. Stream errors are
/// propagated unchanged.
async fn collect_with_limit(mut data: impl Stream<Item = Result<BytesView>> + Send + Unpin, limit: Option<usize>) -> Result<BytesView> {
    let max_bytes = limit.unwrap_or(DEFAULT_RESPONSE_BUFFER_LIMIT_BYTES);
    let mut chunks = Vec::new();
    let mut buffered = 0_usize;

    while let Some(chunk) = data.try_next().await? {
        // Validate before storing so an oversized chunk is never retained.
        buffered = check_size_limit(buffered, chunk.len(), max_bytes)?;
        chunks.push(chunk);
    }

    Ok(BytesView::from_views(chunks))
}
/// Adds `additional` to `current_size` and returns the new total if it stays within `limit`.
///
/// Both arithmetic overflow and a total above `limit` produce the same validation error,
/// labeled `LABEL_BODY_SIZE_LIMIT`.
fn check_size_limit(current_size: usize, additional: usize, limit: usize) -> Result<usize> {
    current_size
        .checked_add(additional)
        .filter(|&total| total <= limit)
        .ok_or_else(|| HttpError::validation_with_label(format!("body size exceeds the limit of {limit} bytes"), LABEL_BODY_SIZE_LIMIT))
}
// Unit tests for `HttpBody`, its `Body` implementation, and the buffering helpers
// `collect_with_limit` / `check_size_limit`.
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
use std::pin::pin;
use std::task::Waker;
use bytes::Bytes;
use bytesbuf::mem::GlobalPool;
use futures::executor::block_on;
use http_body_util::StreamBody;
use ohno::{ErrorExt, Labeled};
use serde::{Deserialize, Serialize};
use static_assertions::assert_impl_all;
use super::*;
use crate::testing::create_stream_body;
// Simple payload type used by the JSON round-trip tests.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct Model {
id: u32,
name: String,
}
// NOTE(review): the name mentions `Sync`, but only `Send`, `Debug` and `ThreadAware` are asserted.
#[test]
fn assert_send_and_sync() {
assert_impl_all!(super::HttpBody: Send, Debug, ThreadAware);
}
// ---- JSON helpers ----
// Round-trips a model through the JSON builder and `into_json_owned`.
#[test]
fn from_and_into_json_ok() {
let data = Model {
id: 1,
name: "name".to_string(),
};
let body = HttpBodyBuilder::new_fake().json(&data).unwrap();
// 22 matches the length of `{"id":1,"name":"name"}` (compact serialization assumed).
assert_eq!(Some(22), body.content_length());
let result: Model = block_on(body.into_json_owned()).unwrap();
assert_eq!(data, result);
}
// Malformed JSON must surface as an error rather than a panic.
#[test]
fn json_deserialization_error() {
let builder = HttpBodyBuilder::new_fake();
let body = builder.text("{invalid json}");
let result: Result<Model> = block_on(body.into_json_owned());
result.unwrap_err();
}
// Borrowed deserialization: string fields are expected to borrow from the buffered bytes.
#[test]
fn into_json_with_cow_strings() {
use std::borrow::Cow;
#[derive(Debug, Deserialize, PartialEq)]
struct User<'a> {
id: u32,
#[serde(borrow)]
name: Cow<'a, str>,
#[serde(borrow)]
email: Cow<'a, str>,
is_active: bool,
}
let json_data = r#"{"id": 42, "name": "Alice Smith", "email": "alice@example.com", "is_active": true}"#;
let builder = HttpBodyBuilder::new_fake();
let body = builder.text(json_data);
let mut json_result = block_on(body.into_json::<User>()).unwrap();
let user = json_result.read().unwrap();
assert_eq!(user.id, 42);
assert_eq!(user.name, "Alice Smith");
assert_eq!(user.email, "alice@example.com");
assert!(user.is_active);
assert!(matches!(user.name, Cow::Borrowed(_)));
assert!(matches!(user.email, Cow::Borrowed(_)));
}
// ---- try_clone ----
#[test]
fn try_clone_text_body() {
let builder = HttpBodyBuilder::new_fake();
let body = builder.text("hello");
let cloned = body.try_clone().unwrap();
assert_eq!(block_on(body.into_text()).unwrap(), block_on(cloned.into_text()).unwrap());
}
#[test]
fn try_clone_empty_body() {
let builder = HttpBodyBuilder::new_fake();
let empty = builder.empty();
let cloned_empty = empty.try_clone().unwrap();
assert_eq!(
block_on(empty.into_bytes()).unwrap().len(),
block_on(cloned_empty.into_bytes()).unwrap().len()
);
}
// A streaming body cannot be cloned, but it can still be buffered afterwards.
#[test]
fn custom_body_is_not_cloneable() {
let body = HttpBodyBuilder::new_fake().body(http_body_util::Empty::new(), &HttpBodyOptions::default());
assert!(body.try_clone().is_none());
let body = block_on(body.into_buffered()).unwrap();
assert_eq!(Some(0), body.content_length());
}
// ---- into_buffered error paths ----
// Polling one frame takes the bytes, so buffering afterwards must fail with `body_consumed`.
#[test]
fn into_buffered_already_consumed_body_returns_error() {
let mut body = HttpBodyBuilder::new_fake().text("hello");
let _frame = block_on(body.frame());
let err = block_on(body.into_buffered()).unwrap_err();
assert_eq!(err.label(), "body_consumed");
assert!(
err.to_string().contains("body cannot be buffered because it is already consumed"),
"expected consumed body error, got: {err}"
);
}
// Errors produced by the underlying stream are propagated unchanged.
#[test]
fn body_error_propagation() {
let builder = HttpBodyBuilder::new_fake();
let body = builder.body(
StreamBody::new(futures::stream::once(async { Err(HttpError::validation("test error")) })),
&HttpBodyOptions::default(),
);
let error = block_on(body.into_buffered()).unwrap_err();
assert_eq!(error.message(), "test error");
}
// ---- collect_with_limit ----
#[test]
fn collect_with_limit_success() {
let memory = GlobalPool::new();
let data = BytesView::copied_from_slice(b"test data", &memory);
let stream = futures::stream::iter(vec![Ok(data)]);
let result = block_on(collect_with_limit(stream, Some(100))).unwrap();
assert_eq!(result, b"test data");
}
// The very first chunk (11 bytes) already exceeds the 5-byte limit.
#[test]
fn collect_with_limit_exceeds() {
let memory = GlobalPool::new();
let data1 = BytesView::copied_from_slice(b"test data 1", &memory);
let data2 = BytesView::copied_from_slice(b"test data 2", &memory);
let stream = futures::stream::iter(vec![Ok(data1), Ok(data2)]);
let result = block_on(collect_with_limit(stream, Some(5)));
assert!(result.is_err());
let err = result.err().unwrap();
assert_eq!(err.label(), "body_size_limit");
assert!(err.to_string().contains("body size exceeds the limit"));
}
// An error after a valid chunk aborts collection.
#[test]
fn collect_with_limit_stream_error() {
let memory = GlobalPool::new();
let error_stream = futures::stream::iter(vec![
Ok(BytesView::copied_from_slice(b"valid data", &memory)),
Err(HttpError::validation("stream error")),
]);
let result = block_on(collect_with_limit(error_stream, Some(1000)));
assert!(result.is_err());
let err = result.err().unwrap();
assert!(err.to_string().contains("stream error"));
}
#[test]
fn collect_with_custom_limit() {
let memory = GlobalPool::new();
let data = BytesView::copied_from_slice(&[0u8; 1000], &memory);
let stream = futures::stream::iter(vec![Ok(data)]);
let result = block_on(collect_with_limit(stream, Some(1024))).unwrap();
assert_eq!(result.len(), 1000);
}
// ---- into_text ----
#[test]
fn into_text_invalid_utf8() {
let builder = HttpBodyBuilder::new_fake();
let invalid_utf8 = vec![0xFF, 0xFE, 0xFD];
let body = builder.slice(&invalid_utf8);
let error = block_on(body.into_text()).unwrap_err();
assert_eq!(error.label(), "body_utf8_invalid");
assert!(error.to_string().contains("body contains invalid UTF-8"));
}
// `None` selects the default buffer limit.
#[test]
fn collect_with_limit_default() {
let memory = GlobalPool::new();
let data = BytesView::copied_from_slice(b"test", &memory);
let stream = futures::stream::iter(vec![Ok(data)]);
let result = block_on(collect_with_limit(stream, None));
result.unwrap();
}
#[test]
fn collect_with_limit_empty_stream() {
let stream = futures::stream::iter(vec![] as Vec<Result<BytesView>>);
let result = block_on(collect_with_limit(stream, Some(100))).unwrap();
assert_eq!(result.len(), 0);
}
#[test]
fn try_clone_custom_body_fails() {
let builder = HttpBodyBuilder::new_fake();
let custom_body = builder.body(http_body_util::Empty::new(), &HttpBodyOptions::default());
assert!(custom_body.try_clone().is_none());
}
// ---- TryFrom<HttpBody> for BytesView ----
#[test]
fn into_bytes_view_empty_body_ok() {
let builder = HttpBodyBuilder::new_fake();
let body = builder.empty();
let data = BytesView::try_from(body).unwrap();
assert_eq!(data.len(), 0);
}
#[test]
fn into_bytes_view_has_data_ok() {
let builder = HttpBodyBuilder::new_fake();
let body = builder.text("hello");
let data = BytesView::try_from(body).unwrap();
assert_eq!(data.len(), 5);
}
#[test]
fn into_bytes_custom_body_fails() {
let builder = HttpBodyBuilder::new_fake();
let custom_body = builder.body(http_body_util::Empty::new(), &HttpBodyOptions::default());
BytesView::try_from(custom_body).unwrap_err();
}
// ---- size_hint ----
#[test]
fn size_hint_bytes_view() {
let builder = HttpBodyBuilder::new_fake();
let body = builder.text("test");
let size_hint = body.size_hint();
assert_eq!(size_hint.lower(), 4);
assert_eq!(size_hint.upper(), Some(4));
}
#[test]
fn size_hint_empty() {
let builder = HttpBodyBuilder::new_fake();
let body = builder.empty();
let size_hint = body.size_hint();
assert_eq!(size_hint.lower(), 0);
assert_eq!(size_hint.upper(), Some(0));
}
// ---- limit boundaries ----
// Exactly at the limit is accepted.
#[test]
fn collect_with_limit_at_boundary() {
let memory = GlobalPool::new();
let data = BytesView::copied_from_slice(&[0u8; 100], &memory);
let stream = futures::stream::iter(vec![Ok(data)]);
let result = block_on(collect_with_limit(stream, Some(100)));
result.unwrap();
}
// One byte over the limit is rejected.
#[test]
fn collect_with_limit_exceeds_boundary() {
let memory = GlobalPool::new();
let data = BytesView::copied_from_slice(&[0u8; 101], &memory);
let stream = futures::stream::iter(vec![Ok(data)]);
let result = block_on(collect_with_limit(stream, Some(100)));
result.unwrap_err();
}
// `usize::MAX + 1` overflows; the overflow is reported as a size-limit error.
#[test]
fn check_size_limit_overflow_returns_error() {
let result = check_size_limit(usize::MAX, 1, usize::MAX);
let err = result.unwrap_err();
assert_eq!(err.label(), "body_size_limit");
assert!(
err.to_string().contains("body size exceeds the limit"),
"expected body size error, got: {err}"
);
}
// ---- Debug ----
#[test]
fn debug_kind() {
let debug_str = format!("{:?}", Kind::Bytes(Some(BytesView::default())));
assert_eq!("Bytes", debug_str);
let debug_str = format!("{:?}", Kind::Empty);
assert_eq!("Empty", debug_str);
// NOTE(review): this repeats the `Kind::Empty` assertion above; the `Body` variant is only
// checked indirectly through the `HttpBody` debug output below.
let debug_str = format!("{:?}", Kind::Empty);
assert_eq!("Empty", debug_str);
let builder = HttpBodyBuilder::new_fake();
let stream = futures::stream::iter(Vec::<Result<BytesView>>::new());
let body = builder.stream(stream, &HttpBodyOptions::default());
let debug_str = format!("{body:?}");
assert!(debug_str.contains("Body"), "{debug_str}");
}
// ---- poll_frame ----
// A buffered body yields one data frame and then ends.
#[test]
fn http_body_poll_frame() {
let mut http_body = pin!(HttpBodyBuilder::new_fake().text("test body"));
let mut cx = Context::from_waker(Waker::noop());
let res = http_body.as_mut().poll_frame(&mut cx);
assert!(matches!(res, Poll::Ready(Some(Ok(_)))));
let res = http_body.as_mut().poll_frame(&mut cx);
assert!(matches!(res, Poll::Ready(None)));
}
#[test]
fn poll_frame_empty_bytes_view_returns_none() {
let mut body = pin!(HttpBodyBuilder::new_fake().empty());
let mut cx = Context::from_waker(Waker::noop());
let result = body.as_mut().poll_frame(&mut cx);
assert!(matches!(result, Poll::Ready(None)));
}
// ---- is_end_stream ----
#[test]
fn is_end_stream_bytes_view_non_empty() {
let builder = HttpBodyBuilder::new_fake();
let body = builder.text("non-empty");
assert!(!body.is_end_stream());
}
#[test]
fn is_end_stream_bytes_view_empty() {
let builder = HttpBodyBuilder::new_fake();
let zero_bytes = BytesView::new();
let body = builder.bytes(zero_bytes);
assert!(body.is_end_stream());
}
// After the single frame has been polled, the body reports end of stream.
#[test]
fn is_end_stream_bytes_view_none() {
let builder = HttpBodyBuilder::new_fake();
let body = builder.text("test");
let mut pinned = pin!(body);
let mut cx = Context::from_waker(Waker::noop());
let _ = pinned.as_mut().poll_frame(&mut cx);
assert!(pinned.is_end_stream());
}
#[test]
fn is_end_stream_empty_body() {
let builder = HttpBodyBuilder::new_fake();
let body = builder.empty();
assert!(body.is_end_stream());
}
#[test]
fn is_end_stream_custom_body_empty() {
let builder = HttpBodyBuilder::new_fake();
let custom = http_body_util::Empty::new();
let body = builder.body(custom, &HttpBodyOptions::default());
assert!(body.is_end_stream());
}
#[test]
fn is_end_stream_custom_body_with_data() {
let builder = HttpBodyBuilder::new_fake();
let data = Bytes::from_static(b"test data");
let custom = http_body_util::Full::new(data.into());
let body = builder.body(custom, &HttpBodyOptions::default());
assert!(!body.is_end_stream());
}
// ---- bodies created through `create_stream_body` ----
#[test]
fn external_body_into_bytes() {
let builder = HttpBodyBuilder::new_fake();
let body = create_stream_body(&builder, b"raw bytes", &HttpBodyOptions::default());
let bytes = block_on(body.into_bytes()).unwrap();
assert_eq!(bytes, b"raw bytes");
}
#[test]
fn external_body_empty_into_bytes() {
let builder = HttpBodyBuilder::new_fake();
let body = create_stream_body(&builder, b"", &HttpBodyOptions::default());
let bytes = block_on(body.into_bytes()).unwrap();
assert!(bytes.is_empty());
}
#[test]
fn external_body_into_json_owned() {
let builder = HttpBodyBuilder::new_fake();
let json_bytes = br#"{"id":42,"name":"alice"}"#;
let body = create_stream_body(&builder, json_bytes, &HttpBodyOptions::default());
let model: Model = block_on(body.into_json_owned()).unwrap();
assert_eq!(
model,
Model {
id: 42,
name: "alice".to_string()
}
);
}
#[test]
fn external_body_try_clone_returns_none() {
let builder = HttpBodyBuilder::new_fake();
let body = create_stream_body(&builder, b"no clone", &HttpBodyOptions::default());
assert!(body.try_clone().is_none());
}
#[test]
fn external_body_into_bytes_view_fails() {
let builder = HttpBodyBuilder::new_fake();
let body = create_stream_body(&builder, b"not buffered", &HttpBodyOptions::default());
let err = BytesView::try_from(body).unwrap_err();
assert_eq!(err.label(), "body_not_buffered");
}
#[test]
fn external_body_into_bytes_no_buffering_returns_none() {
let builder = HttpBodyBuilder::new_fake();
let body = create_stream_body(&builder, b"data", &HttpBodyOptions::default());
assert!(body.into_bytes_no_buffering().is_none());
}
// Once buffered, a formerly streaming body becomes cloneable.
#[test]
fn external_body_buffered_then_clone() {
let builder = HttpBodyBuilder::new_fake();
let body = create_stream_body(&builder, b"clone me", &HttpBodyOptions::default());
let buffered = block_on(body.into_buffered()).unwrap();
let cloned = buffered.try_clone().unwrap();
assert_eq!(block_on(buffered.into_text()).unwrap(), "clone me");
assert_eq!(block_on(cloned.into_text()).unwrap(), "clone me");
}
// NOTE(review): the limit is set on the builder while default options are passed to
// `create_stream_body`; presumably the builder-level options win — confirm in the helper.
#[test]
fn external_body_with_buffer_limit_exceeded() {
let builder = HttpBodyBuilder::new_fake().with_options(HttpBodyOptions::default().buffer_limit(5));
let body = create_stream_body(&builder, b"this exceeds the limit", &HttpBodyOptions::default());
let err = block_on(body.into_buffered()).unwrap_err();
assert_eq!(err.label(), "body_size_limit");
assert!(err.to_string().contains("body size exceeds the limit"));
}
#[test]
fn external_body_with_buffer_limit_ok() {
let builder = HttpBodyBuilder::new_fake().with_options(HttpBodyOptions::default().buffer_limit(1024));
let body = create_stream_body(&builder, b"fits", &HttpBodyOptions::default());
let text = block_on(body.into_text()).unwrap();
assert_eq!(text, "fits");
}
#[test]
fn external_body_is_end_stream_with_data() {
let builder = HttpBodyBuilder::new_fake();
let body = create_stream_body(&builder, b"data", &HttpBodyOptions::default());
assert!(!body.is_end_stream());
}
#[test]
fn external_body_poll_frame_yields_correct_data() {
let builder = HttpBodyBuilder::new_fake();
let mut body = pin!(create_stream_body(&builder, b"exact", &HttpBodyOptions::default()));
let mut cx = Context::from_waker(Waker::noop());
if let Poll::Ready(Some(Ok(frame))) = body.as_mut().poll_frame(&mut cx) {
let data = frame.into_data().unwrap();
assert_eq!(data, b"exact");
} else {
panic!("expected a data frame");
}
}
// ---- is_empty ----
#[test]
fn is_empty_for_empty_body() {
let builder = HttpBodyBuilder::new_fake();
assert!(builder.empty().is_empty());
}
#[test]
fn is_empty_for_text_body() {
let builder = HttpBodyBuilder::new_fake();
assert!(!builder.text("hello").is_empty());
}
#[test]
fn is_empty_for_zero_length_bytes() {
let builder = HttpBodyBuilder::new_fake();
assert!(builder.bytes(BytesView::new()).is_empty());
}
// ---- into_stream ----
// A buffered body is exposed as a stream with a single chunk.
#[test]
fn into_stream_produces_chunks() {
let builder = HttpBodyBuilder::new_fake();
let body = builder.text("stream test");
let chunks: Vec<_> = block_on(body.into_stream().try_collect()).unwrap();
assert_eq!(chunks.len(), 1);
assert_eq!(chunks[0], b"stream test");
}
}