1#![allow(missing_docs)]
32
33use std::{
34 fmt,
35 pin::Pin,
36 task::{Context, Poll},
37};
38
39use bytes::Bytes;
40#[cfg(feature = "streaming")]
41use bytes::BytesMut;
42use http_body::{Body, Frame};
43use pin_project_lite::pin_project;
44
45use crate::error::StreamingError;
46
/// Size in bytes (64 KiB) of the buffer used for file-backed bodies.
///
/// It is both the initial capacity of the read buffer and the length the
/// buffer is resized to before each read, so it bounds the size of a single
/// data frame produced from a file.
#[cfg(feature = "streaming")]
const STREAM_BUFFER_SIZE: usize = 64 * 1024;
53
#[cfg(feature = "streaming")]
pin_project! {
    #[project = StreamingBodyProj]
    /// An HTTP body backed by one of several sources.
    ///
    /// * `Buffered` holds a single in-memory [`Bytes`] chunk that is handed out
    ///   at most once.
    /// * `Streaming` wraps another body `B` and forwards its frames.
    /// * `File` reads a [`tokio::fs::File`] in chunks of at most
    ///   `STREAM_BUFFER_SIZE` bytes.
    ///
    /// This is the definition used when the `streaming` feature is enabled.
    pub enum StreamingBody<B> {
        // In-memory payload; `None` once the chunk has been taken.
        Buffered {
            data: Option<Bytes>,
        },
        // Wrapped body; structurally pinned so it can be polled in place.
        Streaming {
            #[pin]
            inner: B,
        },
        // File-backed payload.
        File {
            // The file being read; structurally pinned for `poll_read`.
            #[pin]
            reader: tokio::fs::File,
            // Scratch buffer each chunk is read into before being emitted.
            buffer: BytesMut,
            // Set once end-of-file or a read error has been observed.
            done: bool,
            // Caller-supplied byte count that feeds the exact size hint.
            size: u64,
        },
    }
}
87
#[cfg(not(feature = "streaming"))]
pin_project! {
    #[project = StreamingBodyProj]
    /// An HTTP body backed by one of several sources.
    ///
    /// * `Buffered` holds a single in-memory [`Bytes`] chunk that is handed out
    ///   at most once.
    /// * `Streaming` wraps another body `B` and forwards its frames.
    ///
    /// This is the definition used when the `streaming` feature is disabled;
    /// it has no file-backed variant.
    pub enum StreamingBody<B> {
        // In-memory payload; `None` once the chunk has been taken.
        Buffered {
            data: Option<Bytes>,
        },
        // Wrapped body; structurally pinned so it can be polled in place.
        Streaming {
            #[pin]
            inner: B,
        },
    }
}
112
113impl<B> StreamingBody<B> {
114 #[must_use]
118 pub fn buffered(data: Bytes) -> Self {
119 Self::Buffered { data: Some(data) }
120 }
121
122 #[must_use]
126 pub fn streaming(body: B) -> Self {
127 Self::Streaming { inner: body }
128 }
129
130 #[cfg(feature = "streaming")]
146 #[must_use]
147 pub fn from_file_with_size(file: tokio::fs::File, size: u64) -> Self {
148 Self::File {
149 reader: file,
150 buffer: BytesMut::with_capacity(STREAM_BUFFER_SIZE),
151 done: false,
152 size,
153 }
154 }
155}
156
/// [`Body`] implementation used when the `streaming` feature is enabled.
///
/// Data is always surfaced as [`Bytes`] and errors as [`StreamingError`].
#[cfg(feature = "streaming")]
impl<B> Body for StreamingBody<B>
where
    B: Body + Unpin,
    B::Error: Into<StreamingError>,
    B::Data: Into<Bytes>,
{
    type Data = Bytes;
    type Error = StreamingError;

    /// Produces the next frame of the body.
    ///
    /// * `Buffered`: yields the stored chunk once (skipped when empty), then
    ///   `None`.
    /// * `Streaming`: forwards the inner body's frame, converting data and
    ///   error types.
    /// * `File`: reads up to `STREAM_BUFFER_SIZE` bytes and yields them as one
    ///   data frame; a zero-length read ends the stream, a read error is
    ///   yielded once and also ends the stream.
    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        match self.project() {
            StreamingBodyProj::Buffered { data } => match data.take() {
                Some(bytes) if !bytes.is_empty() => Poll::Ready(Some(Ok(Frame::data(bytes)))),
                // Either already consumed or nothing worth sending.
                _ => Poll::Ready(None),
            },
            StreamingBodyProj::Streaming { inner } => inner.poll_frame(cx).map(|next| {
                next.map(|result| {
                    result
                        .map(|frame| frame.map_data(Into::into))
                        .map_err(Into::into)
                })
            }),
            StreamingBodyProj::File {
                reader,
                buffer,
                done,
                size,
            } => {
                use tokio::io::AsyncRead;

                if *done {
                    return Poll::Ready(None);
                }

                // `ReadBuf::new` treats the whole slice as initialized, so the
                // buffer must have real (zeroed) length, not just capacity.
                buffer.resize(STREAM_BUFFER_SIZE, 0);

                let mut read_buf = tokio::io::ReadBuf::new(buffer.as_mut());

                match reader.poll_read(cx, &mut read_buf) {
                    Poll::Ready(Ok(())) => {
                        let filled_len = read_buf.filled().len();
                        if filled_len == 0 {
                            // A successful read of zero bytes means end-of-file.
                            *done = true;
                            buffer.clear();
                            Poll::Ready(None)
                        } else {
                            // Keep `size` equal to the bytes still to be
                            // emitted so `size_hint` reports the *remaining*
                            // length. Saturate in case the file holds more
                            // than the declared size. (usize -> u64 is a
                            // lossless widening here.)
                            *size = size.saturating_sub(filled_len as u64);

                            buffer.truncate(filled_len);
                            // Hand the filled prefix off as an immutable chunk;
                            // `buffer` is left empty for the next read.
                            let bytes = buffer.split().freeze();
                            Poll::Ready(Some(Ok(Frame::data(bytes))))
                        }
                    }
                    Poll::Ready(Err(e)) => {
                        // Report the error once; later polls return `None`.
                        *done = true;
                        buffer.clear();
                        Poll::Ready(Some(Err(StreamingError::new(Box::new(e)))))
                    }
                    Poll::Pending => Poll::Pending,
                }
            }
        }
    }

    /// Returns `true` when no further frames will be produced.
    ///
    /// For `Buffered` this is once the chunk has been taken, for `Streaming`
    /// it defers to the inner body, and for `File` it is once end-of-file or
    /// an error has been seen.
    fn is_end_stream(&self) -> bool {
        match self {
            StreamingBody::Buffered { data } => data.is_none(),
            StreamingBody::Streaming { inner } => inner.is_end_stream(),
            StreamingBody::File { done, .. } => *done,
        }
    }

    /// Returns bounds on the number of bytes still to be produced.
    ///
    /// `Buffered` and `File` report an exact value; `Streaming` defers to the
    /// inner body. For `File` the value is the declared size minus the bytes
    /// already emitted, and zero once the stream has ended.
    fn size_hint(&self) -> http_body::SizeHint {
        match self {
            StreamingBody::Buffered { data } => {
                let remaining = data.as_ref().map_or(0, |bytes| bytes.len() as u64);
                http_body::SizeHint::with_exact(remaining)
            }
            StreamingBody::Streaming { inner } => inner.size_hint(),
            StreamingBody::File { done, size, .. } => {
                // After EOF or an error nothing more will be sent, whatever
                // the declared size was.
                let remaining = if *done { 0 } else { *size };
                http_body::SizeHint::with_exact(remaining)
            }
        }
    }
}
253
254#[cfg(not(feature = "streaming"))]
255impl<B> Body for StreamingBody<B>
256where
257 B: Body + Unpin,
258 B::Error: Into<StreamingError>,
259 B::Data: Into<Bytes>,
260{
261 type Data = Bytes;
262 type Error = StreamingError;
263
264 fn poll_frame(
265 mut self: Pin<&mut Self>,
266 cx: &mut Context<'_>,
267 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
268 match self.as_mut().project() {
269 StreamingBodyProj::Buffered { data } => {
270 if let Some(bytes) = data.take() {
271 if bytes.is_empty() {
272 Poll::Ready(None)
273 } else {
274 Poll::Ready(Some(Ok(Frame::data(bytes))))
275 }
276 } else {
277 Poll::Ready(None)
278 }
279 }
280 StreamingBodyProj::Streaming { inner } => {
281 inner.poll_frame(cx).map(|opt| {
282 opt.map(|res| {
283 res.map(|frame| frame.map_data(Into::into))
284 .map_err(Into::into)
285 })
286 })
287 }
288 }
289 }
290
291 fn is_end_stream(&self) -> bool {
292 match self {
293 StreamingBody::Buffered { data } => data.is_none(),
294 StreamingBody::Streaming { inner } => inner.is_end_stream(),
295 }
296 }
297
298 fn size_hint(&self) -> http_body::SizeHint {
299 match self {
300 StreamingBody::Buffered { data } => {
301 if let Some(bytes) = data {
302 let len = bytes.len() as u64;
303 http_body::SizeHint::with_exact(len)
304 } else {
305 http_body::SizeHint::with_exact(0)
306 }
307 }
308 StreamingBody::Streaming { inner } => inner.size_hint(),
309 }
310 }
311}
312
313impl<B> From<Bytes> for StreamingBody<B> {
314 fn from(bytes: Bytes) -> Self {
315 Self::buffered(bytes)
316 }
317}
318
/// Debug output that summarizes the body without printing payload bytes.
#[cfg(feature = "streaming")]
impl<B: fmt::Debug> fmt::Debug for StreamingBody<B> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Buffered { data } => {
                // Only report presence and length, never the contents.
                let has_data = data.is_some();
                let len = data.as_ref().map(Bytes::len);
                let mut out = f.debug_struct("StreamingBody::Buffered");
                out.field("has_data", &has_data);
                out.field("len", &len);
                out.finish()
            }
            Self::Streaming { inner } => {
                let mut out = f.debug_struct("StreamingBody::Streaming");
                out.field("inner", inner);
                out.finish()
            }
            Self::File { done, size, .. } => {
                // The reader and buffer are omitted, hence non-exhaustive.
                let mut out = f.debug_struct("StreamingBody::File");
                out.field("done", done);
                out.field("size", size);
                out.finish_non_exhaustive()
            }
        }
    }
}
340
341#[cfg(not(feature = "streaming"))]
342impl<B: fmt::Debug> fmt::Debug for StreamingBody<B> {
343 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
344 match self {
345 Self::Buffered { data } => f
346 .debug_struct("StreamingBody::Buffered")
347 .field("has_data", &data.is_some())
348 .field("len", &data.as_ref().map(|b| b.len()))
349 .finish(),
350 Self::Streaming { inner } => f
351 .debug_struct("StreamingBody::Streaming")
352 .field("inner", inner)
353 .finish(),
354 }
355 }
356}
357
#[cfg(feature = "streaming")]
impl<B> StreamingBody<B>
where
    B: Body + Unpin + Send,
    B::Error: Into<StreamingError>,
    B::Data: Into<Bytes>,
{
    /// Converts the body into a stream of [`Bytes`] chunks.
    ///
    /// Data frames yield their payload. Any frame that is not a data frame
    /// yields an empty [`Bytes`] instead of being dropped. Body errors are
    /// re-wrapped as a boxed [`std::io::Error`] whose message is
    /// `"Stream error: "` followed by the original error's display text.
    pub fn into_bytes_stream(
        self,
    ) -> impl futures_util::Stream<
        Item = Result<Bytes, Box<dyn std::error::Error + Send + Sync>>,
    > + Send {
        use futures_util::TryStreamExt;

        let frames = http_body_util::BodyStream::new(self);

        let chunks = frames.map_ok(|frame| match frame.into_data() {
            Ok(payload) => payload,
            // Non-data frame: surface it as an empty chunk.
            Err(_) => Bytes::new(),
        });

        chunks.map_err(|e| -> Box<dyn std::error::Error + Send + Sync> {
            let message = format!("Stream error: {e}");
            Box::new(std::io::Error::other(message))
        })
    }
}