use bytesbuf::mem::{GlobalPool, HasMemory, Memory, MemoryShared, OpaqueMemory};
use bytesbuf::{BytesBuf, BytesView};
use futures::{Stream, TryStreamExt};
use http_body::{Body, Frame};
use http_body_util::BodyExt;
use thread_aware::{ThreadAware, Unaware};
use tick::Clock;
use super::HttpBody;
use super::options::HttpBodyOptions;
use super::timeout_body::TimeoutBody;
#[cfg(any(feature = "json", test))]
use crate::json::JsonError;
use crate::{HttpError, Result};
/// Factory for [`HttpBody`] values.
///
/// A builder bundles three things that body construction needs:
/// a memory provider used to reserve buffers, a [`Clock`] handed to the
/// timeout wrapper of streaming bodies, and the default [`HttpBodyOptions`]
/// that are merged with the options supplied on each `body`/`stream` call.
#[derive(Debug, Clone, ThreadAware)]
pub struct HttpBodyBuilder {
/// Memory provider that `reserve` delegates to; either the global pool or a
/// caller-supplied provider (see `MemoryWrapper`).
memory: MemoryWrapper,
/// Clock passed to `TimeoutBody` when the merged options carry a timeout.
clock: Clock,
/// Builder-level options; `body` merges these with the per-call options.
pub(super) options: HttpBodyOptions,
}
impl HttpBodyBuilder {
#[cfg(any(feature = "test-util", test))]
#[must_use]
pub fn new_fake() -> Self {
Self::new(GlobalPool::new(), &Clock::new_frozen())
}
#[must_use]
pub fn new(memory: GlobalPool, clock: &Clock) -> Self {
Self {
memory: MemoryWrapper::Global(memory),
clock: clock.clone(),
options: HttpBodyOptions::default(),
}
}
#[must_use]
pub fn with_custom_memory(memory: impl MemoryShared, clock: &Clock) -> Self {
Self {
memory: MemoryWrapper::Opaque(Unaware(OpaqueMemory::new(memory))),
clock: clock.clone(),
options: HttpBodyOptions::default(),
}
}
#[must_use]
pub fn with_options(mut self, options: HttpBodyOptions) -> Self {
self.options = options;
self
}
pub fn body<B>(&self, body: B, options: &HttpBodyOptions) -> HttpBody
where
B: Body<Data = BytesView, Error: Into<HttpError>> + Send + 'static,
{
let merged = options.merge(&self.options);
let body = body.map_err(Into::into);
match merged.timeout {
Some(timeout) => HttpBody::from_streaming(Box::pin(TimeoutBody::new(body, timeout, &self.clock)), merged),
None => HttpBody::from_streaming(Box::pin(body), merged),
}
}
pub fn stream<S>(&self, stream: S, options: &HttpBodyOptions) -> HttpBody
where
S: Stream<Item = Result<BytesView>> + Send + 'static,
{
use http_body_util::StreamBody;
let framed = stream.map_ok(Frame::data);
self.body(StreamBody::new(framed), options)
}
pub fn text(&self, str: impl AsRef<str>) -> HttpBody {
self.slice(str.as_ref().as_bytes())
}
pub fn slice(&self, data: impl AsRef<[u8]>) -> HttpBody {
let mut builder = self.reserve(data.as_ref().len());
builder.put_slice(data.as_ref());
self.bytes(builder.consume_all())
}
#[expect(clippy::unused_self, reason = "body might receive more data from builder later")]
pub fn bytes(&self, b: impl Into<BytesView>) -> HttpBody {
HttpBody::from_bytes(b.into())
}
#[expect(clippy::unused_self, reason = "body might receive more data from builder later")]
pub fn empty(&self) -> HttpBody {
HttpBody::empty()
}
#[cfg(any(feature = "json", test))]
pub fn json<T: serde_core::ser::Serialize>(&self, data: &T) -> std::result::Result<HttpBody, JsonError> {
let builder = BytesBuf::new();
let mut writer = builder.into_writer(&self);
serde_json::to_writer(&mut writer, data).map_err(JsonError::serialization)?;
Ok(self.bytes(writer.into_inner().consume_all()))
}
}
impl Memory for HttpBodyBuilder {
fn reserve(&self, min_bytes: usize) -> BytesBuf {
self.memory.reserve(min_bytes)
}
}
impl HasMemory for HttpBodyBuilder {
fn memory(&self) -> impl MemoryShared {
self.memory.clone()
}
}
/// Memory provider held by `HttpBodyBuilder`.
///
/// `HttpBodyBuilder::new` stores the `Global` variant and
/// `HttpBodyBuilder::with_custom_memory` stores the `Opaque` variant.
#[derive(Debug, Clone, ThreadAware)]
enum MemoryWrapper {
/// Buffers are reserved from the given global pool.
Global(GlobalPool),
/// Buffers are reserved from a caller-supplied provider wrapped in `OpaqueMemory`.
// NOTE(review): `Unaware` presumably opts the value out of thread-aware
// handling for the derived `ThreadAware` impl — confirm against `thread_aware`.
Opaque(Unaware<OpaqueMemory>),
}
impl Memory for MemoryWrapper {
    /// Reserves a buffer of at least `min_bytes` from whichever provider is wrapped.
    fn reserve(&self, min_bytes: usize) -> BytesBuf {
        match self {
            // Custom provider supplied by the caller.
            Self::Opaque(custom) => custom.reserve(min_bytes),
            // Shared global pool.
            Self::Global(global) => global.reserve(min_bytes),
        }
    }
}
impl AsRef<Clock> for HttpBodyBuilder {
    /// Borrows the clock this builder was created with.
    fn as_ref(&self) -> &Clock {
        let Self { clock, .. } = self;
        clock
    }
}
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use std::time::Duration;

    use bytes::Bytes;
    use bytesbuf::mem::testing::TransparentMemory;
    use futures::executor::block_on;
    use futures::stream;
    use serde::Serialize;
    use static_assertions::assert_impl_all;
    use tick::ClockControl;

    use super::*;
    use crate::testing::{create_stream_body, create_stream_body_from_chunks};

    // The builder must be usable across threads and expose its clock.
    #[test]
    fn assert_send_and_sync() {
        assert_impl_all!(HttpBodyBuilder: Send, Sync, AsRef<Clock>, std::fmt::Debug);
    }

    // A builder over the global pool produces bodies and exposes its clock.
    #[test]
    fn new_with_global_memory() {
        let clock = Clock::new_frozen();
        let pool = GlobalPool::new();
        let builder = HttpBodyBuilder::new(pool, &clock);

        let text_body = builder.text("test");
        assert_eq!(text_body.content_length(), Some(4));

        let _clock: &Clock = builder.as_ref();
    }

    // A builder over a custom memory provider produces readable bodies.
    #[test]
    fn with_custom_memory() {
        let clock = Clock::new_frozen();
        let builder = HttpBodyBuilder::with_custom_memory(TransparentMemory::new(), &clock);

        let view = BytesView::try_from(builder.text("hello")).unwrap();
        assert_eq!(view.len(), 5);
    }

    // `with_options` stores exactly the options it was given.
    #[test]
    fn with_options_sets_buffer_limit() {
        let limited = HttpBodyOptions::default().buffer_limit(1024);
        let builder = HttpBodyBuilder::new_fake().with_options(limited);
        assert_eq!(builder.options, limited);
    }

    // Without `with_options`, the builder carries default options.
    #[test]
    fn with_options_defaults() {
        let builder = HttpBodyBuilder::new_fake();
        let defaults = HttpBodyOptions::default();
        assert_eq!(builder.options, defaults);
    }

    // The provider returned through `HasMemory` can reserve buffers.
    #[test]
    fn has_memory_returns_usable_provider() {
        let builder = HttpBodyBuilder::new_fake();
        let provider = builder.memory();
        let reserved = provider.reserve(64);
        assert!(reserved.capacity() >= 64);
    }

    // `text` accepts a `&str`.
    #[test]
    fn text_from_str() {
        let builder = HttpBodyBuilder::new_fake();
        let text_body = builder.text("hello world");
        assert_eq!(text_body.content_length(), Some(11));

        let round_tripped = block_on(text_body.into_text()).unwrap();
        assert_eq!(round_tripped, "hello world");
    }

    // `text` accepts an owned `String`.
    #[test]
    fn text_from_string() {
        let builder = HttpBodyBuilder::new_fake();
        let text_body = builder.text(String::from("hello world"));
        assert_eq!(text_body.content_length(), Some(11));

        let round_tripped = block_on(text_body.into_text()).unwrap();
        assert_eq!(round_tripped, "hello world");
    }

    // `slice` copies the given bytes into the body.
    #[test]
    fn slice_creation() {
        let builder = HttpBodyBuilder::new_fake();
        let slice_body = builder.slice([1, 2, 3, 4]);
        assert_eq!(slice_body.content_length(), Some(4));

        let contents = block_on(slice_body.into_bytes()).unwrap();
        assert_eq!(contents, &[1, 2, 3, 4]);
    }

    // `bytes` accepts a `BytesView`.
    #[test]
    fn bytes_from_bytes_view() {
        let pool = GlobalPool::new();
        let builder = HttpBodyBuilder::new_fake();

        let view = BytesView::copied_from_slice(b"test", &pool);
        assert_eq!(builder.bytes(view).content_length(), Some(4));
    }

    // `bytes` accepts a `bytes::Bytes`.
    #[test]
    fn bytes_from_bytes_crate() {
        let builder = HttpBodyBuilder::new_fake();
        let static_bytes = Bytes::from_static(b"test");
        assert_eq!(builder.bytes(static_bytes).content_length(), Some(4));
    }

    // `empty` yields a zero-length body.
    #[test]
    fn empty_body_creation() {
        let builder = HttpBodyBuilder::new_fake();
        let empty_body = builder.empty();
        assert_eq!(empty_body.content_length(), Some(0));

        let contents = block_on(empty_body.into_bytes()).unwrap();
        assert_eq!(contents.len(), 0);
    }

    // Content length of an empty body is known and zero.
    #[test]
    fn content_length_empty_body() {
        let builder = HttpBodyBuilder::new_fake();
        let empty_body = builder.empty();
        assert_eq!(empty_body.content_length(), Some(0));
    }

    // Content length of a text body equals its byte length.
    #[test]
    fn content_length_text_body() {
        let builder = HttpBodyBuilder::new_fake();
        let text_body = builder.text("hello");
        assert_eq!(text_body.content_length(), Some(5));
    }

    // Content length of a slice body equals the slice length.
    #[test]
    fn content_length_slice_body() {
        let builder = HttpBodyBuilder::new_fake();
        let slice_body = builder.slice([1, 2, 3]);
        assert_eq!(slice_body.content_length(), Some(3));
    }

    // Content length of a body built from an empty view is zero.
    #[test]
    fn content_length_bytes_body() {
        let builder = HttpBodyBuilder::new_fake();
        let view_body = builder.bytes(BytesView::new());
        assert_eq!(view_body.content_length(), Some(0));
    }

    // A streamed body has no known length and concatenates its chunks.
    #[test]
    fn stream_body_creation() {
        let builder = HttpBodyBuilder::new_fake();
        let defaults = HttpBodyOptions::default();
        let streamed = create_stream_body_from_chunks(&builder, &[b"hello ", b"world"], &defaults);
        assert_eq!(streamed.content_length(), None);

        let joined = block_on(streamed.into_text()).unwrap();
        assert_eq!(joined, "hello world");
    }

    // A streamed body over empty input reads back as empty.
    #[test]
    fn stream_body_empty() {
        let builder = HttpBodyBuilder::new_fake();
        let defaults = HttpBodyOptions::default();
        let streamed = create_stream_body(&builder, b"", &defaults);

        let contents = block_on(streamed.into_bytes()).unwrap();
        assert!(contents.is_empty());
    }

    // With a timeout configured, data that is ready is still returned.
    #[test]
    fn stream_with_timeout_returns_data_before_timeout() {
        let clock = ClockControl::new().to_clock();
        let builder = HttpBodyBuilder::new(GlobalPool::new(), &clock);

        let parts: [&[u8]; 2] = [b"hello ", b"world"];
        let chunks: Vec<Result<BytesView>> = parts
            .iter()
            .map(|part| Ok(BytesView::copied_from_slice(part, &builder)))
            .collect();

        let with_timeout = HttpBodyOptions::default().timeout(Duration::from_secs(30));
        let streamed = builder.stream(stream::iter(chunks), &with_timeout);
        assert_eq!(streamed.content_length(), None);

        let joined = block_on(streamed.into_text()).unwrap();
        assert_eq!(joined, "hello world");
    }

    // The largest representable timeout must not prevent reading the body.
    #[test]
    fn body_with_max_duration_timeout_still_returns_data() {
        let builder = HttpBodyBuilder::new_fake();
        let with_timeout = HttpBodyOptions::default().timeout(Duration::MAX);

        let full = http_body_util::Full::new(BytesView::copied_from_slice(b"hello", &builder));
        let wrapped = builder.body(full, &with_timeout);

        let contents = block_on(wrapped.into_bytes()).unwrap();
        assert_eq!(contents, b"hello");
    }

    // Serializing ~30 KB of JSON must end up in only a handful of memory blocks.
    #[test]
    fn json_serialization_makes_few_memory_allocations() {
        #[derive(Serialize)]
        struct LargePayload {
            items: Vec<Item>,
        }

        #[derive(Serialize)]
        struct Item {
            id: u32,
            name: String,
            description: String,
            value: f64,
        }

        let items: Vec<Item> = (0..300)
            .map(|i| Item {
                id: i,
                name: format!("item-name-{i:04}"),
                description: format!("This is a longer description for item number {i:04}"),
                value: f64::from(i) * 1.5,
            })
            .collect();
        let payload = LargePayload { items };

        // Reference size obtained from a plain `serde_json` serialization.
        let expected_size = serde_json::to_vec(&payload).unwrap().len();
        assert!(
            expected_size > 25_000 && expected_size < 40_000,
            "expected ~30 KB JSON, got {expected_size} bytes"
        );

        let clock = Clock::new_frozen();
        let builder = HttpBodyBuilder::with_custom_memory(TransparentMemory::new(), &clock);
        let json_body = builder.json(&payload).unwrap();

        let serialized = json_body.into_bytes_no_buffering().unwrap();
        assert_eq!(serialized.len(), expected_size);

        let block_count = serialized.slices().count();
        assert!(
            block_count <= 5,
            "expected at most 5 memory blocks for ~30 KB JSON serialization, got {block_count}"
        );
    }

    // Per-call options and builder-level options can both carry a timeout.
    #[test]
    fn builder_merges_per_call_options_with_defaults() {
        let clock = Clock::new_frozen();
        let builder_level = HttpBodyOptions::default().timeout(Duration::from_secs(30));
        let builder = HttpBodyBuilder::new(GlobalPool::new(), &clock).with_options(builder_level);

        let call_level = HttpBodyOptions::default().timeout(Duration::from_secs(5));
        let no_chunks = Vec::<Result<BytesView>>::new();
        let streamed = builder.stream(stream::iter(no_chunks), &call_level);
        assert_eq!(streamed.content_length(), None);
    }

    // The fake builder can drive a body that has a timeout configured.
    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn fake_builder_works_with_timeouts() {
        let with_timeout = HttpBodyOptions::default().timeout(Duration::from_secs(1));
        let builder = HttpBodyBuilder::new_fake();

        let streamed = create_stream_body(&builder, b"Hello World", &with_timeout);
        let text = streamed.into_text().await.unwrap();
        assert_eq!(text, "Hello World");
    }
}