aioduct 0.1.10

Async-native HTTP client built directly on hyper 1.x — no hyper-util, no legacy
Documentation
use bytes::Bytes;
use http_body_util::BodyExt;

use crate::error::{AioductBody, Error};

/// HTTP request body, either buffered in memory or streaming.
pub enum RequestBody {
    /// Fully buffered body from bytes.
    Buffered(Bytes),
    /// Streaming body from a boxed hyper body.
    Streaming(AioductBody),
}

impl std::fmt::Debug for RequestBody {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            RequestBody::Buffered(_) => f.debug_tuple("Buffered").field(&"..").finish(),
            RequestBody::Streaming(_) => f.debug_tuple("Streaming").field(&"..").finish(),
        }
    }
}

impl RequestBody {
    pub(crate) fn into_hyper_body(self) -> AioductBody {
        match self {
            RequestBody::Buffered(b) => http_body_util::Full::new(b)
                .map_err(|never| match never {})
                .boxed_unsync(),
            RequestBody::Streaming(body) => body,
        }
    }

    /// Clone this body if it is buffered. Returns `None` for streaming bodies.
    pub fn try_clone(&self) -> Option<Self> {
        match self {
            RequestBody::Buffered(b) => Some(RequestBody::Buffered(b.clone())),
            RequestBody::Streaming(_) => None,
        }
    }
}

impl From<Bytes> for RequestBody {
    fn from(b: Bytes) -> Self {
        RequestBody::Buffered(b)
    }
}

impl From<Vec<u8>> for RequestBody {
    fn from(v: Vec<u8>) -> Self {
        RequestBody::Buffered(Bytes::from(v))
    }
}

impl From<String> for RequestBody {
    fn from(s: String) -> Self {
        RequestBody::Buffered(Bytes::from(s))
    }
}

impl From<&'static str> for RequestBody {
    fn from(s: &'static str) -> Self {
        RequestBody::Buffered(Bytes::from_static(s.as_bytes()))
    }
}

impl From<&'static [u8]> for RequestBody {
    fn from(s: &'static [u8]) -> Self {
        RequestBody::Buffered(Bytes::from_static(s))
    }
}

impl From<AioductBody> for RequestBody {
    fn from(body: AioductBody) -> Self {
        RequestBody::Streaming(body)
    }
}

/// Async iterator over response body data frames.
pub struct BodyStream {
    body: AioductBody,
    done: bool,
}

impl std::fmt::Debug for BodyStream {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BodyStream").finish()
    }
}

impl BodyStream {
    pub(crate) fn new(body: AioductBody) -> Self {
        Self { body, done: false }
    }

    /// Returns the next chunk of body data, or `None` when complete.
    pub async fn next(&mut self) -> Option<Result<Bytes, Error>> {
        if self.done {
            return None;
        }

        loop {
            match self.body.frame().await {
                Some(Ok(frame)) => {
                    if let Ok(data) = frame.into_data() {
                        return Some(Ok(data));
                    }
                }
                Some(Err(e)) => {
                    self.done = true;
                    return Some(Err(e));
                }
                None => {
                    self.done = true;
                    return None;
                }
            }
        }
    }
}

#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
    use super::*;

    fn buffered(data: &[u8]) -> RequestBody {
        RequestBody::Buffered(Bytes::from(data.to_vec()))
    }

    fn streaming() -> RequestBody {
        let body: AioductBody = http_body_util::Empty::new()
            .map_err(|never| match never {})
            .boxed_unsync();
        RequestBody::Streaming(body)
    }

    #[test]
    fn try_clone_buffered_returns_some() {
        let body = buffered(b"hello");
        let cloned = body.try_clone();
        assert!(cloned.is_some());
        match cloned.unwrap() {
            RequestBody::Buffered(b) => assert_eq!(&b[..], b"hello"),
            _ => panic!("expected Buffered"),
        }
    }

    #[test]
    fn try_clone_streaming_returns_none() {
        let body = streaming();
        assert!(body.try_clone().is_none());
    }

    #[test]
    fn from_bytes() {
        let body: RequestBody = Bytes::from_static(b"data").into();
        match body {
            RequestBody::Buffered(b) => assert_eq!(&b[..], b"data"),
            _ => panic!("expected Buffered"),
        }
    }

    #[test]
    fn from_vec() {
        let body: RequestBody = vec![1u8, 2, 3].into();
        match body {
            RequestBody::Buffered(b) => assert_eq!(&b[..], &[1, 2, 3]),
            _ => panic!("expected Buffered"),
        }
    }

    #[test]
    fn from_string() {
        let body: RequestBody = String::from("text").into();
        match body {
            RequestBody::Buffered(b) => assert_eq!(&b[..], b"text"),
            _ => panic!("expected Buffered"),
        }
    }

    #[test]
    fn from_static_str() {
        let body: RequestBody = "static".into();
        match body {
            RequestBody::Buffered(b) => assert_eq!(&b[..], b"static"),
            _ => panic!("expected Buffered"),
        }
    }

    #[test]
    fn from_static_bytes() {
        let body: RequestBody = (b"bytes" as &'static [u8]).into();
        match body {
            RequestBody::Buffered(b) => assert_eq!(&b[..], b"bytes"),
            _ => panic!("expected Buffered"),
        }
    }

    #[test]
    fn from_hyper_body_is_streaming() {
        let hyper_body: AioductBody = http_body_util::Empty::new()
            .map_err(|never| match never {})
            .boxed_unsync();
        let body: RequestBody = hyper_body.into();
        assert!(body.try_clone().is_none());
    }

    #[test]
    fn debug_buffered() {
        let body = buffered(b"data");
        let dbg = format!("{body:?}");
        assert!(dbg.contains("Buffered"));
    }

    #[test]
    fn debug_streaming() {
        let body = streaming();
        let dbg = format!("{body:?}");
        assert!(dbg.contains("Streaming"));
    }

    #[test]
    fn into_hyper_body_buffered() {
        let body = buffered(b"hello");
        let _hyper = body.into_hyper_body();
    }

    #[test]
    fn into_hyper_body_streaming() {
        let body = streaming();
        let _hyper = body.into_hyper_body();
    }

    #[test]
    fn body_stream_debug() {
        let hyper_body: AioductBody = http_body_util::Empty::new()
            .map_err(|never| match never {})
            .boxed_unsync();
        let stream = BodyStream::new(hyper_body);
        let dbg = format!("{stream:?}");
        assert!(dbg.contains("BodyStream"));
    }

    #[tokio::test]
    async fn body_stream_empty_returns_none() {
        let hyper_body: AioductBody = http_body_util::Empty::new()
            .map_err(|never| match never {})
            .boxed_unsync();
        let mut stream = BodyStream::new(hyper_body);
        assert!(stream.next().await.is_none());
    }

    #[tokio::test]
    async fn body_stream_with_data() {
        let hyper_body: AioductBody = http_body_util::Full::new(Bytes::from("hello"))
            .map_err(|never| match never {})
            .boxed_unsync();
        let mut stream = BodyStream::new(hyper_body);
        let chunk = stream.next().await.unwrap().unwrap();
        assert_eq!(&chunk[..], b"hello");
        assert!(stream.next().await.is_none());
    }

    #[tokio::test]
    async fn body_stream_done_stays_none() {
        let hyper_body: AioductBody = http_body_util::Empty::new()
            .map_err(|never| match never {})
            .boxed_unsync();
        let mut stream = BodyStream::new(hyper_body);
        assert!(stream.next().await.is_none());
        assert!(stream.next().await.is_none());
    }
}