1#![allow(missing_docs)]
32
33use std::{
34 fmt,
35 pin::Pin,
36 task::{Context, Poll},
37};
38
39use bytes::Bytes;
40#[cfg(feature = "streaming")]
41use bytes::BytesMut;
42use http_body::{Body, Frame};
43use pin_project_lite::pin_project;
44
45use crate::error::StreamingError;
46
/// Size in bytes (64 KiB) of the buffer used for each read from a cache reader.
///
/// It is both the initial capacity of the `File` variant's buffer and the
/// length that buffer is resized to before every read.
#[cfg(feature = "streaming")]
const STREAM_BUFFER_SIZE: usize = 64 * 1024;
53
#[cfg(feature = "streaming")]
pin_project! {
    #[project = StreamingBodyProj]
    /// A body that is either fully buffered in memory, forwarded from another
    /// body of type `B`, or read incrementally from a `cacache::Reader`.
    ///
    /// This is the definition used when the `streaming` feature is enabled.
    pub enum StreamingBody<B> {
        // Payload held entirely in memory. `data` becomes `None` once the
        // payload has been taken out by `poll_frame`.
        Buffered {
            data: Option<Bytes>,
        },
        // Wraps another body; frames are forwarded from `inner`.
        Streaming {
            #[pin]
            inner: B,
        },
        // Content read in chunks from a cache reader.
        File {
            #[pin]
            reader: cacache::Reader,
            // Scratch space each read is performed into.
            buffer: BytesMut,
            // Set once the reader reported end-of-file or an error.
            done: bool,
            // Byte count backing the exact size hint, when known.
            size: Option<u64>,
        },
    }
}
87
#[cfg(not(feature = "streaming"))]
pin_project! {
    #[project = StreamingBodyProj]
    /// A body that is either fully buffered in memory or forwarded from
    /// another body of type `B`.
    ///
    /// This is the definition used when the `streaming` feature is disabled,
    /// so there is no cache-reader variant.
    pub enum StreamingBody<B> {
        // Payload held entirely in memory. `data` becomes `None` once the
        // payload has been taken out by `poll_frame`.
        Buffered {
            data: Option<Bytes>,
        },
        // Wraps another body; frames are forwarded from `inner`.
        Streaming {
            #[pin]
            inner: B,
        },
    }
}
112
113impl<B> StreamingBody<B> {
114 #[must_use]
118 pub fn buffered(data: Bytes) -> Self {
119 Self::Buffered { data: Some(data) }
120 }
121
122 #[must_use]
126 pub fn streaming(body: B) -> Self {
127 Self::Streaming { inner: body }
128 }
129
130 #[cfg(feature = "streaming")]
138 #[must_use]
139 pub fn from_reader(reader: cacache::Reader) -> Self {
140 Self::File {
141 reader,
142 buffer: BytesMut::with_capacity(STREAM_BUFFER_SIZE),
143 done: false,
144 size: None,
145 }
146 }
147
148 #[cfg(feature = "streaming")]
155 #[must_use]
156 pub fn from_reader_with_size(reader: cacache::Reader, size: u64) -> Self {
157 Self::File {
158 reader,
159 buffer: BytesMut::with_capacity(STREAM_BUFFER_SIZE),
160 done: false,
161 size: Some(size),
162 }
163 }
164}
165
// `Body` implementation for the `streaming` feature: handles the buffered,
// pass-through and cache-reader variants.
#[cfg(feature = "streaming")]
impl<B> Body for StreamingBody<B>
where
    B: Body + Unpin,
    B::Error: Into<StreamingError>,
    B::Data: Into<Bytes>,
{
    type Data = Bytes;
    type Error = StreamingError;

    /// Produces the next frame of the body.
    ///
    /// * `Buffered` yields its payload once (an empty payload yields nothing),
    ///   then reports end of stream.
    /// * `Streaming` forwards the inner body's frames, converting data to
    ///   [`Bytes`] and errors to [`StreamingError`].
    /// * `File` reads up to `STREAM_BUFFER_SIZE` bytes from the reader per
    ///   call; a zero-length read ends the stream, and a read error is
    ///   returned once, after which the stream is finished.
    fn poll_frame(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        use tokio::io::AsyncRead;

        match self.as_mut().project() {
            StreamingBodyProj::Buffered { data } => {
                // `take()` leaves `None` behind so the payload is emitted at
                // most once; an empty payload ends the stream directly.
                Poll::Ready(
                    data.take()
                        .filter(|bytes| !bytes.is_empty())
                        .map(|bytes| Ok(Frame::data(bytes))),
                )
            }
            StreamingBodyProj::Streaming { inner } => inner.poll_frame(cx).map(|next| {
                next.map(|result| {
                    result
                        .map(|frame| frame.map_data(Into::into))
                        .map_err(Into::into)
                })
            }),
            StreamingBodyProj::File {
                reader,
                buffer,
                done,
                size,
            } => {
                if *done {
                    return Poll::Ready(None);
                }

                // `ReadBuf::new` needs an initialized slice, so grow the
                // buffer to the full chunk size before reading into it.
                buffer.resize(STREAM_BUFFER_SIZE, 0);
                let mut read_buf = tokio::io::ReadBuf::new(buffer.as_mut());

                match reader.poll_read(cx, &mut read_buf) {
                    Poll::Ready(Ok(())) => {
                        let filled_len = read_buf.filled().len();
                        if filled_len == 0 {
                            // A successful zero-length read means end-of-file.
                            *done = true;
                            buffer.clear();
                            Poll::Ready(None)
                        } else {
                            // Keep the size hint in terms of *remaining*
                            // bytes, as the `Body` contract requires.
                            if let Some(remaining) = size.as_mut() {
                                let consumed = u64::try_from(filled_len).unwrap_or(u64::MAX);
                                *remaining = remaining.saturating_sub(consumed);
                            }
                            // Drop the unused zeroed tail, then hand the
                            // filled prefix to the caller.
                            buffer.truncate(filled_len);
                            let bytes = buffer.split().freeze();
                            Poll::Ready(Some(Ok(Frame::data(bytes))))
                        }
                    }
                    Poll::Ready(Err(e)) => {
                        // Surface the error once; later polls report the end.
                        *done = true;
                        buffer.clear();
                        Poll::Ready(Some(Err(StreamingError::new(Box::new(e)))))
                    }
                    Poll::Pending => Poll::Pending,
                }
            }
        }
    }

    /// Returns `true` when `poll_frame` is known to have nothing more to yield.
    fn is_end_stream(&self) -> bool {
        match self {
            // An empty payload never produces a frame, so it counts as ended.
            StreamingBody::Buffered { data } => data.as_ref().map_or(true, Bytes::is_empty),
            StreamingBody::Streaming { inner } => inner.is_end_stream(),
            StreamingBody::File { done, .. } => *done,
        }
    }

    /// Returns bounds on the number of bytes still to be produced.
    ///
    /// `Buffered` reports the exact pending payload length, `Streaming`
    /// defers to the inner body, and `File` reports the exact remaining count
    /// when a size was supplied (zero once the stream has finished) or the
    /// default hint otherwise.
    fn size_hint(&self) -> http_body::SizeHint {
        match self {
            StreamingBody::Buffered { data } => {
                // usize -> u64 is lossless on every supported target.
                let len = data.as_ref().map_or(0, |bytes| bytes.len() as u64);
                http_body::SizeHint::with_exact(len)
            }
            StreamingBody::Streaming { inner } => inner.size_hint(),
            // Nothing is left after end-of-file or an error.
            StreamingBody::File { done: true, .. } => http_body::SizeHint::with_exact(0),
            StreamingBody::File {
                size: Some(remaining),
                ..
            } => http_body::SizeHint::with_exact(*remaining),
            StreamingBody::File { size: None, .. } => http_body::SizeHint::default(),
        }
    }
}
267
268#[cfg(not(feature = "streaming"))]
269impl<B> Body for StreamingBody<B>
270where
271 B: Body + Unpin,
272 B::Error: Into<StreamingError>,
273 B::Data: Into<Bytes>,
274{
275 type Data = Bytes;
276 type Error = StreamingError;
277
278 fn poll_frame(
279 mut self: Pin<&mut Self>,
280 cx: &mut Context<'_>,
281 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
282 match self.as_mut().project() {
283 StreamingBodyProj::Buffered { data } => {
284 if let Some(bytes) = data.take() {
285 if bytes.is_empty() {
286 Poll::Ready(None)
287 } else {
288 Poll::Ready(Some(Ok(Frame::data(bytes))))
289 }
290 } else {
291 Poll::Ready(None)
292 }
293 }
294 StreamingBodyProj::Streaming { inner } => {
295 inner.poll_frame(cx).map(|opt| {
296 opt.map(|res| {
297 res.map(|frame| frame.map_data(Into::into))
298 .map_err(Into::into)
299 })
300 })
301 }
302 }
303 }
304
305 fn is_end_stream(&self) -> bool {
306 match self {
307 StreamingBody::Buffered { data } => data.is_none(),
308 StreamingBody::Streaming { inner } => inner.is_end_stream(),
309 }
310 }
311
312 fn size_hint(&self) -> http_body::SizeHint {
313 match self {
314 StreamingBody::Buffered { data } => {
315 if let Some(bytes) = data {
316 let len = bytes.len() as u64;
317 http_body::SizeHint::with_exact(len)
318 } else {
319 http_body::SizeHint::with_exact(0)
320 }
321 }
322 StreamingBody::Streaming { inner } => inner.size_hint(),
323 }
324 }
325}
326
327impl<B> From<Bytes> for StreamingBody<B> {
328 fn from(bytes: Bytes) -> Self {
329 Self::buffered(bytes)
330 }
331}
332
#[cfg(feature = "streaming")]
impl<B: fmt::Debug> fmt::Debug for StreamingBody<B> {
    /// Formats the body for diagnostics.
    ///
    /// Buffered payloads are summarised by presence and length rather than
    /// dumped; the `File` variant prints only `done` and `size` and marks the
    /// output as non-exhaustive.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Buffered { data } => {
                let byte_len = data.as_ref().map(Bytes::len);
                let mut out = f.debug_struct("StreamingBody::Buffered");
                out.field("has_data", &data.is_some());
                out.field("len", &byte_len);
                out.finish()
            }
            Self::Streaming { inner } => {
                let mut out = f.debug_struct("StreamingBody::Streaming");
                out.field("inner", inner);
                out.finish()
            }
            Self::File { done, size, .. } => {
                let mut out = f.debug_struct("StreamingBody::File");
                out.field("done", done);
                out.field("size", size);
                out.finish_non_exhaustive()
            }
        }
    }
}
354
355#[cfg(not(feature = "streaming"))]
356impl<B: fmt::Debug> fmt::Debug for StreamingBody<B> {
357 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
358 match self {
359 Self::Buffered { data } => f
360 .debug_struct("StreamingBody::Buffered")
361 .field("has_data", &data.is_some())
362 .field("len", &data.as_ref().map(|b| b.len()))
363 .finish(),
364 Self::Streaming { inner } => f
365 .debug_struct("StreamingBody::Streaming")
366 .field("inner", inner)
367 .finish(),
368 }
369 }
370}
371
#[cfg(feature = "streaming")]
impl<B> StreamingBody<B>
where
    B: Body + Unpin + Send,
    B::Error: Into<StreamingError>,
    B::Data: Into<Bytes>,
{
    /// Consumes the body and exposes it as a stream of byte chunks.
    ///
    /// Each data frame becomes one `Ok(Bytes)` item. Frames that carry no
    /// data (for example trailers) are surfaced as an empty `Bytes` chunk.
    /// Body errors are reported as a boxed `std::io::Error` whose message is
    /// `"Stream error: <original error>"`.
    pub fn into_bytes_stream(
        self,
    ) -> impl futures_util::Stream<
        Item = Result<Bytes, Box<dyn std::error::Error + Send + Sync>>,
    > + Send {
        use futures_util::TryStreamExt;

        let frames = http_body_util::BodyStream::new(self);
        let chunks = frames.map_ok(|frame| match frame.into_data() {
            Ok(chunk) => chunk,
            // Non-data frames are replaced by an empty chunk.
            Err(_non_data_frame) => Bytes::new(),
        });
        chunks.map_err(|e| -> Box<dyn std::error::Error + Send + Sync> {
            let message = format!("Stream error: {e}");
            Box::new(std::io::Error::other(message))
        })
    }
}