use std::fmt::{Debug, Formatter};
use std::io::Read;
use std::pin::Pin;
use std::task::Poll::Ready;
use std::task::{Context, Poll};
use bytesbuf::mem::{GlobalPool, HasMemory, Memory, MemoryShared, OpaqueMemory};
use bytesbuf::{BytesBuf, BytesView};
use futures::{Stream, TryStreamExt};
use http_body::{Body, Frame, SizeHint};
use http_body_util::BodyExt;
use pin_project::pin_project;
use thread_aware::{ThreadAware, Unaware};
use crate::constants::DEFAULT_RESPONSE_BUFFER_LIMIT_BYTES;
#[cfg(any(feature = "json", test))]
use crate::json::JsonError;
use crate::{HttpError, Result};
/// An HTTP body that is empty, held in memory as a [`BytesView`], or backed by an
/// external body that produces its frames when polled.
///
/// Bodies are created through [`HttpBodyBuilder`]. Every body keeps a copy of the builder
/// that produced it and reuses it when the body is buffered or cloned.
#[derive(Debug, ThreadAware)]
#[pin_project]
#[must_use]
pub struct HttpBody {
// Content of the body. Structurally pinned so that `poll_frame` can project into it.
#[pin]
#[thread_aware(skip)]
kind: Kind,
// Builder this body was created from; supplies the buffer limit and builds derived bodies.
builder: HttpBodyBuilder,
}
/// Factory for [`HttpBody`] values.
///
/// Holds the memory provider used to reserve body buffers and the optional size limit
/// that applies when an external body is buffered into memory.
#[derive(Debug, Clone, ThreadAware)]
pub struct HttpBodyBuilder {
// Memory provider behind `reserve` and `memory`.
memory: MemoryWrapper,
// Upper bound, in bytes, for `HttpBody::into_buffered`; `None` falls back to
// `DEFAULT_RESPONSE_BUFFER_LIMIT_BYTES`.
response_buffer_limit: Option<usize>,
}
impl HttpBody {
    /// Creates a body with the given content that remembers the builder it came from.
    const fn new(kind: Kind, builder: HttpBodyBuilder) -> Self {
        Self { kind, builder }
    }

    /// Buffers the body and returns its content as a single [`BytesView`].
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as [`HttpBody::into_buffered`].
    pub async fn into_bytes(self) -> Result<BytesView> {
        self.into_buffered()
            .await?
            .into_bytes_no_buffering()
            .map_or_else(|| unreachable!("once body is buffered, it must be a view over a byte sequence"), Ok)
    }

    /// Returns the content without polling anything.
    ///
    /// Yields `Some` for in-memory and empty bodies, and `None` for external bodies
    /// and for in-memory bodies whose bytes were already taken by `poll_frame`.
    pub(crate) fn into_bytes_no_buffering(self) -> Option<BytesView> {
        match self.kind {
            Kind::Bytes(Some(bytes)) => Some(bytes),
            Kind::Empty => Some(BytesView::default()),
            Kind::Bytes(None) | Kind::Body(_) => None,
        }
    }

    /// Buffers the body and decodes it as UTF-8 text.
    ///
    /// # Errors
    ///
    /// Fails when buffering fails (see [`HttpBody::into_buffered`]) or when reading the
    /// buffered bytes into a string fails, which is reported as invalid UTF-8.
    pub async fn into_text(self) -> Result<String> {
        // Buffer first, then size the string from the bytes actually collected. The size
        // hint of an external body is reported by that body and is not bounded by the
        // buffer limit, so it must not drive an up-front allocation.
        let mut bytes = self.into_bytes().await?;
        let mut text = String::with_capacity(bytes.len());
        bytes
            .read_to_string(&mut text)
            .map_err(|e| HttpError::validation(format!("body contains invalid UTF-8: {e}")))?;
        Ok(text)
    }

    /// Converts the body into one that is fully held in memory.
    ///
    /// In-memory and empty bodies are rebuilt as they are. An external body is drained
    /// and its chunks are collected, subject to the builder's response buffer limit
    /// (or `DEFAULT_RESPONSE_BUFFER_LIMIT_BYTES` when no limit was configured).
    ///
    /// # Errors
    ///
    /// Fails when the in-memory bytes were already consumed, when the external body
    /// yields an error, or when the collected size exceeds the limit.
    pub async fn into_buffered(self) -> Result<Self> {
        let builder = self.builder;
        let limit = builder.response_buffer_limit;
        match self.kind {
            Kind::Bytes(Some(data)) => Ok(builder.bytes(data)),
            Kind::Bytes(None) => Err(HttpError::validation("body cannot be buffered because it is already consumed")),
            Kind::Empty => Ok(builder.empty()),
            Kind::Body(b) => {
                let data = collect_with_limit(b.into_data_stream(), limit).await?;
                Ok(builder.bytes(data))
            }
        }
    }

