viz_core/
body.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use bytes::Bytes;
7use futures_util::{Stream, TryStream, TryStreamExt};
8use http_body_util::{BodyExt, BodyStream, Full, StreamBody, combinators::UnsyncBoxBody};
9use hyper::body::{Frame, Incoming, SizeHint};
10use sync_wrapper::SyncWrapper;
11
12use crate::{BoxError, Error, HttpBody, Result};
13
/// The state of a body.
///
/// The enum carries no data, so it is `Copy` and can be passed around and
/// compared by value.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BodyState {
    /// The body is initialized.
    Normal,
    /// The body is empty.
    Empty,
    /// The body has been used.
    Used,
}
24
25/// A body for HTTP [`Request`] and HTTP [`Response`].
26///
27/// [`Request`]: crate::Request
28/// [`Response`]: crate::Response
29#[derive(Debug, Default)]
30pub enum Body<D = Bytes> {
31    /// An empty body.
32    #[default]
33    Empty,
34    /// A body that consists of a single chunk.
35    Full(Full<D>),
36    /// A boxed [`Body`] trait object.
37    Boxed(SyncWrapper<UnsyncBoxBody<D, Error>>),
38    /// An incoming body.
39    Incoming(Incoming),
40}
41
42impl Body {
43    /// Creates an empty body.
44    #[must_use]
45    pub const fn empty() -> Self {
46        Self::Empty
47    }
48
49    /// Wraps a body into box.
50    #[allow(clippy::missing_panics_doc)]
51    pub fn wrap<B>(body: B) -> Self
52    where
53        B: HttpBody + Send + 'static,
54        B::Data: Into<Bytes>,
55        B::Error: Into<BoxError>,
56    {
57        // Copied from Axum, thanks.
58        let mut body = Some(body);
59        <dyn std::any::Any>::downcast_mut::<Option<UnsyncBoxBody<Bytes, Error>>>(&mut body)
60            .and_then(Option::take)
61            .unwrap_or_else(|| {
62                body.unwrap()
63                    .map_frame(|frame| frame.map_data(Into::into))
64                    .map_err(Error::boxed)
65                    .boxed_unsync()
66            })
67            .into()
68    }
69
70    /// A body created from a [`Stream`].
71    pub fn from_stream<S>(stream: S) -> Self
72    where
73        S: TryStream + Send + 'static,
74        S::Ok: Into<Bytes>,
75        S::Error: Into<BoxError>,
76    {
77        StreamBody::new(
78            stream
79                .map_ok(Into::into)
80                .map_ok(Frame::data)
81                .map_err(Error::boxed),
82        )
83        .boxed_unsync()
84        .into()
85    }
86
87    /// A stream created from a [`http_body::Body`].
88    pub fn into_stream(self) -> BodyStream<Self> {
89        BodyStream::new(self)
90    }
91}
92
93impl HttpBody for Body {
94    type Data = Bytes;
95    type Error = Error;
96
97    #[inline]
98    fn poll_frame(
99        self: Pin<&mut Self>,
100        cx: &mut Context<'_>,
101    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
102        match self.get_mut() {
103            Self::Empty => Poll::Ready(None),
104            Self::Full(inner) => Pin::new(inner).poll_frame(cx).map_err(Into::into),
105            Self::Boxed(inner) => Pin::new(inner).get_pin_mut().poll_frame(cx),
106            Self::Incoming(inner) => Pin::new(inner).poll_frame(cx).map_err(Into::into),
107        }
108    }
109
110    #[inline]
111    fn is_end_stream(&self) -> bool {
112        match self {
113            Self::Empty => true,
114            Self::Boxed(_) => false,
115            Self::Full(inner) => inner.is_end_stream(),
116            Self::Incoming(inner) => inner.is_end_stream(),
117        }
118    }
119
120    #[inline]
121    fn size_hint(&self) -> SizeHint {
122        match self {
123            Self::Empty => SizeHint::with_exact(0),
124            Self::Full(inner) => inner.size_hint(),
125            Self::Boxed(_) => SizeHint::default(),
126            Self::Incoming(inner) => inner.size_hint(),
127        }
128    }
129}
130
131impl Stream for Body {
132    type Item = Result<Bytes, std::io::Error>;
133
134    #[inline]
135    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
136        match match self.get_mut() {
137            Self::Empty => return Poll::Ready(None),
138            Self::Full(inner) => Pin::new(inner)
139                .poll_frame(cx)
140                .map_err(std::io::Error::other)?,
141            Self::Boxed(inner) => Pin::new(inner)
142                .get_pin_mut()
143                .poll_frame(cx)
144                .map_err(std::io::Error::other)?,
145            Self::Incoming(inner) => Pin::new(inner)
146                .poll_frame(cx)
147                .map_err(std::io::Error::other)?,
148        } {
149            Poll::Pending => Poll::Pending,
150            Poll::Ready(None) => Poll::Ready(None),
151            Poll::Ready(Some(frame)) => Poll::Ready(frame.into_data().map(Ok).ok()),
152        }
153    }
154
155    #[inline]
156    fn size_hint(&self) -> (usize, Option<usize>) {
157        let sh = match self {
158            Self::Empty => return (0, Some(0)),
159            Self::Full(inner) => inner.size_hint(),
160            Self::Boxed(_) => return (0, None),
161            Self::Incoming(inner) => inner.size_hint(),
162        };
163        (
164            usize::try_from(sh.lower()).unwrap_or(usize::MAX),
165            sh.upper().map(|v| usize::try_from(v).unwrap_or(usize::MAX)),
166        )
167    }
168}
169
170impl From<()> for Body {
171    fn from((): ()) -> Self {
172        Self::Empty
173    }
174}
175
176impl<D> From<Full<D>> for Body<D> {
177    fn from(value: Full<D>) -> Self {
178        Self::Full(value)
179    }
180}
181
182impl<D> From<UnsyncBoxBody<D, Error>> for Body<D> {
183    fn from(value: UnsyncBoxBody<D, Error>) -> Self {
184        Self::Boxed(SyncWrapper::new(value))
185    }
186}