1use std::mem;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use bytes::{Bytes, BytesMut};
6use futures::future::ok;
7use futures::stream::{once, Stream, StreamExt};
8use tokio::io::{self, AsyncRead, ReadBuf};
9
/// Chunk size, in bytes, used by [`Body::write_reader`] when the caller does
/// not pick one explicitly (4 KiB).
const DEFAULT_CHUNK_SIZE: usize = 4 * 1024;
11
12pub enum Body {
37 Empty,
39
40 Once(Bytes),
42
43 Stream(Segment),
45}
46
47#[derive(Default)]
49pub struct Segment(Option<Pin<Box<dyn Stream<Item = io::Result<Bytes>> + Sync + Send + 'static>>>);
50
51impl Body {
52 #[inline]
54 pub fn empty() -> Self {
55 Body::Empty
56 }
57
58 #[inline]
60 pub fn once(bytes: impl Into<Bytes>) -> Self {
61 Body::Once(bytes.into())
62 }
63
64 #[inline]
66 pub fn stream<S>(stream: S) -> Self
67 where
68 S: Stream<Item = io::Result<Bytes>> + Sync + Send + 'static,
69 {
70 Body::Stream(Segment::new(stream))
71 }
72
73 #[inline]
75 pub fn write_stream(
76 &mut self,
77 stream: impl Stream<Item = io::Result<Bytes>> + Sync + Send + 'static,
78 ) -> &mut Self {
79 match self {
80 Body::Empty => {
81 *self = Self::stream(stream);
82 }
83 Body::Once(bytes) => {
84 let stream = once(ok(mem::take(bytes))).chain(stream);
85 *self = Self::stream(stream);
86 }
87 Body::Stream(segment) => {
88 *self = Self::stream(mem::take(segment).chain(stream));
89 }
90 }
91 self
92 }
93
94 #[inline]
96 pub fn write_reader(
97 &mut self,
98 reader: impl AsyncRead + Sync + Send + Unpin + 'static,
99 ) -> &mut Self {
100 self.write_chunk(reader, DEFAULT_CHUNK_SIZE)
101 }
102
103 #[inline]
105 pub fn write_chunk(
106 &mut self,
107 reader: impl AsyncRead + Sync + Send + Unpin + 'static,
108 chunk_size: usize,
109 ) -> &mut Self {
110 self.write_stream(ReaderStream::new(reader, chunk_size))
111 }
112
113 #[inline]
115 pub fn write(&mut self, data: impl Into<Bytes>) -> &mut Self {
116 match self {
117 Body::Empty => {
118 *self = Self::once(data.into());
119 self
120 }
121 body => body.write_stream(once(ok(data.into()))),
122 }
123 }
124}
125
126impl Segment {
127 #[inline]
128 fn new(stream: impl Stream<Item = io::Result<Bytes>> + Sync + Send + 'static) -> Self {
129 Self(Some(Box::pin(stream)))
130 }
131}
132
133impl From<Body> for hyper::Body {
134 #[inline]
135 fn from(body: Body) -> Self {
136 match body {
137 Body::Empty => hyper::Body::empty(),
138 Body::Once(bytes) => hyper::Body::from(bytes),
139 Body::Stream(stream) => hyper::Body::wrap_stream(stream),
140 }
141 }
142}
143
144impl Default for Body {
145 #[inline]
146 fn default() -> Self {
147 Self::empty()
148 }
149}
150
/// Adapter that exposes an asynchronous reader as a stream of byte chunks.
pub struct ReaderStream<R> {
    /// Size of the buffer handed to the reader on every poll, in bytes.
    chunk_size: usize,
    /// The wrapped reader that supplies the data.
    reader: R,
}
155
156impl<R> ReaderStream<R> {
157 #[inline]
158 fn new(reader: R, chunk_size: usize) -> Self {
159 Self { reader, chunk_size }
160 }
161}
162
163impl<R> Stream for ReaderStream<R>
164where
165 R: AsyncRead + Unpin,
166{
167 type Item = io::Result<Bytes>;
168 #[inline]
169 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
170 let chunk_size = self.chunk_size;
171 let mut chunk = BytesMut::with_capacity(chunk_size);
172 unsafe { chunk.set_len(chunk_size) };
173 let mut buf = ReadBuf::new(&mut *chunk);
174 futures::ready!(Pin::new(&mut self.reader).poll_read(cx, &mut buf))?;
175 let filled_len = buf.filled().len();
176 if filled_len == 0 {
177 Poll::Ready(None)
178 } else {
179 Poll::Ready(Some(Ok(chunk.freeze().slice(0..filled_len))))
180 }
181 }
182}
183
184impl Stream for Body {
185 type Item = io::Result<Bytes>;
186 #[inline]
187 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
188 match &mut *self {
189 Body::Empty => Poll::Ready(None),
190 Body::Once(bytes) => {
191 let data = mem::take(bytes);
192 *self = Body::empty();
193 Poll::Ready(Some(Ok(data)))
194 }
195 Body::Stream(stream) => Pin::new(stream).poll_next(cx),
196 }
197 }
198}
199
200impl Stream for Segment {
201 type Item = io::Result<Bytes>;
202 #[inline]
203 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
204 match self.0 {
205 None => Poll::Ready(None),
206 Some(ref mut stream) => stream.as_mut().poll_next(cx),
207 }
208 }
209}
210
#[cfg(test)]
mod tests {
    use std::io;

    use futures::{AsyncReadExt, TryStreamExt};
    use tokio::fs::File;

    use super::Body;

    /// Drains `body` to completion and returns its content as a string.
    async fn read_body(body: Body) -> io::Result<String> {
        let mut text = String::new();
        let mut reader = body.into_async_read();
        reader.read_to_string(&mut text).await?;
        Ok(text)
    }

    /// A default body yields no content.
    #[tokio::test]
    async fn body_empty() -> io::Result<()> {
        let content = read_body(Body::default()).await?;
        assert_eq!("", content);
        Ok(())
    }

    /// A single write is read back unchanged.
    #[tokio::test]
    async fn body_single() -> io::Result<()> {
        let mut body = Body::default();
        body.write("Hello, World");
        let content = read_body(body).await?;
        assert_eq!("Hello, World", content);
        Ok(())
    }

    /// Several writes are read back concatenated in write order.
    #[tokio::test]
    async fn body_multiple() -> io::Result<()> {
        let mut body = Body::default();
        body.write("He");
        body.write("llo, ");
        body.write("World");
        let content = read_body(body).await?;
        assert_eq!("Hello, World", content);
        Ok(())
    }

    /// Buffers and file readers can be mixed within one body.
    #[tokio::test]
    async fn body_composed() -> io::Result<()> {
        let mut body = Body::empty();
        body.write("He");
        body.write("llo, ");
        body.write_reader(File::open("../assets/author.txt").await?);
        body.write_reader(File::open("../assets/author.txt").await?);
        body.write(".");
        let content = read_body(body).await?;
        assert_eq!("Hello, HexileeHexilee.", content);
        Ok(())
    }
}
260}