    /// Buffers the body and deserializes it from JSON into an owned `T`.
    ///
    /// # Errors
    ///
    /// Fails when buffering fails or when the JSON wrapper cannot produce a `T`.
    #[cfg(any(feature = "json", test))]
    pub async fn into_json_owned<T: serde_core::de::DeserializeOwned>(self) -> Result<T> {
        let json = self.into_json().await?.read_owned()?;
        Ok(json)
    }

    /// Buffers the body and wraps the bytes in a [`crate::json::Json`] value.
    ///
    /// The caller obtains the deserialized `T` through the returned wrapper.
    ///
    /// # Errors
    ///
    /// Fails when buffering fails (see [`HttpBody::into_buffered`]).
    #[cfg(any(feature = "json", test))]
    pub async fn into_json<'a, T: serde_core::de::Deserialize<'a>>(self) -> Result<crate::json::Json<T>> {
        Ok(crate::json::Json::<T>::new(self.into_bytes().await?))
    }

    /// Returns the body length in bytes when it is known.
    ///
    /// In-memory bodies report their byte count, empty and already-consumed bodies
    /// report `0`, and external bodies report the exact size hint if they have one.
    #[must_use]
    pub fn content_length(&self) -> Option<u64> {
        match &self.kind {
            Kind::Bytes(Some(bytes)) => Some(bytes.len() as u64),
            Kind::Bytes(None) | Kind::Empty => Some(0),
            Kind::Body(b) => b.size_hint().exact(),
        }
    }

    /// Returns `true` when the body is known to contain zero bytes.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.content_length() == Some(0)
    }

    /// Clones the body when its content is available in memory.
    ///
    /// Returns `None` for external bodies and for in-memory bodies that were already consumed.
    #[must_use]
    pub fn try_clone(&self) -> Option<Self> {
        match &self.kind {
            Kind::Bytes(Some(bytes)) => Some(self.builder.bytes(bytes.clone())),
            Kind::Empty => Some(self.builder.empty()),
            Kind::Body(_) | Kind::Bytes(None) => None,
        }
    }

    /// Converts the body into a stream of its data chunks.
    pub fn into_stream(self) -> impl Stream<Item = Result<BytesView>> {
        self.into_data_stream()
    }
}
/// Extracts the bytes of a body that is already in memory, without polling it.
impl TryFrom<HttpBody> for BytesView {
    type Error = HttpError;

    /// Succeeds for in-memory and empty bodies; every other body is rejected.
    fn try_from(value: HttpBody) -> std::result::Result<Self, Self::Error> {
        // `into_bytes_no_buffering` yields `Some` for exactly the two convertible cases.
        value.into_bytes_no_buffering().ok_or_else(|| {
            HttpError::validation("body cannot be converted to byte sequence because it is not buffered")
        })
    }
}
/// Lets an [`HttpBody`] be consumed through the `http_body::Body` interface.
impl Body for HttpBody {
    type Data = BytesView;
    type Error = HttpError;

    /// Produces the next frame.
    ///
    /// An in-memory body yields its bytes as a single data frame and is finished
    /// afterwards; zero-length or already-taken bytes end the stream immediately.
    /// External bodies are polled directly.
    fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Frame<Self::Data>>>> {
        match self.project().kind.project() {
            BodyInnerProj::Empty => Ready(None),
            // `take` leaves `None` behind, so a second poll reports end of stream.
            BodyInnerProj::Bytes(slot) => match slot.take() {
                Some(chunk) if !chunk.is_empty() => Ready(Some(Ok(Frame::data(chunk)))),
                _ => Ready(None),
            },
            BodyInnerProj::Body(inner) => inner.as_mut().poll_frame(cx),
        }
    }

    /// Reports an exact size for in-memory and empty bodies, and forwards the hint of an external body.
    fn size_hint(&self) -> SizeHint {
        match &self.kind {
            Kind::Body(inner) => inner.size_hint(),
            Kind::Bytes(Some(view)) => SizeHint::with_exact(view.len() as u64),
            Kind::Bytes(None) | Kind::Empty => SizeHint::with_exact(0),
        }
    }

