use crate::Response;
use bytes::Bytes;
use std::pin::Pin;
/// Boxed, pinned, `Send` stream of body chunks.
///
/// Each item is either a [`Bytes`] chunk or a boxed
/// `std::error::Error + Send + Sync` error.
pub(crate) type BoxStream = Pin<
Box<
dyn futures_core::Stream<Item = Result<Bytes, Box<dyn std::error::Error + Send + Sync>>>
+ Send,
>,
>;
/// A body whose content is either a single in-memory [`Bytes`] buffer or a
/// boxed stream of chunks.
///
/// Values are built through the `From` conversions, `Default`, or
/// [`Body::wrap_stream`]; the representation itself is private.
pub struct Body {
// The actual storage: buffered bytes or a stream (see `BodyInner`).
inner: BodyInner,
}
/// Crate-internal representation of a [`Body`].
pub(crate) enum BodyInner {
/// The whole content is held in memory as one `Bytes` buffer.
Bytes(Bytes),
/// The content is produced chunk by chunk from a boxed stream.
Stream(BoxStream),
}
/// Debug output never prints the content itself: it shows the kind of body
/// (`"bytes"` or `"stream"`) and, for in-memory bodies, the byte length.
impl std::fmt::Debug for Body {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Start one `Body { .. }` struct and let each variant add its fields.
        let mut out = f.debug_struct("Body");
        match &self.inner {
            BodyInner::Bytes(buf) => {
                out.field("kind", &"bytes").field("length", &buf.len());
            }
            BodyInner::Stream(_) => {
                out.field("kind", &"stream");
            }
        }
        out.finish()
    }
}
impl Body {
    /// Returns the content as a byte slice when the body is held in memory.
    ///
    /// Streaming bodies have no contiguous buffer, so `None` is returned for
    /// them.
    pub fn as_bytes(&self) -> Option<&[u8]> {
        if let BodyInner::Bytes(buf) = &self.inner {
            Some(&buf[..])
        } else {
            None
        }
    }

    /// Builds a streaming body from any stream of `Result<O, E>` items.
    ///
    /// Every `Ok` item is converted into [`Bytes`] and every `Err` item into a
    /// boxed `std::error::Error + Send + Sync`; the adapted stream is then
    /// pinned on the heap and stored as the body's stream variant.
    pub fn wrap_stream<S, O, E>(stream: S) -> Body
    where
        S: futures_core::Stream<Item = Result<O, E>> + Send + 'static,
        O: Into<Bytes> + 'static,
        E: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
    {
        use futures_util::StreamExt;

        let chunks = stream.map(|item| match item {
            Ok(chunk) => Ok(chunk.into()),
            Err(err) => Err(err.into()),
        });
        Body {
            inner: BodyInner::Stream(Box::pin(chunks)),
        }
    }

    /// Attempts to duplicate the body.
    ///
    /// In-memory bodies yield `Some` with a clone of the buffer; streaming
    /// bodies yield `None`.
    pub fn try_clone(&self) -> Option<Body> {
        let BodyInner::Bytes(buf) = &self.inner else {
            return None;
        };
        Some(Body {
            inner: BodyInner::Bytes(buf.clone()),
        })
    }

    /// Consumes the body and hands back its internal representation.
    pub(crate) fn into_inner(self) -> BodyInner {
        let Body { inner } = self;
        inner
    }

