axum 0.3.1

Web framework that focuses on ergonomics and modularity
Documentation
use crate::{response::IntoResponse, BoxError, Error};
use bytes::Bytes;
use futures_util::{
    ready,
    stream::{self, TryStream},
};
use http::{HeaderMap, Response};
use http_body::Body;
use pin_project_lite::pin_project;
use std::{
    fmt,
    pin::Pin,
    task::{Context, Poll},
};
use sync_wrapper::SyncWrapper;

pin_project! {
    /// An [`http_body::Body`] created from a [`Stream`].
    ///
    /// The purpose of this type is to be used in responses. If you want to
    /// extract the request body as a stream consider using
    /// [`BodyStream`](crate::extract::BodyStream).
    ///
    /// # Example
    ///
    /// ```
    /// use axum::{
    ///     Router,
    ///     routing::get,
    ///     body::StreamBody,
    ///     response::IntoResponse,
    /// };
    /// use futures::stream;
    ///
    /// async fn handler() -> impl IntoResponse {
    ///     let chunks: Vec<Result<_, std::io::Error>> = vec![
    ///         Ok("Hello,"),
    ///         Ok(" "),
    ///         Ok("world!"),
    ///     ];
    ///     let stream = stream::iter(chunks);
    ///     StreamBody::new(stream)
    /// }
    ///
    /// let app = Router::new().route("/", get(handler));
    /// # async {
    /// # axum::Server::bind(&"".parse().unwrap()).serve(app.into_make_service()).await.unwrap();
    /// # };
    /// ```
    ///
    /// [`Stream`]: futures_util::stream::Stream
    pub struct StreamBody<S> {
        #[pin]
        stream: SyncWrapper<S>,
    }
}

impl<S> StreamBody<S> {
    /// Create a new `StreamBody` from a [`Stream`].
    ///
    /// [`Stream`]: futures_util::stream::Stream
    pub fn new(stream: S) -> Self
    where
        S: TryStream + Send + 'static,
        S::Ok: Into<Bytes>,
        S::Error: Into<BoxError>,
    {
        Self {
            stream: SyncWrapper::new(stream),
        }
    }
}

impl<S> IntoResponse for StreamBody<S>
where
    S: TryStream + Send + 'static,
    S::Ok: Into<Bytes>,
    S::Error: Into<BoxError>,
{
    type Body = Self;
    type BodyError = Error;

    fn into_response(self) -> Response<Self> {
        Response::new(self)
    }
}

impl Default for StreamBody<futures_util::stream::Empty<Result<Bytes, Error>>> {
    fn default() -> Self {
        Self::new(stream::empty())
    }
}

impl<S> fmt::Debug for StreamBody<S> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_tuple("StreamBody").finish()
    }
}

impl<S> Body for StreamBody<S>
where
    S: TryStream,
    S::Ok: Into<Bytes>,
    S::Error: Into<BoxError>,
{
    type Data = Bytes;
    type Error = Error;

    fn poll_data(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
        let stream = self.project().stream.get_pin_mut();
        match ready!(stream.try_poll_next(cx)) {
            Some(Ok(chunk)) => Poll::Ready(Some(Ok(chunk.into()))),
            Some(Err(err)) => Poll::Ready(Some(Err(Error::new(err)))),
            None => Poll::Ready(None),
        }
    }

    fn poll_trailers(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
        Poll::Ready(Ok(None))
    }
}

#[test]
fn stream_body_traits() {
    use futures_util::stream::Empty;

    type EmptyStream = StreamBody<Empty<Result<Bytes, BoxError>>>;

    crate::test_helpers::assert_send::<EmptyStream>();
    crate::test_helpers::assert_sync::<EmptyStream>();
    crate::test_helpers::assert_unpin::<EmptyStream>();
}