1use crate::{Headers, h2::H2Body, h3::H3Body};
2use BodyType::{Empty, Static, Streaming};
3use futures_lite::{AsyncRead, AsyncReadExt, io::Cursor, ready};
4use pin_project_lite::pin_project;
5use std::{
6 borrow::Cow,
7 fmt::{self, Debug, Formatter},
8 io::{Error, Result},
9 pin::Pin,
10 task::{Context, Poll},
11};
12use sync_wrapper::SyncWrapper;
13
/// An [`AsyncRead`] source for a streaming [`Body`] that can also report trailers.
///
/// Implementors must be `Send + 'static`; [`Body::new_with_trailers`] boxes and pins them.
pub trait BodySource: AsyncRead + Send + 'static {
    /// Returns the trailers for this body, if there are any.
    ///
    /// [`Body::trailers`] only calls this once the body has been read to its end.
    fn trailers(self: Pin<&mut Self>) -> Option<Headers>;
}
27
pin_project! {
    // Adapter that lets a plain `AsyncRead` be used where a `BodySource` is required.
    struct PlainBody<T> {
        // The wrapped reader; pinned structurally so `poll_read` can be forwarded to it.
        #[pin]
        async_read: T,
    }
}
34
35impl<T: AsyncRead> AsyncRead for PlainBody<T> {
36 fn poll_read(
37 self: Pin<&mut Self>,
38 cx: &mut Context<'_>,
39 buf: &mut [u8],
40 ) -> Poll<Result<usize>> {
41 self.project().async_read.poll_read(cx, buf)
42 }
43}
44
/// A `PlainBody` wraps a bare reader, so it never has trailers to report.
impl<T: AsyncRead + Send + 'static> BodySource for PlainBody<T> {
    /// Always returns `None`.
    fn trailers(self: Pin<&mut Self>) -> Option<Headers> {
        None
    }
}
50
/// A message body: empty, a static in-memory buffer, or a streaming [`AsyncRead`] source.
///
/// The default value is an empty body.
#[derive(Debug, Default)]
pub struct Body(pub(crate) BodyType);
56
57impl Body {
58 pub fn new_streaming(async_read: impl AsyncRead + Send + 'static, len: Option<u64>) -> Self {
62 Self::new_with_trailers(PlainBody { async_read }, len)
63 }
64
65 pub fn new_with_trailers(body: impl BodySource, len: Option<u64>) -> Self {
71 Self(Streaming {
72 async_read: SyncWrapper::new(Box::pin(body)),
73 len,
74 done: false,
75 progress: 0,
76 chunked_framing: true,
77 keep_open: false,
78 })
79 }
80
81 #[doc(hidden)]
88 #[cfg(feature = "unstable")]
89 #[must_use]
90 pub fn without_chunked_framing(mut self) -> Self {
91 if let Streaming {
92 ref mut chunked_framing,
93 ..
94 } = self.0
95 {
96 *chunked_framing = false;
97 }
98 self
99 }
100
101 #[doc(hidden)]
114 #[cfg(feature = "unstable")]
115 #[must_use]
116 pub fn keep_open(mut self) -> Self {
117 self.set_keep_open();
118 self
119 }
120
121 pub(crate) fn set_keep_open(&mut self) {
124 if matches!(self.0, Static { .. }) {
128 let reader = std::mem::take(self).into_reader();
129 *self = Self::new_streaming(reader, None);
130 }
131
132 if let Streaming {
133 ref mut len,
134 ref mut chunked_framing,
135 ref mut keep_open,
136 ..
137 } = self.0
138 {
139 *len = None;
140 *chunked_framing = true;
141 *keep_open = true;
142 }
143 }
144
145 pub(crate) fn ensure_chunked_framing(&mut self) -> &mut Self {
146 if let Streaming {
147 ref mut chunked_framing,
148 ..
149 } = self.0
150 {
151 *chunked_framing = true;
152 }
153
154 self
155 }
156
157 #[doc(hidden)]
163 pub fn trailers(&mut self) -> Option<Headers> {
164 match &mut self.0 {
165 Streaming {
166 async_read, done, ..
167 } if *done => async_read.get_mut().as_mut().trailers(),
168 _ => None,
169 }
170 }
171
172 pub fn new_static(content: impl Into<Cow<'static, [u8]>>) -> Self {
175 Self(Static {
176 content: content.into(),
177 cursor: 0,
178 })
179 }
180
181 pub fn static_bytes(&self) -> Option<&[u8]> {
185 match &self.0 {
186 Static { content, .. } => Some(content.as_ref()),
187 _ => None,
188 }
189 }
190
191 pub fn into_reader(self) -> Pin<Box<dyn AsyncRead + Send + Sync + 'static>> {
195 match self.0 {
196 Streaming { async_read, .. } => Box::pin(SyncAsyncReader(async_read)),
197 Static { content, .. } => Box::pin(Cursor::new(content)),
198 Empty => Box::pin(Cursor::new("")),
199 }
200 }
201
202 pub async fn into_bytes(self) -> Result<Cow<'static, [u8]>> {
211 match self.0 {
212 Static { content, .. } => Ok(content),
213
214 Streaming {
215 async_read,
216 len,
217 progress: 0,
218 done: false,
219 ..
220 } => {
221 let mut async_read = async_read.into_inner();
222 let mut buf = len
223 .and_then(|c| c.try_into().ok())
224 .map(Vec::with_capacity)
225 .unwrap_or_default();
226
227 async_read.read_to_end(&mut buf).await?;
228
229 Ok(Cow::Owned(buf))
230 }
231
232 Empty => Ok(Cow::Borrowed(b"")),
233
234 Streaming { .. } => Err(Error::other("body already read to completion")),
235 }
236 }
237
238 pub fn bytes_read(&self) -> u64 {
241 self.0.bytes_read()
242 }
243
244 pub fn len(&self) -> Option<u64> {
247 self.0.len()
248 }
249
250 pub fn is_empty(&self) -> bool {
252 self.0.is_empty()
253 }
254
255 pub fn is_static(&self) -> bool {
257 matches!(self.0, Static { .. })
258 }
259
260 pub fn is_streaming(&self) -> bool {
262 matches!(self.0, Streaming { .. })
263 }
264
265 #[doc(hidden)]
272 #[cfg(feature = "unstable")]
273 pub fn try_clone(&self) -> Option<Self> {
274 match &self.0 {
275 Empty => Some(Self::default()),
276 Static { content, .. } => Some(Self(Static {
277 content: content.clone(),
278 cursor: 0,
279 })),
280 Streaming { .. } => None,
281 }
282 }
283
284 #[cfg(feature = "unstable")]
286 pub fn into_h3(self) -> H3Body {
287 H3Body::new(self)
288 }
289
290 #[cfg(not(feature = "unstable"))]
292 pub(crate) fn into_h3(self) -> H3Body {
293 H3Body::new(self)
294 }
295
296 pub(crate) fn into_h2(self) -> H2Body {
303 H2Body::new(self)
304 }
305}
306
/// Returns the largest payload that fits in a `buf_len`-byte buffer once chunk framing is added.
///
/// A framed chunk is laid out as `<hex length>\r\n<payload>\r\n`, so four bytes go to the two
/// CRLFs and the rest is shared between the payload and the hex digits of its length. The
/// framed chunk fills the buffer exactly or leaves a single byte unused.
///
/// The computation uses integer arithmetic only, so it is exact for every `usize`.
///
/// # Panics
///
/// Panics if `buf_len` is less than 6, the smallest buffer that can hold a one-byte chunk.
fn max_bytes_to_read(buf_len: usize) -> usize {
    assert!(
        buf_len >= 6,
        "buffers of length {buf_len} are too small for this implementation.
            if this is a problem for you, please open an issue"
    );

    let bytes_remaining_after_two_cr_lns = buf_len - 4;

    // At least one hex digit is always written, so the payload is strictly smaller than the
    // remaining space. Reserve enough digits for the largest such length (`remaining - 1`);
    // for `remaining >= 2` this equals `ceil(log2(remaining) / 4)`.
    let mut max_bytes_of_hex_framing = 1;
    let mut higher_digits = (bytes_remaining_after_two_cr_lns - 1) >> 4;
    while higher_digits != 0 {
        max_bytes_of_hex_framing += 1;
        higher_digits >>= 4;
    }

    bytes_remaining_after_two_cr_lns - max_bytes_of_hex_framing
}
326
/// Reads the body as it should appear on the wire.
///
/// * Empty bodies are immediately at end-of-file.
/// * Static bodies copy from the in-memory content, advancing the cursor.
/// * Streaming bodies with a known length read at most that many bytes from the source.
/// * Streaming bodies with an unknown length are wrapped in chunk framing
///   (`<hex length>\r\n<payload>\r\n`) unless chunked framing has been disabled.
///
/// In the chunk-framed case `buf` must be at least 6 bytes long, otherwise
/// `max_bytes_to_read` panics.
impl AsyncRead for Body {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize>> {
        match &mut self.0 {
            Empty => Poll::Ready(Ok(0)),
            Static { content, cursor } => {
                let length = content.len();
                // Everything has already been copied out.
                if length == *cursor {
                    return Poll::Ready(Ok(0));
                }
                // Copy as much of the unread tail as fits in the caller's buffer.
                let bytes = (length - *cursor).min(buf.len());
                buf[0..bytes].copy_from_slice(&content[*cursor..*cursor + bytes]);
                *cursor += bytes;
                Poll::Ready(Ok(bytes))
            }

            // Known length: pass bytes through unframed, never reading past `len`.
            Streaming {
                async_read,
                len: Some(len),
                done,
                progress,
                ..
            } => {
                if *done {
                    return Poll::Ready(Ok(0));
                }

                // Remaining byte count, capped at the buffer size. If the remainder does not
                // fit in a `usize`, the buffer size is the effective limit.
                let max_bytes_to_read = (*len - *progress)
                    .try_into()
                    .unwrap_or(buf.len())
                    .min(buf.len());

                let bytes = ready!(
                    async_read
                        .get_mut()
                        .as_mut()
                        .poll_read(cx, &mut buf[..max_bytes_to_read])
                )?;

                // A zero-length read marks the body as finished. That happens when the source
                // ends, and also when `len` bytes have been read (the slice above is then empty).
                if bytes == 0 {
                    *done = true;
                } else {
                    *progress += bytes as u64;
                }

                Poll::Ready(Ok(bytes))
            }

            // Unknown length: chunk-framed unless framing has been switched off.
            Streaming {
                async_read,
                len: None,
                done,
                progress,
                chunked_framing,
                keep_open,
            } => {
                if *done {
                    return Poll::Ready(Ok(0));
                }

                // Framing disabled: behave as a plain pass-through reader.
                if !*chunked_framing {
                    let bytes = ready!(async_read.get_mut().as_mut().poll_read(cx, buf))?;
                    if bytes == 0 {
                        *done = true;
                    } else {
                        *progress += bytes as u64;
                    }
                    return Poll::Ready(Ok(bytes));
                }

                // Limit the payload so the hex length line and both CRLFs still fit in `buf`.
                let max_bytes_to_read = max_bytes_to_read(buf.len());

                // The payload is read into the FRONT of `buf` and shifted into place afterwards.
                let bytes = ready!(
                    async_read
                        .get_mut()
                        .as_mut()
                        .poll_read(cx, &mut buf[..max_bytes_to_read])
                )?;

                if bytes == 0 {
                    *done = true;
                    // A keep-open body ends without the terminating chunk line.
                    if *keep_open {
                        return Poll::Ready(Ok(0));
                    }
                    // Emit only the `0\r\n` last-chunk line. The CRLF that closes the chunked
                    // body is not written here.
                    buf[..3].copy_from_slice(b"0\r\n");
                    return Poll::Ready(Ok(3));
                }

                // `progress` counts payload bytes only, not framing.
                *progress += bytes as u64;

                let start = format!("{bytes:X}\r\n");
                let start_length = start.len();
                let total = bytes + start_length + 2;
                // Order matters: shift the payload right first, then write the length line into
                // the gap, then append the trailing CRLF.
                buf.copy_within(..bytes, start_length);
                buf[..start_length].copy_from_slice(start.as_bytes());
                buf[total - 2..total].copy_from_slice(b"\r\n");
                Poll::Ready(Ok(total))
            }
        }
    }
}
438
/// The reader returned by `Body::into_reader` for streaming bodies; it exposes the underlying
/// source directly, without the framing that reading through `Body` applies.
struct SyncAsyncReader(SyncWrapper<Pin<Box<dyn BodySource>>>);
440impl Debug for SyncAsyncReader {
441 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
442 f.debug_struct("SyncAsyncReader").finish()
443 }
444}
445impl AsyncRead for SyncAsyncReader {
446 fn poll_read(
447 self: Pin<&mut Self>,
448 cx: &mut Context<'_>,
449 buf: &mut [u8],
450 ) -> Poll<Result<usize>> {
451 self.get_mut().0.get_mut().as_mut().poll_read(cx, buf)
452 }
453}
454
/// The internal representation behind [`Body`].
#[derive(Default)]
pub(crate) enum BodyType {
    /// A body with no content; this is the default.
    #[default]
    Empty,

