roa_core/
body.rs

1use std::mem;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use bytes::{Bytes, BytesMut};
6use futures::future::ok;
7use futures::stream::{once, Stream, StreamExt};
8use tokio::io::{self, AsyncRead, ReadBuf};
9
10const DEFAULT_CHUNK_SIZE: usize = 4096;
11
12/// The body of response.
13///
14/// ### Example
15///
16/// ```rust
17/// use roa_core::Body;
18/// use futures::StreamExt;
19/// use std::io;
20/// use bytes::Bytes;
21///
22/// async fn read_body(body: Body) -> io::Result<Bytes> {
23///     Ok(match body {
24///         Body::Empty => Bytes::new(),
25///         Body::Once(bytes) => bytes,
26///         Body::Stream(mut stream) => {
27///             let mut bytes = Vec::new();
28///             while let Some(item) = stream.next().await {
29///                 bytes.extend_from_slice(&*item?);
30///             }
31///             bytes.into()
32///         }
33///     })
34/// }
35/// ```
36pub enum Body {
37    /// Empty kind
38    Empty,
39
40    /// Bytes kind.
41    Once(Bytes),
42
43    /// Stream kind.
44    Stream(Segment),
45}
46
47/// A boxed stream.
48#[derive(Default)]
49pub struct Segment(Option<Pin<Box<dyn Stream<Item = io::Result<Bytes>> + Sync + Send + 'static>>>);
50
51impl Body {
52    /// Construct an empty body.
53    #[inline]
54    pub fn empty() -> Self {
55        Body::Empty
56    }
57
58    /// Construct a once body.
59    #[inline]
60    pub fn once(bytes: impl Into<Bytes>) -> Self {
61        Body::Once(bytes.into())
62    }
63
64    /// Construct an empty body of stream kind.
65    #[inline]
66    pub fn stream<S>(stream: S) -> Self
67    where
68        S: Stream<Item = io::Result<Bytes>> + Sync + Send + 'static,
69    {
70        Body::Stream(Segment::new(stream))
71    }
72
73    /// Write stream.
74    #[inline]
75    pub fn write_stream(
76        &mut self,
77        stream: impl Stream<Item = io::Result<Bytes>> + Sync + Send + 'static,
78    ) -> &mut Self {
79        match self {
80            Body::Empty => {
81                *self = Self::stream(stream);
82            }
83            Body::Once(bytes) => {
84                let stream = once(ok(mem::take(bytes))).chain(stream);
85                *self = Self::stream(stream);
86            }
87            Body::Stream(segment) => {
88                *self = Self::stream(mem::take(segment).chain(stream));
89            }
90        }
91        self
92    }
93
94    /// Write reader with default chunk size.
95    #[inline]
96    pub fn write_reader(
97        &mut self,
98        reader: impl AsyncRead + Sync + Send + Unpin + 'static,
99    ) -> &mut Self {
100        self.write_chunk(reader, DEFAULT_CHUNK_SIZE)
101    }
102
103    /// Write reader with chunk size.
104    #[inline]
105    pub fn write_chunk(
106        &mut self,
107        reader: impl AsyncRead + Sync + Send + Unpin + 'static,
108        chunk_size: usize,
109    ) -> &mut Self {
110        self.write_stream(ReaderStream::new(reader, chunk_size))
111    }
112
113    /// Write `Bytes`.
114    #[inline]
115    pub fn write(&mut self, data: impl Into<Bytes>) -> &mut Self {
116        match self {
117            Body::Empty => {
118                *self = Self::once(data.into());
119                self
120            }
121            body => body.write_stream(once(ok(data.into()))),
122        }
123    }
124}
125
126impl Segment {
127    #[inline]
128    fn new(stream: impl Stream<Item = io::Result<Bytes>> + Sync + Send + 'static) -> Self {
129        Self(Some(Box::pin(stream)))
130    }
131}
132
133impl From<Body> for hyper::Body {
134    #[inline]
135    fn from(body: Body) -> Self {
136        match body {
137            Body::Empty => hyper::Body::empty(),
138            Body::Once(bytes) => hyper::Body::from(bytes),
139            Body::Stream(stream) => hyper::Body::wrap_stream(stream),
140        }
141    }
142}
143
144impl Default for Body {
145    #[inline]
146    fn default() -> Self {
147        Self::empty()
148    }
149}
150
151pub struct ReaderStream<R> {
152    chunk_size: usize,
153    reader: R,
154}
155
156impl<R> ReaderStream<R> {
157    #[inline]
158    fn new(reader: R, chunk_size: usize) -> Self {
159        Self { reader, chunk_size }
160    }
161}
162
163impl<R> Stream for ReaderStream<R>
164where
165    R: AsyncRead + Unpin,
166{
167    type Item = io::Result<Bytes>;
168    #[inline]
169    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
170        let chunk_size = self.chunk_size;
171        let mut chunk = BytesMut::with_capacity(chunk_size);
172        unsafe { chunk.set_len(chunk_size) };
173        let mut buf = ReadBuf::new(&mut *chunk);
174        futures::ready!(Pin::new(&mut self.reader).poll_read(cx, &mut buf))?;
175        let filled_len = buf.filled().len();
176        if filled_len == 0 {
177            Poll::Ready(None)
178        } else {
179            Poll::Ready(Some(Ok(chunk.freeze().slice(0..filled_len))))
180        }
181    }
182}
183
184impl Stream for Body {
185    type Item = io::Result<Bytes>;
186    #[inline]
187    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
188        match &mut *self {
189            Body::Empty => Poll::Ready(None),
190            Body::Once(bytes) => {
191                let data = mem::take(bytes);
192                *self = Body::empty();
193                Poll::Ready(Some(Ok(data)))
194            }
195            Body::Stream(stream) => Pin::new(stream).poll_next(cx),
196        }
197    }
198}
199
200impl Stream for Segment {
201    type Item = io::Result<Bytes>;
202    #[inline]
203    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
204        match self.0 {
205            None => Poll::Ready(None),
206            Some(ref mut stream) => stream.as_mut().poll_next(cx),
207        }
208    }
209}
210
211#[cfg(test)]
212mod tests {
213    use std::io;
214
215    use futures::{AsyncReadExt, TryStreamExt};
216    use tokio::fs::File;
217
218    use super::Body;
219
220    async fn read_body(body: Body) -> io::Result<String> {
221        let mut data = String::new();
222        body.into_async_read().read_to_string(&mut data).await?;
223        Ok(data)
224    }
225
226    #[tokio::test]
227    async fn body_empty() -> std::io::Result<()> {
228        let body = Body::default();
229        assert_eq!("", read_body(body).await?);
230        Ok(())
231    }
232
233    #[tokio::test]
234    async fn body_single() -> std::io::Result<()> {
235        let mut body = Body::default();
236        body.write("Hello, World");
237        assert_eq!("Hello, World", read_body(body).await?);
238        Ok(())
239    }
240
241    #[tokio::test]
242    async fn body_multiple() -> std::io::Result<()> {
243        let mut body = Body::default();
244        body.write("He").write("llo, ").write("World");
245        assert_eq!("Hello, World", read_body(body).await?);
246        Ok(())
247    }
248
249    #[tokio::test]
250    async fn body_composed() -> std::io::Result<()> {
251        let mut body = Body::empty();
252        body.write("He")
253            .write("llo, ")
254            .write_reader(File::open("../assets/author.txt").await?)
255            .write_reader(File::open("../assets/author.txt").await?)
256            .write(".");
257        assert_eq!("Hello, HexileeHexilee.", read_body(body).await?);
258        Ok(())
259    }
260}