use std::convert::Infallible;
use std::error::Error as StdError;
use futures_util::TryStreamExt;
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full, StreamBody};
use hyper::body::{Bytes, Frame};
pub type BoxError = Box<dyn StdError + Send + Sync>;
pub type ProxyBody = BoxBody<Bytes, BoxError>;
pub fn full_body(bytes: Bytes) -> ProxyBody {
Full::new(bytes)
.map_err(|never: Infallible| match never {})
.boxed()
}
pub fn empty_body() -> ProxyBody {
full_body(Bytes::new())
}
pub fn stream_body<S, E>(stream: S) -> ProxyBody
where
S: futures_util::Stream<Item = Result<Bytes, E>> + Send + Sync + 'static,
E: StdError + Send + Sync + 'static,
{
let frames = stream
.map_ok(Frame::data)
.map_err(|e| Box::new(e) as BoxError);
StreamBody::new(frames).boxed()
}
#[cfg(test)]
mod tests {
use super::*;
use http_body_util::BodyExt;
#[tokio::test]
async fn full_body_collects_back_to_bytes() {
let body = full_body(Bytes::from_static(b"hello world"));
let collected = body.collect().await.unwrap().to_bytes();
assert_eq!(&collected[..], b"hello world");
}
#[tokio::test]
async fn empty_body_yields_zero_bytes() {
let body = empty_body();
let collected = body.collect().await.unwrap().to_bytes();
assert!(collected.is_empty());
}
#[tokio::test]
async fn stream_body_preserves_chunk_order() {
let chunks = vec![
Ok::<_, std::io::Error>(Bytes::from_static(b"first ")),
Ok(Bytes::from_static(b"second ")),
Ok(Bytes::from_static(b"third")),
];
let stream = futures_util::stream::iter(chunks);
let body = stream_body(stream);
let collected = body.collect().await.unwrap().to_bytes();
assert_eq!(&collected[..], b"first second third");
}
#[tokio::test]
async fn stream_body_propagates_errors() {
let chunks = vec![
Ok::<_, std::io::Error>(Bytes::from_static(b"ok ")),
Err(std::io::Error::other("boom")),
];
let stream = futures_util::stream::iter(chunks);
let body = stream_body(stream);
let result = body.collect().await;
assert!(result.is_err(), "stream error must propagate, not swallow");
}
#[tokio::test]
async fn stream_body_large_payload_does_not_buffer_eagerly() {
let chunk = Bytes::from(vec![0xAB; 1024 * 1024]);
let chunks: Vec<Result<Bytes, std::io::Error>> =
(0..16).map(|_| Ok(chunk.clone())).collect();
let stream = futures_util::stream::iter(chunks);
let body = stream_body(stream);
let collected = body.collect().await.unwrap().to_bytes();
assert_eq!(collected.len(), 16 * 1024 * 1024);
}
}