    /// A body whose entire content is held in memory.
    Static {
        /// The full content.
        content: Cow<'static, [u8]>,
        /// Offset of the next unread byte of `content` when reading via `AsyncRead`.
        cursor: usize,
    },

    /// A body that is read incrementally from a [`BodySource`].
    Streaming {
        /// The source of body bytes, and of trailers once reading has finished.
        async_read: SyncWrapper<Pin<Box<dyn BodySource>>>,
        /// Number of payload bytes read from the source so far; framing bytes are not counted.
        progress: u64,
        /// Total length in bytes if known. When `None`, reads may be chunk-framed.
        len: Option<u64>,
        /// Set once a read from the source has returned zero bytes.
        done: bool,
        /// Whether reads of an unknown-length body are wrapped in chunk framing.
        chunked_framing: bool,
        /// When set, reaching the end of the source does not emit the terminating `0\r\n` line.
        keep_open: bool,
    },
}
480
481impl Debug for BodyType {
482 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
483 match self {
484 Empty => f.debug_tuple("BodyType::Empty").finish(),
485 Static { content, cursor } => f
486 .debug_struct("BodyType::Static")
487 .field("content", &String::from_utf8_lossy(content))
488 .field("cursor", cursor)
489 .finish(),
490 Streaming {
491 len,
492 done,
493 progress,
494 ..
495 } => f
496 .debug_struct("BodyType::Streaming")
497 .field("async_read", &format_args!(".."))
498 .field("len", &len)
499 .field("done", &done)
500 .field("progress", &progress)
501 .finish(),
502 }
503 }
504}
505
506impl BodyType {
507 fn is_empty(&self) -> bool {
508 match *self {
509 Empty => true,
510 Static { ref content, .. } => content.is_empty(),
511 Streaming { len, .. } => len == Some(0),
512 }
513 }
514
515 fn len(&self) -> Option<u64> {
516 match *self {
517 Empty => Some(0),
518 Static { ref content, .. } => Some(content.len() as u64),
519 Streaming { len, .. } => len,
520 }
521 }
522
523 fn bytes_read(&self) -> u64 {
524 match *self {
525 Empty => 0,
526 Static { cursor, .. } => cursor as u64,
527 Streaming { progress, .. } => progress,
528 }
529 }
530}
531
532impl From<String> for Body {
533 fn from(s: String) -> Self {
534 s.into_bytes().into()
535 }
536}
537
538impl From<&'static str> for Body {
539 fn from(s: &'static str) -> Self {
540 s.as_bytes().into()
541 }
542}
543
544impl From<&'static [u8]> for Body {
545 fn from(content: &'static [u8]) -> Self {
546 Self::new_static(content)
547 }
548}
549
550impl From<Vec<u8>> for Body {
551 fn from(content: Vec<u8>) -> Self {
552 Self::new_static(content)
553 }
554}
555
556impl From<Cow<'static, [u8]>> for Body {
557 fn from(value: Cow<'static, [u8]>) -> Self {
558 Self::new_static(value)
559 }
560}
561
562impl From<Cow<'static, str>> for Body {
563 fn from(value: Cow<'static, str>) -> Self {
564 match value {
565 Cow::Borrowed(b) => b.into(),
566 Cow::Owned(o) => o.into(),
567 }
568 }
569}
570
#[cfg(test)]
mod test_bytes_to_read {
    use super::max_bytes_to_read;

    /// Checks `max_bytes_to_read` against hand-computed `(buffer length, max payload)` pairs,
    /// and that the framed chunk fills the buffer exactly or leaves one byte spare.
    #[test]
    fn simple_check_of_known_values() {
        let known_values: [(usize, usize); 15] = [
            (6, 1),
            (7, 2),
            (20, 15),
            (21, 15),
            (22, 16),
            (23, 17),
            (260, 254),
            (261, 254),
            (262, 255),
            (263, 256),
            (4100, 4093),
            (4101, 4093),
            (4102, 4094),
            (4103, 4095),
            (4104, 4096),
        ];

        for (input, expected) in known_values {
            let actual = max_bytes_to_read(input);
            assert_eq!(
                actual, expected,
                "\n\nexpected max_bytes_to_read({input}) to be {expected}, but it was {actual}"
            );

            // Payload + two CRLFs + the hex digits of the payload length.
            let hex_digits = format!("{expected:X}").len();
            let used_bytes = expected + 4 + hex_digits;
            assert!(
                used_bytes == input || used_bytes == input - 1,
                "\n\nfor an input of {}, expected used bytes to be {} or {}, but was {}",
                input,
                input,
                input - 1,
                used_bytes
            );
        }
    }
}