    /// Test-only helper that collects the complete body into a `Vec<u8>`.
    ///
    /// In-memory bodies are copied out directly. Streaming bodies are read to
    /// the end, appending each chunk; the first stream error stops the read
    /// and is returned through `crate::Error::body`, wrapped with the
    /// `"stream body error"` context.
    #[cfg(test)]
    pub(crate) async fn into_bytes(self) -> Result<Vec<u8>, crate::Error> {
        use futures_util::StreamExt;

        // Handle the buffered case up front so only the stream path remains.
        let mut stream = match self.inner {
            BodyInner::Bytes(buf) => return Ok(buf.to_vec()),
            BodyInner::Stream(stream) => stream,
        };

        let mut collected = Vec::new();
        while let Some(next) = stream.next().await {
            let chunk = next.map_err(|source| {
                crate::Error::body(crate::error::ContextError::new("stream body error", source))
            })?;
            collected.extend_from_slice(&chunk);
        }
        Ok(collected)
    }
}
impl From<Vec<u8>> for Body {
fn from(v: Vec<u8>) -> Self {
Self {
inner: BodyInner::Bytes(Bytes::from(v)),
}
}
}
impl From<&'static [u8]> for Body {
fn from(s: &'static [u8]) -> Self {
Self {
inner: BodyInner::Bytes(Bytes::from_static(s)),
}
}
}
impl From<String> for Body {
fn from(s: String) -> Self {
Self {
inner: BodyInner::Bytes(Bytes::from(s)),
}
}
}
impl From<&'static str> for Body {
fn from(s: &'static str) -> Self {
Self {
inner: BodyInner::Bytes(Bytes::from_static(s.as_bytes())),
}
}
}
impl From<Bytes> for Body {
fn from(b: Bytes) -> Self {
Self {
inner: BodyInner::Bytes(b),
}
}
}
impl Default for Body {
fn default() -> Self {
Self {
inner: BodyInner::Bytes(Bytes::new()),
}
}
}
impl From<Response> for Body {
fn from(resp: Response) -> Body {
Body::wrap_stream(resp.bytes_stream())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Streaming body that yields exactly one successful `"chunk"` item.
    fn single_chunk_stream_body() -> Body {
        let items = vec![Ok::<_, std::io::Error>(Bytes::from("chunk"))];
        Body::wrap_stream(futures_util::stream::iter(items))
    }

    // Every in-memory constructor must expose exactly the bytes it was given.
    #[test]
    fn body_from_conversions() {
        let cases: Vec<(&str, Body, &[u8])> = vec![
            ("Vec<u8>", Body::from(vec![1, 2, 3]), &[1, 2, 3]),
            ("&[u8]", Body::from(&b"hello"[..]), b"hello"),
            ("String", Body::from("hello".to_owned()), b"hello"),
            ("&str", Body::from("hello"), b"hello"),
            ("Bytes", Body::from(Bytes::from_static(b"hello")), b"hello"),
            ("default", Body::default(), b""),
        ];
        for (label, body, expected) in cases {
            let actual = body.as_bytes().unwrap();
            assert_eq!(actual, expected, "Body::from({label})");
        }
    }

    // In-memory bodies can be cloned; streaming bodies cannot.
    #[test]
    fn try_clone_variants() {
        let original = Body::from("test");
        let copy = original.try_clone().unwrap();
        assert_eq!(copy.as_bytes().unwrap(), b"test");

        assert!(single_chunk_stream_body().try_clone().is_none());
    }

    // A streaming body has no contiguous buffer to expose.
    #[test]
    fn body_stream_as_bytes_returns_none() {
        let streaming = single_chunk_stream_body();
        assert!(streaming.as_bytes().is_none());
    }

    // Debug output of an in-memory body names its kind and its length.
    #[test]
    fn body_debug_bytes() {
        let s = format!("{:?}", Body::from("hi"));
        assert!(s.contains("bytes"), "should mention bytes: {s}");
        assert!(s.contains("2"), "should show length: {s}");
    }

    // Debug output of a streaming body names its kind.
    #[test]
    fn body_debug_stream() {
        let s = format!("{:?}", single_chunk_stream_body());
        assert!(s.contains("stream"), "should mention stream: {s}");
    }

    // `into_bytes` copies buffered bodies, concatenates stream chunks in
    // order, and reports an error when the stream yields one.
    #[test]
    fn into_bytes_variants() {
        use futures_executor::block_on;

        let buffered = Body::from(vec![1, 2, 3]);
        assert_eq!(block_on(buffered.into_bytes()).unwrap(), vec![1, 2, 3]);

        let two_chunks = Body::wrap_stream(futures_util::stream::iter(vec![
            Ok::<_, std::io::Error>(Bytes::from("hello ")),
            Ok(Bytes::from("world")),
        ]));
        assert_eq!(block_on(two_chunks.into_bytes()).unwrap(), b"hello world");

        let failing = Body::wrap_stream(futures_util::stream::iter(vec![
            Ok::<Bytes, std::io::Error>(Bytes::from("ok")),
            Err(std::io::Error::other("fail")),
        ]));
        assert!(block_on(failing.into_bytes()).is_err());
    }
}