use crate::error::{BoxError, Error};
use bytes::Bytes;
#[doc(hidden)]
pub use http_body::Body as HttpBody;
/// A type-erased HTTP body that yields [`Bytes`] data frames and reports
/// failures as [`Error`].
///
/// Backed by `http_body_util`'s `UnsyncBoxBody`; use [`BoxBodySync`] when the
/// body must also be `Sync`.
pub type BoxBody = http_body_util::combinators::UnsyncBoxBody<Bytes, Error>;
/// A type-erased HTTP body like [`BoxBody`], backed by `http_body_util`'s
/// `BoxBody` combinator so that the resulting body is `Sync` as well.
pub type BoxBodySync = http_body_util::combinators::BoxBody<Bytes, Error>;
pub fn boxed<B>(body: B) -> BoxBody
where
B: http_body::Body<Data = Bytes> + Send + 'static,
B::Error: Into<BoxError>,
{
use http_body_util::BodyExt;
try_downcast(body).unwrap_or_else(|body| body.map_err(Error::new).boxed_unsync())
}
pub fn boxed_sync<B>(body: B) -> BoxBodySync
where
B: http_body::Body<Data = Bytes> + Send + Sync + 'static,
B::Error: Into<BoxError>,
{
use http_body_util::BodyExt;
body.map_err(Error::new).boxed()
}
/// Attempts to reinterpret `k` as a value of type `T` without boxing.
///
/// Returns `Ok` with the value when `K` and `T` are the same concrete type,
/// and gives the input back as `Err` otherwise.
#[doc(hidden)]
pub(crate) fn try_downcast<T, K>(k: K) -> Result<T, K>
where
    T: 'static,
    K: Send + 'static,
{
    // Park the value in an `Option` so it can be moved out through the
    // `&mut dyn Any` view once the type check has succeeded.
    let mut slot = Some(k);
    let erased: &mut dyn std::any::Any = &mut slot;
    let same_type = erased
        .downcast_mut::<Option<T>>()
        .and_then(|inner| inner.take());

    match same_type {
        Some(value) => Ok(value),
        // The downcast failed, so `slot` still owns the original value.
        None => Err(slot.unwrap()),
    }
}
/// Creates a [`BoxBody`] that carries no data.
pub fn empty() -> BoxBody {
    let no_content = http_body_util::Empty::<Bytes>::new();
    boxed(no_content)
}
/// Creates a [`BoxBodySync`] that carries no data.
pub fn empty_sync() -> BoxBodySync {
    let no_content = http_body_util::Empty::<Bytes>::new();
    boxed_sync(no_content)
}
/// Converts `body` into [`Bytes`] and wraps it as a single-chunk [`BoxBody`].
#[doc(hidden)]
pub fn to_boxed<B>(body: B) -> BoxBody
where
    B: Into<Bytes>,
{
    let payload: Bytes = body.into();
    boxed(http_body_util::Full::new(payload))
}
/// Converts `body` into [`Bytes`] and wraps it as a single-chunk
/// [`BoxBodySync`].
#[doc(hidden)]
pub fn to_boxed_sync<B>(body: B) -> BoxBodySync
where
    B: Into<Bytes>,
{
    let payload: Bytes = body.into();
    boxed_sync(http_body_util::Full::new(payload))
}
/// Wraps an existing [`Bytes`] buffer as a single-chunk [`BoxBody`].
pub fn from_bytes(bytes: Bytes) -> BoxBody {
    let full = http_body_util::Full::new(bytes);
    boxed(full)
}
pub fn wrap_stream<S, O, E>(stream: S) -> BoxBody
where
S: futures_util::Stream<Item = Result<O, E>> + Send + 'static,
O: Into<Bytes> + 'static,
E: Into<BoxError> + 'static,
{
use futures_util::TryStreamExt;
use http_body_util::StreamBody;
let frame_stream = stream
.map_ok(|chunk| http_body::Frame::data(chunk.into()))
.map_err(|e| Error::new(e.into()));
boxed(StreamBody::new(frame_stream))
}
pub fn wrap_stream_sync<S, O, E>(stream: S) -> BoxBodySync
where
S: futures_util::Stream<Item = Result<O, E>> + Send + Sync + 'static,
O: Into<Bytes> + 'static,
E: Into<BoxError> + 'static,
{
use futures_util::TryStreamExt;
use http_body_util::StreamBody;
let frame_stream = stream
.map_ok(|chunk| http_body::Frame::data(chunk.into()))
.map_err(|e| Error::new(e.into()));
boxed_sync(StreamBody::new(frame_stream))
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::VecDeque;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    /// Drains `body` to completion and returns everything it produced as one
    /// contiguous buffer.
    async fn collect_bytes<B>(body: B) -> Result<Bytes, Error>
    where
        B: HttpBody,
        B::Error: Into<BoxError>,
    {
        use http_body_util::BodyExt;
        let collected = body.collect().await.map_err(Error::new)?;
        Ok(collected.to_bytes())
    }

    /// Hand-written `Stream` that is always ready and yields its queued items
    /// front to back. Shared by the tests that need a non-library stream type.
    struct QueueStream {
        chunks: VecDeque<Result<Bytes, std::io::Error>>,
    }

    impl QueueStream {
        fn new(chunks: Vec<Result<Bytes, std::io::Error>>) -> Self {
            Self {
                chunks: chunks.into(),
            }
        }
    }

    impl futures_util::Stream for QueueStream {
        type Item = Result<Bytes, std::io::Error>;

        fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            // `pop_front` is O(1) and returns `None` once the queue is drained,
            // which is exactly the end-of-stream signal.
            Poll::Ready(self.chunks.pop_front())
        }
    }

    /// Compiles only if the referenced value's type is `Sync`.
    fn check_sync<T: Sync>(_: &T) {}

    #[tokio::test]
    async fn test_empty_body() {
        let body = empty();
        let bytes = collect_bytes(body).await.unwrap();
        assert_eq!(bytes.len(), 0);
    }

    #[tokio::test]
    async fn test_from_bytes() {
        let data = Bytes::from("hello world");
        let body = from_bytes(data.clone());
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected, data);
    }

    #[tokio::test]
    async fn test_to_boxed_string() {
        let s = "hello world";
        let body = to_boxed(s);
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected, Bytes::from(s));
    }

    #[tokio::test]
    async fn test_to_boxed_vec() {
        let vec = vec![1u8, 2, 3, 4, 5];
        let body = to_boxed(vec.clone());
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected.as_ref(), vec.as_slice());
    }

    #[tokio::test]
    async fn test_boxed() {
        use http_body_util::Full;
        let full_body = Full::new(Bytes::from("test data"));
        let boxed_body: BoxBody = boxed(full_body);
        let collected = collect_bytes(boxed_body).await.unwrap();
        assert_eq!(collected, Bytes::from("test data"));
    }

    #[tokio::test]
    async fn test_boxed_sync() {
        use http_body_util::Full;
        let full_body = Full::new(Bytes::from("sync test"));
        let boxed_body: BoxBodySync = boxed_sync(full_body);
        let collected = collect_bytes(boxed_body).await.unwrap();
        assert_eq!(collected, Bytes::from("sync test"));
    }

    #[tokio::test]
    async fn test_wrap_stream_single_chunk() {
        use futures_util::stream;
        let data = Bytes::from("single chunk");
        let stream = stream::iter(vec![Ok::<_, std::io::Error>(data.clone())]);
        let body = wrap_stream(stream);
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected, data);
    }

    #[tokio::test]
    async fn test_wrap_stream_multiple_chunks() {
        use futures_util::stream;
        let chunks = vec![
            Ok::<_, std::io::Error>(Bytes::from("chunk1")),
            Ok(Bytes::from("chunk2")),
            Ok(Bytes::from("chunk3")),
        ];
        let expected = Bytes::from("chunk1chunk2chunk3");
        let stream = stream::iter(chunks);
        let body = wrap_stream(stream);
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected, expected);
    }

    #[tokio::test]
    async fn test_wrap_stream_empty() {
        use futures_util::stream;

        // A stream whose only chunk is empty.
        let stream = stream::iter(vec![Ok::<_, std::io::Error>(Bytes::new())]);
        let body = wrap_stream(stream);
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected.len(), 0);

        // A stream that yields no items at all.
        let stream = stream::iter(Vec::<Result<Bytes, std::io::Error>>::new());
        let body = wrap_stream(stream);
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected.len(), 0);
    }

    #[tokio::test]
    async fn test_wrap_stream_error() {
        use futures_util::stream;
        let chunks = vec![
            Ok::<_, std::io::Error>(Bytes::from("chunk1")),
            Err(std::io::Error::other("test error")),
        ];
        let stream = stream::iter(chunks);
        let body = wrap_stream(stream);
        let result = collect_bytes(body).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_wrap_stream_various_types() {
        use futures_util::stream;

        // `&'static str` chunks.
        let chunks = vec![Ok::<_, std::io::Error>("string slice"), Ok("another string")];
        let stream = stream::iter(chunks);
        let body = wrap_stream(stream);
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected, Bytes::from("string sliceanother string"));

        // Owned `String` chunks.
        let chunks = vec![
            Ok::<_, std::io::Error>(String::from("owned ")),
            Ok(String::from("strings")),
        ];
        let stream = stream::iter(chunks);
        let body = wrap_stream(stream);
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected, Bytes::from("owned strings"));

        // `Vec<u8>` chunks.
        let chunks = vec![
            Ok::<_, std::io::Error>(vec![72u8, 101, 108, 108, 111]),
            Ok(vec![32u8, 87, 111, 114, 108, 100]),
        ];
        let stream = stream::iter(chunks);
        let body = wrap_stream(stream);
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected, Bytes::from("Hello World"));

        // `&'static [u8]` chunks.
        let chunks = vec![
            Ok::<_, std::io::Error>(&[98u8, 121, 116, 101] as &[u8]),
            Ok(&[115u8, 33] as &[u8]),
        ];
        let stream = stream::iter(chunks);
        let body = wrap_stream(stream);
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected, Bytes::from("bytes!"));

        // A user-defined chunk type that converts into `Bytes`.
        struct CustomChunk {
            data: String,
        }
        impl From<CustomChunk> for Bytes {
            fn from(chunk: CustomChunk) -> Bytes {
                Bytes::from(chunk.data)
            }
        }
        let chunks = vec![
            Ok::<_, std::io::Error>(CustomChunk {
                data: "custom ".into(),
            }),
            Ok(CustomChunk {
                data: "struct".into(),
            }),
        ];
        let stream = stream::iter(chunks);
        let body = wrap_stream(stream);
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected, Bytes::from("custom struct"));
    }

    #[tokio::test]
    async fn test_wrap_stream_custom_stream_type() {
        let stream = QueueStream::new(vec![
            Ok(Bytes::from("custom ")),
            Ok(Bytes::from("stream")),
        ]);
        let body = wrap_stream(stream);
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected, Bytes::from("custom stream"));
    }

    #[tokio::test]
    async fn test_wrap_stream_custom_error_type() {
        use futures_util::stream;

        #[derive(Debug, Clone)]
        struct CustomError {
            message: String,
        }
        impl std::fmt::Display for CustomError {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                write!(f, "CustomError: {}", self.message)
            }
        }
        impl std::error::Error for CustomError {}

        // All items succeed: the custom error type is never produced.
        let chunks = vec![
            Ok::<_, CustomError>(Bytes::from("custom ")),
            Ok(Bytes::from("error type")),
        ];
        let stream = stream::iter(chunks);
        let body = wrap_stream(stream);
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected, Bytes::from("custom error type"));

        // The second item fails: collecting the body must surface an error.
        let chunks = vec![
            Ok::<_, CustomError>(Bytes::from("data")),
            Err(CustomError {
                message: "custom error".into(),
            }),
        ];
        let stream = stream::iter(chunks);
        let body = wrap_stream(stream);
        let result = collect_bytes(body).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_wrap_stream_incremental_consumption() {
        use http_body_util::BodyExt;

        let stream = QueueStream::new(vec![
            Ok(Bytes::from("chunk1")),
            Ok(Bytes::from("chunk2")),
            Ok(Bytes::from("chunk3")),
        ]);
        let mut body = wrap_stream(stream);

        // Each stream item must surface as its own data frame, in order.
        for expected in ["chunk1", "chunk2", "chunk3"] {
            let frame = body.frame().await.unwrap().unwrap();
            assert!(frame.is_data());
            assert_eq!(frame.into_data().unwrap(), Bytes::from(expected));
        }

        // After the last chunk the body reports end-of-stream.
        let end = body.frame().await;
        assert!(end.is_none());
    }

    #[tokio::test]
    async fn test_wrap_stream_sync_single_chunk() {
        use futures_util::stream;
        let data = Bytes::from("sync single chunk");
        let stream = stream::iter(vec![Ok::<_, std::io::Error>(data.clone())]);
        let body = wrap_stream_sync(stream);
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected, data);
    }

    #[tokio::test]
    async fn test_wrap_stream_sync_multiple_chunks() {
        use futures_util::stream;
        let chunks = vec![
            Ok::<_, std::io::Error>(Bytes::from("sync1")),
            Ok(Bytes::from("sync2")),
            Ok(Bytes::from("sync3")),
        ];
        let expected = Bytes::from("sync1sync2sync3");
        let stream = stream::iter(chunks);
        let body = wrap_stream_sync(stream);
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected, expected);
    }

    #[tokio::test]
    async fn test_empty_sync_body() {
        let body = empty_sync();
        let bytes = collect_bytes(body).await.unwrap();
        assert_eq!(bytes.len(), 0);
    }

    #[tokio::test]
    async fn test_to_boxed_sync() {
        let data = Bytes::from("sync boxed data");
        let body = to_boxed_sync(data.clone());
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected, data);
    }

    // Compile-time checks of the auto traits of the boxed body aliases. These
    // functions are never called; they only need to type-check.
    fn _assert_send<T: Send>() {}
    fn _assert_sync<T: Sync>() {}
    fn _assert_send_sync_bounds() {
        _assert_send::<BoxBodySync>();
        _assert_sync::<BoxBodySync>();
        _assert_send::<BoxBody>();
    }

    #[tokio::test]
    async fn test_wrap_stream_sync_produces_sync_body() {
        use futures_util::stream;
        let data = Bytes::from("test sync");
        let stream = stream::iter(vec![Ok::<_, std::io::Error>(data.clone())]);
        let body = wrap_stream_sync(stream);
        check_sync(&body);
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected, data);
    }

    #[test]
    fn test_empty_sync_is_sync() {
        let body = empty_sync();
        check_sync(&body);
    }

    #[test]
    fn test_boxed_sync_is_sync() {
        use http_body_util::Full;
        let body = boxed_sync(Full::new(Bytes::from("test")));
        check_sync(&body);
    }
}