    /// Returns `true` when no further data frame will be produced.
    fn is_end_stream(&self) -> bool {
        match &self.kind {
            Kind::Empty | Kind::Bytes(None) => true,
            Kind::Bytes(Some(view)) => view.is_empty(),
            Kind::Body(inner) => inner.is_end_stream(),
        }
    }
}
impl HttpBodyBuilder {
#[cfg(any(feature = "test-util", test))]
#[must_use]
pub fn new_fake() -> Self {
Self::new(GlobalPool::new())
}
#[must_use]
pub fn new(memory: GlobalPool) -> Self {
Self {
memory: MemoryWrapper::Global(memory),
response_buffer_limit: None,
}
}
#[must_use]
pub fn with_custom_memory(memory: impl MemoryShared) -> Self {
Self {
memory: MemoryWrapper::Opaque(Unaware(OpaqueMemory::new(memory))),
response_buffer_limit: None,
}
}
#[must_use]
pub const fn with_response_buffer_limit(mut self, limit: Option<usize>) -> Self {
self.response_buffer_limit = limit;
self
}
pub fn external<B>(&self, body: B) -> HttpBody
where
B: Body<Data = BytesView, Error: Into<HttpError>> + Send + 'static,
{
HttpBody::new(Kind::Body(Box::pin(body.map_err(Into::into))), self.clone())
}
pub fn stream<S>(&self, stream: S) -> HttpBody
where
S: Stream<Item = Result<BytesView>> + Send + 'static,
{
use http_body_util::StreamBody;
let framed = stream.map_ok(Frame::data);
self.external(StreamBody::new(framed))
}
pub fn text(&self, str: impl AsRef<str>) -> HttpBody {
self.slice(str.as_ref().as_bytes())
}
pub fn slice(&self, data: impl AsRef<[u8]>) -> HttpBody {
let mut builder = self.reserve(data.as_ref().len());
builder.put_slice(data.as_ref());
self.bytes(builder.consume_all())
}
pub fn bytes(&self, b: impl Into<BytesView>) -> HttpBody {
HttpBody::new(Kind::Bytes(Some(b.into())), self.clone())
}
pub fn empty(&self) -> HttpBody {
HttpBody::new(Kind::Empty, self.clone())
}
#[cfg(any(feature = "json", test))]
pub fn json<T: serde_core::ser::Serialize>(&self, data: &T) -> std::result::Result<HttpBody, JsonError> {
let builder = BytesBuf::new();
let mut writer = builder.into_writer(&self);
serde_json::to_writer(&mut writer, data).map_err(JsonError::serialization)?;
Ok(self.bytes(writer.into_inner().consume_all()))
}
}
/// Lets the builder itself be used as a memory provider.
impl Memory for HttpBodyBuilder {
/// Reserves a buffer of at least `min_bytes` from the builder's memory provider.
fn reserve(&self, min_bytes: usize) -> BytesBuf {
self.memory.reserve(min_bytes)
}
}
/// Exposes the builder's memory provider to callers that need to allocate on their own.
impl HasMemory for HttpBodyBuilder {
/// Returns a clone of the memory provider held by this builder.
fn memory(&self) -> impl MemoryShared {
self.memory.clone()
}
}
/// Internal representation of the content of an [`HttpBody`].
#[expect(
clippy::large_enum_variant,
reason = "BytesView is intentionally large, though future optimizations may decrease size"
)]
#[pin_project(project = BodyInnerProj)]
enum Kind {
// Content held in memory; becomes `None` once `poll_frame` has taken the bytes out.
Bytes(Option<BytesView>),
// Body without any content.
Empty,
// Type-erased external body that is polled for its frames.
Body(Pin<Box<dyn Body<Data = BytesView, Error = HttpError> + Send>>),
}
/// Prints only the variant name, so body content never ends up in debug output.
impl Debug for Kind {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let variant = match self {
            Self::Bytes(_) => "Bytes",
            Self::Empty => "Empty",
            Self::Body(_) => "Body",
        };
        f.write_str(variant)
    }
}
/// Drains `data` and joins all chunks into one [`BytesView`].
///
/// The running total is checked after every chunk, so collection stops as soon as the
/// limit is exceeded. A `limit` of `None` selects `DEFAULT_RESPONSE_BUFFER_LIMIT_BYTES`.
///
/// # Errors
///
/// Returns the first error yielded by the stream, or a validation error when the
/// accumulated size exceeds the limit.
async fn collect_with_limit(mut data: impl Stream<Item = Result<BytesView>> + Send + Unpin, limit: Option<usize>) -> Result<BytesView> {
    let max_bytes = limit.unwrap_or(DEFAULT_RESPONSE_BUFFER_LIMIT_BYTES);
    let mut chunks = Vec::new();
    let mut buffered = 0_usize;

    while let Some(chunk) = data.try_next().await? {
        // Validate before storing so an oversized chunk is never retained.
        buffered = check_size_limit(buffered, chunk.len(), max_bytes)?;
        chunks.push(chunk);
    }

    Ok(BytesView::from_views(chunks))
}
/// Adds `additional` to `current_size` and returns the new total when it stays within `limit`.
///
/// # Errors
///
/// Returns a validation error when the sum overflows `usize` or is greater than `limit`.
fn check_size_limit(current_size: usize, additional: usize, limit: usize) -> Result<usize> {
    match current_size.checked_add(additional) {
        Some(total) if total <= limit => Ok(total),
        // Overflow and "over the limit" are reported with the same message.
        _ => Err(HttpError::validation(format!("body size exceeds the limit of {limit} bytes"))),
    }
}
/// Memory provider held by [`HttpBodyBuilder`].
#[derive(Debug, Clone, ThreadAware)]
enum MemoryWrapper {
// Global pool, as passed to `HttpBodyBuilder::new`.
Global(GlobalPool),
// Caller-supplied provider, as passed to `HttpBodyBuilder::with_custom_memory`.
Opaque(Unaware<OpaqueMemory>),
}
/// Forwards reservations to whichever provider the wrapper holds.
impl Memory for MemoryWrapper {
/// Reserves a buffer of at least `min_bytes` from the wrapped provider.
fn reserve(&self, min_bytes: usize) -> BytesBuf {
match self {
Self::Global(pool) => pool.reserve(min_bytes),
Self::Opaque(memory) => memory.reserve(min_bytes),
}
}
}
// Unit tests for `HttpBody`, `HttpBodyBuilder` and the buffering helpers.
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
use std::pin::pin;
use std::task::Waker;
use bytes::Bytes;
use bytesbuf::mem::testing::TransparentMemory;
use futures::executor::block_on;
use http_body_util::StreamBody;
use ohno::ErrorExt;
use serde::{Deserialize, Serialize};
use static_assertions::assert_impl_all;
use super::*;
use crate::testing::{create_stream_body, create_stream_body_from_chunks};
// Small payload used by the JSON round-trip tests.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct Model {
id: u32,
name: String,
}
// --- Trait bounds ---
#[test]
fn assert_send_and_sync() {
assert_impl_all!(super::HttpBody: Send, Debug, ThreadAware);
}
#[test]
fn assert_createhttpbody_is_send_and_sync() {
assert_impl_all!(super::HttpBodyBuilder: Send, Sync, Debug);
}
// --- JSON ---
// Serializes a model and reads it back; the serialized form is 22 bytes long.
#[test]
fn from_and_into_json_ok() {
let data = Model {
id: 1,
name: "name".to_string(),
};
let body = HttpBodyBuilder::new_fake().json(&data).unwrap();
assert_eq!(Some(22), body.content_length());
let result: Model = block_on(body.into_json_owned()).unwrap();
assert_eq!(data, result);
}
// Malformed JSON must surface as an error instead of a value.
#[test]
fn json_deserialization_error() {
let builder = HttpBodyBuilder::new_fake();
let body = builder.text("{invalid json}");
let result: Result<Model> = block_on(body.into_json_owned());
result.unwrap_err();
}
// Borrowed deserialization: string fields must come back as `Cow::Borrowed`.
#[test]
fn into_json_with_cow_strings() {
use std::borrow::Cow;
#[derive(Debug, Deserialize, PartialEq)]
struct User<'a> {
id: u32,
#[serde(borrow)]
name: Cow<'a, str>,
#[serde(borrow)]
email: Cow<'a, str>,
is_active: bool,
}
let json_data = r#"{"id": 42, "name": "Alice Smith", "email": "alice@example.com", "is_active": true}"#;
let builder = HttpBodyBuilder::new_fake();
let body = builder.text(json_data);
let mut json_result = block_on(body.into_json::<User>()).unwrap();
let user = json_result.read().unwrap();
assert_eq!(user.id, 42);
assert_eq!(user.name, "Alice Smith");
assert_eq!(user.email, "alice@example.com");
assert!(user.is_active);
assert!(matches!(user.name, Cow::Borrowed(_)));
assert!(matches!(user.email, Cow::Borrowed(_)));
}
// --- Cloning ---
#[test]
fn try_clone_text_body() {
let builder = HttpBodyBuilder::new_fake();
let body = builder.text("hello");
let cloned = body.try_clone().unwrap();
assert_eq!(block_on(body.into_text()).unwrap(), block_on(cloned.into_text()).unwrap());
}
#[test]
fn try_clone_empty_body() {
let builder = HttpBodyBuilder::new_fake();
let empty = builder.empty();
let cloned_empty = empty.try_clone().unwrap();
assert_eq!(
block_on(empty.into_bytes()).unwrap().len(),
block_on(cloned_empty.into_bytes()).unwrap().len()
);
}
// An external body cannot be cloned, but it can still be buffered.
#[test]
fn custom_body_is_not_cloneable() {
let body = HttpBodyBuilder::new_fake().external(http_body_util::Empty::new());
assert!(body.try_clone().is_none());
let body = block_on(body.into_buffered()).unwrap();
assert_eq!(Some(0), body.content_length());
}
// --- Buffering ---
// Pulling a frame takes the bytes out, so buffering afterwards must fail.
#[test]
fn into_buffered_already_consumed_body_returns_error() {
let mut body = HttpBodyBuilder::new_fake().text("hello");
let _frame = block_on(body.frame());
let err = block_on(body.into_buffered()).unwrap_err();
assert!(
err.to_string().contains("body cannot be buffered because it is already consumed"),
"expected consumed body error, got: {err}"
);
}
// An error yielded by the wrapped body is passed through unchanged.
#[test]
fn body_error_propagation() {
let builder = HttpBodyBuilder::new_fake();
let body = builder.external(StreamBody::new(futures::stream::once(async {
Err(HttpError::validation("test error"))
})));
let error = block_on(body.into_buffered()).unwrap_err();
assert_eq!(error.message(), "test error");
}
#[test]
fn collect_with_limit_success() {
let memory = GlobalPool::new();
let data = BytesView::copied_from_slice(b"test data", &memory);
let stream = futures::stream::iter(vec![Ok(data)]);
let result = block_on(collect_with_limit(stream, Some(100))).unwrap();
assert_eq!(result, b"test data");
}
#[test]
fn collect_with_limit_exceeds() {
let memory = GlobalPool::new();
let data1 = BytesView::copied_from_slice(b"test data 1", &memory);
let data2 = BytesView::copied_from_slice(b"test data 2", &memory);
let stream = futures::stream::iter(vec![Ok(data1), Ok(data2)]);
let result = block_on(collect_with_limit(stream, Some(5)));
assert!(result.is_err());
let err = result.err().unwrap();
assert!(err.to_string().contains("body size exceeds the limit"));
}
// A stream error after a valid chunk aborts the collection.
#[test]
fn collect_with_limit_stream_error() {
let memory = GlobalPool::new();
let error_stream = futures::stream::iter(vec![
Ok(BytesView::copied_from_slice(b"valid data", &memory)),
Err(HttpError::validation("stream error")),
]);
let result = block_on(collect_with_limit(error_stream, Some(1000)));
assert!(result.is_err());
let err = result.err().unwrap();
assert!(err.to_string().contains("stream error"));
}
// --- Builder constructors ---
#[test]
fn createhttpbody_with_custom_memory_provider() {
let memory = GlobalPool::new();
let builder = HttpBodyBuilder::new(memory);
let body = builder.text("test");
assert_eq!(body.content_length(), Some(4));
}
#[test]
fn slice_creation() {
let builder = HttpBodyBuilder::new_fake();
let data = [1, 2, 3, 4];
let body = builder.slice(data);
assert_eq!(body.content_length(), Some(4));
let bytes = block_on(body.into_bytes()).unwrap();
assert_eq!(bytes, &[1, 2, 3, 4]);
}
#[test]
fn empty_body_creation() {
let builder = HttpBodyBuilder::new_fake();
let body = builder.empty();
assert_eq!(body.content_length(), Some(0));
let bytes = block_on(body.into_bytes()).unwrap();
assert_eq!(bytes.len(), 0);
}
#[test]
fn collect_with_custom_limit() {
let memory = GlobalPool::new();
let data = BytesView::copied_from_slice(&[0u8; 1000], &memory);
let stream = futures::stream::iter(vec![Ok(data)]);
let result = block_on(collect_with_limit(stream, Some(1024))).unwrap();
assert_eq!(result.len(), 1000);
}
// Bytes that are not valid UTF-8 must be rejected by `into_text`.
#[test]
fn into_text_invalid_utf8() {
let builder = HttpBodyBuilder::new_fake();
let invalid_utf8 = vec![0xFF, 0xFE, 0xFD];
let body = builder.slice(&invalid_utf8);
let error = block_on(body.into_text()).unwrap_err();
assert!(error.to_string().contains("body contains invalid UTF-8"));
}
#[test]
fn response_buffer_limit_with_some() {
let builder = HttpBodyBuilder::new_fake().with_response_buffer_limit(Some(1024));
assert_eq!(builder.response_buffer_limit, Some(1024));
}
#[test]
fn response_buffer_limit_with_none() {
let builder = HttpBodyBuilder::new_fake().with_response_buffer_limit(None);
assert_eq!(builder.response_buffer_limit, None);
}
// --- content_length ---
#[test]
fn content_length_empty_body() {
let builder = HttpBodyBuilder::new_fake();
let empty = builder.empty();
assert_eq!(empty.content_length(), Some(0));
}
#[test]
fn content_length_text_body() {
let builder = HttpBodyBuilder::new_fake();
let text = builder.text("hello");
assert_eq!(text.content_length(), Some(5));
}
#[test]
fn content_length_slice_body() {
let builder = HttpBodyBuilder::new_fake();
let slice = builder.slice([1, 2, 3]);
assert_eq!(slice.content_length(), Some(3));
}
#[test]
fn content_length_bytes_body() {
let builder = HttpBodyBuilder::new_fake();
let body = builder.bytes(BytesView::new());
assert_eq!(body.content_length(), Some(0));
}
// Passing `None` as the limit falls back to the default limit.
#[test]
fn collect_with_limit_default() {
let memory = GlobalPool::new();
let data = BytesView::copied_from_slice(b"test", &memory);
let stream = futures::stream::iter(vec![Ok(data)]);
let result = block_on(collect_with_limit(stream, None));
result.unwrap();
}
#[test]
fn collect_with_limit_empty_stream() {
let stream = futures::stream::iter(vec![] as Vec<Result<BytesView>>);
let result = block_on(collect_with_limit(stream, Some(100))).unwrap();
assert_eq!(result.len(), 0);
}
#[test]
fn try_clone_custom_body_fails() {
let builder = HttpBodyBuilder::new_fake();
let custom_body = builder.external(http_body_util::Empty::new());
assert!(custom_body.try_clone().is_none());
}
// --- TryFrom<HttpBody> for BytesView ---
#[test]
fn into_bytes_view_empty_body_ok() {
let builder = HttpBodyBuilder::new_fake();
let body = builder.empty();
let data = BytesView::try_from(body).unwrap();
assert_eq!(data.len(), 0);
}
#[test]
fn into_bytes_view_has_data_ok() {
let builder = HttpBodyBuilder::new_fake();
let body = builder.text("hello");
let data = BytesView::try_from(body).unwrap();
assert_eq!(data.len(), 5);
}
#[test]
fn into_bytes_view_has_data_with_custom_memory() {
let builder = HttpBodyBuilder::with_custom_memory(TransparentMemory::new());
let body = builder.text("hello");
let data = BytesView::try_from(body).unwrap();
assert_eq!(data.len(), 5);
}
// External bodies are not in memory, so the conversion must fail.
#[test]
fn into_bytes_custom_body_fails() {
let builder = HttpBodyBuilder::new_fake();
let custom_body = builder.external(http_body_util::Empty::new());
BytesView::try_from(custom_body).unwrap_err();
}
// --- Text and bytes constructors ---
#[test]
fn text_from_str() {
let builder = HttpBodyBuilder::new_fake();
let body = builder.text("hello world");
assert_eq!(body.content_length(), Some(11));
let result = block_on(body.into_text()).unwrap();
assert_eq!(result, "hello world");
}
#[test]
fn text_from_string() {
let builder = HttpBodyBuilder::new_fake();
let text = String::from("hello world");
let body = builder.text(text);
assert_eq!(body.content_length(), Some(11));
let result = block_on(body.into_text()).unwrap();
assert_eq!(result, "hello world");
}
#[test]
fn bytes_view() {
let memory = GlobalPool::new();
let builder = HttpBodyBuilder::new_fake();
let bytes = BytesView::copied_from_slice(b"test", &memory);
let body = builder.bytes(bytes);
assert_eq!(body.content_length(), Some(4));
}
// `bytes` accepts anything convertible into a `BytesView`, here a `bytes::Bytes`.
#[test]
fn into_bytes_view() {
let builder = HttpBodyBuilder::new_fake();
let body = builder.bytes(Bytes::from_static(b"test"));
assert_eq!(body.content_length(), Some(4));
}
// --- size_hint ---
#[test]
fn size_hint_bytes_view() {
let builder = HttpBodyBuilder::new_fake();
let body = builder.text("test");
let size_hint = body.size_hint();
assert_eq!(size_hint.lower(), 4);
assert_eq!(size_hint.upper(), Some(4));
}
#[test]
fn size_hint_empty() {
let builder = HttpBodyBuilder::new_fake();
let body = builder.empty();
let size_hint = body.size_hint();
assert_eq!(size_hint.lower(), 0);
assert_eq!(size_hint.upper(), Some(0));
}
// --- Limit boundaries ---
// A body exactly as large as the limit is accepted.
#[test]
fn collect_with_limit_at_boundary() {
let memory = GlobalPool::new();
let data = BytesView::copied_from_slice(&[0u8; 100], &memory);
let stream = futures::stream::iter(vec![Ok(data)]);
let result = block_on(collect_with_limit(stream, Some(100)));
result.unwrap();
}
// One byte over the limit is rejected.
#[test]
fn collect_with_limit_exceeds_boundary() {
let memory = GlobalPool::new();
let data = BytesView::copied_from_slice(&[0u8; 101], &memory);
let stream = futures::stream::iter(vec![Ok(data)]);
let result = block_on(collect_with_limit(stream, Some(100)));
result.unwrap_err();
}
// `usize` overflow in the running total is reported as a limit violation.
#[test]
fn check_size_limit_overflow_returns_error() {
let result = check_size_limit(usize::MAX, 1, usize::MAX);
let err = result.unwrap_err();
assert!(
err.to_string().contains("body size exceeds the limit"),
"expected body size error, got: {err}"
);
}
// --- Debug output ---
#[test]
fn debug_kind() {
let debug_str = format!("{:?}", Kind::Bytes(Some(BytesView::default())));
assert_eq!("Bytes", debug_str);
let debug_str = format!("{:?}", Kind::Empty);
assert_eq!("Empty", debug_str);
// NOTE(review): this repeats the `Kind::Empty` assertion above; possibly meant for another variant — confirm.
let debug_str = format!("{:?}", Kind::Empty);
assert_eq!("Empty", debug_str);
let builder = HttpBodyBuilder::new_fake();
let stream = futures::stream::iter(Vec::<Result<BytesView>>::new());
let body = builder.stream(stream);
let debug_str = format!("{body:?}");
assert!(debug_str.contains("Body"), "{debug_str}");
}
// --- poll_frame ---
// An in-memory body yields one data frame and then signals end of stream.
#[test]
fn http_body_poll_frame() {
let mut http_body = pin!(HttpBodyBuilder::new_fake().text("test body"));
let mut cx = Context::from_waker(Waker::noop());
let res = http_body.as_mut().poll_frame(&mut cx);
assert!(matches!(res, Poll::Ready(Some(Ok(_)))));
let res = http_body.as_mut().poll_frame(&mut cx);
assert!(matches!(res, Poll::Ready(None)));
}
#[test]
fn poll_frame_empty_bytes_view_returns_none() {
let mut body = pin!(HttpBodyBuilder::new_fake().empty());
let mut cx = Context::from_waker(Waker::noop());
let result = body.as_mut().poll_frame(&mut cx);
assert!(matches!(result, Poll::Ready(None)));
}
// --- is_end_stream ---
#[test]
fn is_end_stream_bytes_view_non_empty() {
let builder = HttpBodyBuilder::new_fake();
let body = builder.text("non-empty");
assert!(!body.is_end_stream());
}
#[test]
fn is_end_stream_bytes_view_empty() {
let builder = HttpBodyBuilder::new_fake();
let zero_bytes = BytesView::new();
let body = builder.bytes(zero_bytes);
assert!(body.is_end_stream());
}
// After the single frame has been polled the in-memory slot is `None`.
#[test]
fn is_end_stream_bytes_view_none() {
let builder = HttpBodyBuilder::new_fake();
let body = builder.text("test");
let mut pinned = pin!(body);
let mut cx = Context::from_waker(Waker::noop());
let _ = pinned.as_mut().poll_frame(&mut cx);
assert!(pinned.is_end_stream());
}
#[test]
fn is_end_stream_empty_body() {
let builder = HttpBodyBuilder::new_fake();
let body = builder.empty();
assert!(body.is_end_stream());
}
#[test]
fn is_end_stream_custom_body_empty() {
let builder = HttpBodyBuilder::new_fake();
let custom = http_body_util::Empty::new();
let body = builder.external(custom);
assert!(body.is_end_stream());
}
#[test]
fn is_end_stream_custom_body_with_data() {
let builder = HttpBodyBuilder::new_fake();
let data = Bytes::from_static(b"test data");
let custom = http_body_util::Full::new(data.into());
let body = builder.external(custom);
assert!(!body.is_end_stream());
}
// Serializing a payload of roughly 30 KB must end up in at most 5 memory blocks.
#[test]
fn json_serialization_makes_few_memory_allocations() {
#[derive(Serialize)]
struct LargePayload {
items: Vec<Item>,
}
#[derive(Serialize)]
struct Item {
id: u32,
name: String,
description: String,
value: f64,
}
let payload = LargePayload {
items: (0..300)
.map(|i| Item {
id: i,
name: format!("item-name-{i:04}"),
description: format!("This is a longer description for item number {i:04}"),
value: f64::from(i) * 1.5,
})
.collect(),
};
// Sanity-check the fixture size so the block-count assertion below stays meaningful.
let expected_size = serde_json::to_vec(&payload).unwrap().len();
assert!(
expected_size > 25_000 && expected_size < 40_000,
"expected ~30 KB JSON, got {expected_size} bytes"
);
let builder = HttpBodyBuilder::with_custom_memory(TransparentMemory::new());
let body = builder.json(&payload).unwrap();
let bytes_view = body.into_bytes_no_buffering().unwrap();
assert_eq!(bytes_view.len(), expected_size);
let block_count = bytes_view.slices().count();
assert!(
block_count <= 5,
"expected at most 5 memory blocks for ~30 KB JSON serialization, got {block_count}"
);
}
// --- External (streaming) bodies ---
#[test]
fn external_body_into_bytes() {
let builder = HttpBodyBuilder::new_fake();
let body = create_stream_body(&builder, b"raw bytes");
let bytes = block_on(body.into_bytes()).unwrap();
assert_eq!(bytes, b"raw bytes");
}
#[test]
fn external_body_empty_into_bytes() {
let builder = HttpBodyBuilder::new_fake();
let body = create_stream_body(&builder, b"");
let bytes = block_on(body.into_bytes()).unwrap();
assert!(bytes.is_empty());
}
#[test]
fn external_body_into_json_owned() {
let builder = HttpBodyBuilder::new_fake();
let json_bytes = br#"{"id":42,"name":"alice"}"#;
let body = create_stream_body(&builder, json_bytes);
let model: Model = block_on(body.into_json_owned()).unwrap();
assert_eq!(
model,
Model {
id: 42,
name: "alice".to_string()
}
);
}
#[test]
fn external_body_try_clone_returns_none() {
let builder = HttpBodyBuilder::new_fake();
let body = create_stream_body(&builder, b"no clone");
assert!(body.try_clone().is_none());
}
#[test]
fn external_body_into_bytes_view_fails() {
let builder = HttpBodyBuilder::new_fake();
let body = create_stream_body(&builder, b"not buffered");
BytesView::try_from(body).unwrap_err();
}
#[test]
fn external_body_into_bytes_no_buffering_returns_none() {
let builder = HttpBodyBuilder::new_fake();
let body = create_stream_body(&builder, b"data");
assert!(body.into_bytes_no_buffering().is_none());
}
// Once buffered, a formerly external body becomes cloneable.
#[test]
fn external_body_buffered_then_clone() {
let builder = HttpBodyBuilder::new_fake();
let body = create_stream_body(&builder, b"clone me");
let buffered = block_on(body.into_buffered()).unwrap();
let cloned = buffered.try_clone().unwrap();
assert_eq!(block_on(buffered.into_text()).unwrap(), "clone me");
assert_eq!(block_on(cloned.into_text()).unwrap(), "clone me");
}
// The limit configured on the builder is enforced while buffering.
#[test]
fn external_body_with_buffer_limit_exceeded() {
let builder = HttpBodyBuilder::new_fake().with_response_buffer_limit(Some(5));
let body = create_stream_body(&builder, b"this exceeds the limit");
let err = block_on(body.into_buffered()).unwrap_err();
assert!(err.to_string().contains("body size exceeds the limit"));
}
#[test]
fn external_body_with_buffer_limit_ok() {
let builder = HttpBodyBuilder::new_fake().with_response_buffer_limit(Some(1024));
let body = create_stream_body(&builder, b"fits");
let text = block_on(body.into_text()).unwrap();
assert_eq!(text, "fits");
}
#[test]
fn external_body_is_end_stream_with_data() {
let builder = HttpBodyBuilder::new_fake();
let body = create_stream_body(&builder, b"data");
assert!(!body.is_end_stream());
}
#[test]
fn external_body_poll_frame_yields_correct_data() {
let builder = HttpBodyBuilder::new_fake();
let mut body = pin!(create_stream_body(&builder, b"exact"));
let mut cx = Context::from_waker(Waker::noop());
if let Poll::Ready(Some(Ok(frame))) = body.as_mut().poll_frame(&mut cx) {
let data = frame.into_data().unwrap();
assert_eq!(data, b"exact");
} else {
panic!("expected a data frame");
}
}
// --- is_empty ---
#[test]
fn is_empty_for_empty_body() {
let builder = HttpBodyBuilder::new_fake();
assert!(builder.empty().is_empty());
}
#[test]
fn is_empty_for_text_body() {
let builder = HttpBodyBuilder::new_fake();
assert!(!builder.text("hello").is_empty());
}
#[test]
fn is_empty_for_zero_length_bytes() {
let builder = HttpBodyBuilder::new_fake();
assert!(builder.bytes(BytesView::new()).is_empty());
}
// --- Streams ---
// A chunked stream body has no known length up front and concatenates its chunks when read.
#[test]
fn stream_body_creation() {
let builder = HttpBodyBuilder::new_fake();
let body = create_stream_body_from_chunks(&builder, &[b"hello ", b"world"]);
assert_eq!(body.content_length(), None);
let text = block_on(body.into_text()).unwrap();
assert_eq!(text, "hello world");
}
#[test]
fn stream_body_empty() {
let builder = HttpBodyBuilder::new_fake();
let body = create_stream_body(&builder, b"");
let bytes = block_on(body.into_bytes()).unwrap();
assert!(bytes.is_empty());
}
// An in-memory body turned into a stream yields exactly one chunk.
#[test]
fn into_stream_produces_chunks() {
let builder = HttpBodyBuilder::new_fake();
let body = builder.text("stream test");
let chunks: Vec<_> = block_on(body.into_stream().try_collect()).unwrap();
assert_eq!(chunks.len(), 1);
assert_eq!(chunks[0], b"stream test");
}
// The provider returned by `memory()` can reserve buffers on its own.
#[test]
fn has_memory_returns_usable_provider() {
let builder = HttpBodyBuilder::new_fake();
let memory = builder.memory();
let buf = memory.reserve(64);
assert!(buf.capacity() >= 64);
}
}