1use crate::{Headers, h2::H2Body, h3::H3Body};
2use BodyType::{Empty, Static, Streaming};
3use futures_lite::{AsyncRead, AsyncReadExt, io::Cursor, ready};
4use pin_project_lite::pin_project;
5use std::{
6 borrow::Cow,
7 fmt::{self, Debug, Formatter},
8 io::{Error, Result},
9 pin::Pin,
10 sync::Arc,
11 task::{Context, Poll},
12};
13use sync_wrapper::SyncWrapper;
14
/// A source of body bytes that may also be able to produce trailing headers.
///
/// Implementors are [`AsyncRead`] values that are `Send + 'static`. [`Body`]
/// stores one boxed and pinned inside its streaming variant.
pub trait BodySource: AsyncRead + Send + 'static {
    /// Returns the trailers for this body, or `None` if it has none.
    ///
    /// [`Body::trailers`] only calls this after the streaming body has been
    /// marked done, i.e. after the source has reported end of stream.
    fn trailers(self: Pin<&mut Self>) -> Option<Headers>;
}
28
29pin_project! {
30 struct PlainBody<T> {
31 #[pin]
32 async_read: T,
33 }
34}
35
36impl<T: AsyncRead> AsyncRead for PlainBody<T> {
37 fn poll_read(
38 self: Pin<&mut Self>,
39 cx: &mut Context<'_>,
40 buf: &mut [u8],
41 ) -> Poll<Result<usize>> {
42 self.project().async_read.poll_read(cx, buf)
43 }
44}
45
46impl<T: AsyncRead + Send + 'static> BodySource for PlainBody<T> {
47 fn trailers(self: Pin<&mut Self>) -> Option<Headers> {
48 None
49 }
50}
51
/// A message body that is either empty, backed by in-memory content, or
/// streamed from a [`BodySource`].
///
/// This is a thin wrapper around the crate-private `BodyType` enum; the
/// derived `Default` produces the empty variant.
#[derive(Debug, Default)]
pub struct Body(pub(crate) BodyType);
57
58impl Body {
59 pub fn new_streaming(async_read: impl AsyncRead + Send + 'static, len: Option<u64>) -> Self {
63 Self::new_with_trailers(PlainBody { async_read }, len)
64 }
65
66 pub fn new_with_trailers(body: impl BodySource, len: Option<u64>) -> Self {
72 Self(Streaming {
73 async_read: SyncWrapper::new(Box::pin(body)),
74 len,
75 done: false,
76 progress: 0,
77 chunked_framing: true,
78 keep_open: false,
79 })
80 }
81
82 #[doc(hidden)]
89 #[cfg(feature = "unstable")]
90 #[must_use]
91 pub fn without_chunked_framing(mut self) -> Self {
92 if let Streaming {
93 ref mut chunked_framing,
94 ..
95 } = self.0
96 {
97 *chunked_framing = false;
98 }
99 self
100 }
101
102 #[doc(hidden)]
115 #[cfg(feature = "unstable")]
116 #[must_use]
117 pub fn keep_open(mut self) -> Self {
118 self.set_keep_open();
119 self
120 }
121
122 pub(crate) fn set_keep_open(&mut self) {
125 if matches!(self.0, Static { .. }) {
129 let reader = std::mem::take(self).into_reader();
130 *self = Self::new_streaming(reader, None);
131 }
132
133 if let Streaming {
134 ref mut len,
135 ref mut chunked_framing,
136 ref mut keep_open,
137 ..
138 } = self.0
139 {
140 *len = None;
141 *chunked_framing = true;
142 *keep_open = true;
143 }
144 }
145
146 pub(crate) fn set_chunked_framing(&mut self, on: bool) {
152 if let Streaming {
153 ref mut chunked_framing,
154 ..
155 } = self.0
156 {
157 *chunked_framing = on;
158 }
159 }
160
161 #[doc(hidden)]
167 pub fn trailers(&mut self) -> Option<Headers> {
168 match &mut self.0 {
169 Streaming {
170 async_read, done, ..
171 } if *done => async_read.get_mut().as_mut().trailers(),
172 _ => None,
173 }
174 }
175
176 pub fn new_static(content: impl Into<Cow<'static, [u8]>>) -> Self {
179 Self(Static {
180 content: StaticContent::Cow(content.into()),
181 cursor: 0,
182 })
183 }
184
185 pub fn static_bytes(&self) -> Option<&[u8]> {
189 match &self.0 {
190 Static { content, .. } => Some(content.as_ref()),
191 _ => None,
192 }
193 }
194
195 pub fn into_reader(self) -> Pin<Box<dyn AsyncRead + Send + Sync + 'static>> {
199 match self.0 {
200 Streaming { async_read, .. } => Box::pin(SyncAsyncReader(async_read)),
201 Static { content, .. } => Box::pin(Cursor::new(content)),
202 Empty => Box::pin(Cursor::new("")),
203 }
204 }
205
206 pub async fn into_bytes(self) -> Result<Cow<'static, [u8]>> {
215 match self.0 {
216 Static { content, .. } => Ok(content.into_cow()),
217
218 Streaming {
219 async_read,
220 len,
221 progress: 0,
222 done: false,
223 ..
224 } => {
225 let mut async_read = async_read.into_inner();
226 let mut buf = len
227 .and_then(|c| c.try_into().ok())
228 .map(Vec::with_capacity)
229 .unwrap_or_default();
230
231 async_read.read_to_end(&mut buf).await?;
232
233 Ok(Cow::Owned(buf))
234 }
235
236 Empty => Ok(Cow::Borrowed(b"")),
237
238 Streaming { .. } => Err(Error::other("body already read to completion")),
239 }
240 }
241
242 pub fn bytes_read(&self) -> u64 {
245 self.0.bytes_read()
246 }
247
248 pub fn len(&self) -> Option<u64> {
251 self.0.len()
252 }
253
254 pub fn is_empty(&self) -> bool {
256 self.0.is_empty()
257 }
258
259 pub fn is_static(&self) -> bool {
261 matches!(self.0, Static { .. })
262 }
263
264 pub fn is_streaming(&self) -> bool {
266 matches!(self.0, Streaming { .. })
267 }
268
269 #[cfg(feature = "unstable")]
276 #[doc(hidden)]
277 pub fn into_body_source(self) -> Option<Pin<Box<dyn BodySource>>> {
278 match self.0 {
279 Streaming { async_read, .. } => Some(async_read.into_inner()),
280 _ => None,
281 }
282 }
283
284 #[doc(hidden)]
291 #[cfg(feature = "unstable")]
292 pub fn try_clone(&self) -> Option<Self> {
293 match &self.0 {
294 Empty => Some(Self::default()),
295 Static { content, .. } => Some(Self(Static {
296 content: content.clone(),
297 cursor: 0,
298 })),
299 Streaming { .. } => None,
300 }
301 }
302
303 #[cfg(feature = "unstable")]
305 pub fn into_h3(self) -> H3Body {
306 H3Body::new(self)
307 }
308
309 #[cfg(not(feature = "unstable"))]
311 pub(crate) fn into_h3(self) -> H3Body {
312 H3Body::new(self)
313 }
314
315 pub(crate) fn into_h2(self) -> H2Body {
322 H2Body::new(self)
323 }
324}
325
/// Returns how many payload bytes may be read into a buffer of `buf_len` bytes
/// while still leaving room for chunk framing around them.
///
/// A chunk is laid out in the buffer as `<hex length>\r\n<payload>\r\n`. Four
/// bytes are reserved for the two CRLFs; what remains is shared between the
/// payload and the hexadecimal digits of its length. The number of digits
/// reserved is `ceil(log2(remaining) / 4)`, which is never fewer than the
/// digits needed to print the returned payload size.
///
/// The computation uses integer arithmetic only, so it is exact for every
/// `usize` and needs no lossy numeric casts.
///
/// # Panics
///
/// Panics if `buf_len` is less than 6, the smallest buffer that can hold a
/// one-byte chunk together with its framing.
fn max_bytes_to_read(buf_len: usize) -> usize {
    assert!(
        buf_len >= 6,
        "buffers of length {buf_len} are too small for this implementation.
            if this is a problem for you, please open an issue"
    );

    let bytes_remaining_after_two_cr_lns = buf_len - 4;

    // For n >= 2, ceil(log2(n)) equals the bit width of n - 1. The assertion
    // above guarantees n >= 2 here.
    let ceil_log2 = usize::BITS - (bytes_remaining_after_two_cr_lns - 1).leading_zeros();

    // Each hex digit encodes four bits. The value is at most `usize::BITS / 4`,
    // so widening it to `usize` is lossless.
    let max_bytes_of_hex_framing = ceil_log2.div_ceil(4) as usize;

    bytes_remaining_after_two_cr_lns - max_bytes_of_hex_framing
}
345
impl AsyncRead for Body {
    /// Reads the next bytes of this body into `buf`.
    ///
    /// * An empty body reports end of stream immediately.
    /// * A static body copies from its in-memory content, advancing its cursor.
    /// * A streaming body with a known length forwards reads to its source,
    ///   never asking for more than the bytes still outstanding.
    /// * A streaming body without a known length either forwards reads
    ///   verbatim (chunk framing disabled) or wraps each read in a chunk frame
    ///   of the form `<hex length>\r\n<payload>\r\n`.
    ///
    /// On the chunk-framed path `buf` must be at least 6 bytes long, otherwise
    /// `max_bytes_to_read` panics.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize>> {
        match &mut self.0 {
            Empty => Poll::Ready(Ok(0)),
            Static { content, cursor } => {
                let length = content.len();
                // Everything has already been copied out.
                if length == *cursor {
                    return Poll::Ready(Ok(0));
                }
                // Copy as much of the unread tail as fits in the caller's buffer.
                let bytes = (length - *cursor).min(buf.len());
                buf[0..bytes].copy_from_slice(&content[*cursor..*cursor + bytes]);
                *cursor += bytes;
                Poll::Ready(Ok(bytes))
            }

            // Known length: bytes are passed through without any framing.
            Streaming {
                async_read,
                len: Some(len),
                done,
                progress,
                ..
            } => {
                if *done {
                    return Poll::Ready(Ok(0));
                }

                // Never read past the declared length. If the outstanding
                // count does not fit in `usize`, the buffer size is the limit.
                let max_bytes_to_read = (*len - *progress)
                    .try_into()
                    .unwrap_or(buf.len())
                    .min(buf.len());

                let bytes = ready!(
                    async_read
                        .get_mut()
                        .as_mut()
                        .poll_read(cx, &mut buf[..max_bytes_to_read])
                )?;

                // A zero-length read marks the body as finished.
                if bytes == 0 {
                    *done = true;
                } else {
                    *progress += bytes as u64;
                }

                Poll::Ready(Ok(bytes))
            }

            // Unknown length: chunk-framed unless framing has been disabled.
            Streaming {
                async_read,
                len: None,
                done,
                progress,
                chunked_framing,
                keep_open,
            } => {
                if *done {
                    return Poll::Ready(Ok(0));
                }

                // Framing disabled: hand the source's bytes through unchanged.
                if !*chunked_framing {
                    let bytes = ready!(async_read.get_mut().as_mut().poll_read(cx, buf))?;
                    if bytes == 0 {
                        *done = true;
                    } else {
                        *progress += bytes as u64;
                    }
                    return Poll::Ready(Ok(bytes));
                }

                // Leave room in `buf` for the hex length line and both CRLFs.
                let max_bytes_to_read = max_bytes_to_read(buf.len());

                // The payload is first read into the front of the buffer and
                // shifted right afterwards, once the header width is known.
                let bytes = ready!(
                    async_read
                        .get_mut()
                        .as_mut()
                        .poll_read(cx, &mut buf[..max_bytes_to_read])
                )?;

                if bytes == 0 {
                    *done = true;
                    // A kept-open body ends without the zero-length chunk.
                    if *keep_open {
                        return Poll::Ready(Ok(0));
                    }
                    // Emit the zero-length last chunk. The blank line that
                    // closes a chunked body is not written here.
                    // NOTE(review): presumably the caller appends any trailers
                    // and the final CRLF after this marker -- confirm.
                    buf[..3].copy_from_slice(b"0\r\n");
                    return Poll::Ready(Ok(3));
                }

                // `progress` counts payload bytes only, not framing bytes.
                *progress += bytes as u64;

                let start = format!("{bytes:X}\r\n");
                let start_length = start.len();
                let total = bytes + start_length + 2;
                // Shift the payload right to make room for the header, then
                // write the header in front of it and the CRLF behind it.
                buf.copy_within(..bytes, start_length);
                buf[..start_length].copy_from_slice(start.as_bytes());
                buf[total - 2..total].copy_from_slice(b"\r\n");
                Poll::Ready(Ok(total))
            }
        }
    }
}
457
458struct SyncAsyncReader(SyncWrapper<Pin<Box<dyn BodySource>>>);
459impl Debug for SyncAsyncReader {
460 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
461 f.debug_struct("SyncAsyncReader").finish()
462 }
463}
464impl AsyncRead for SyncAsyncReader {
465 fn poll_read(
466 self: Pin<&mut Self>,
467 cx: &mut Context<'_>,
468 buf: &mut [u8],
469 ) -> Poll<Result<usize>> {
470 self.get_mut().0.get_mut().as_mut().poll_read(cx, buf)
471 }
472}
473
/// In-memory body content, either owned or borrowed via [`Cow`], or shared
/// through an [`Arc`].
///
/// Cloning the `Arc`-backed variants only bumps a reference count.
#[derive(Clone)]
pub(crate) enum StaticContent {
    /// Borrowed `'static` bytes or an owned byte vector.
    Cow(Cow<'static, [u8]>),
    /// Reference-counted bytes.
    Bytes(Arc<[u8]>),
    /// Reference-counted string, exposed as its UTF-8 bytes.
    Str(Arc<str>),
}

impl std::ops::Deref for StaticContent {
    type Target = [u8];

    /// Views the content as a byte slice, whichever variant holds it.
    fn deref(&self) -> &[u8] {
        match self {
            Self::Cow(bytes) => bytes,
            Self::Bytes(bytes) => bytes,
            Self::Str(text) => text.as_bytes(),
        }
    }
}

impl AsRef<[u8]> for StaticContent {
    /// Same view as the `Deref` implementation.
    fn as_ref(&self) -> &[u8] {
        &**self
    }
}

impl StaticContent {
    /// Converts the content into a `Cow`.
    ///
    /// The `Cow` variant is returned as-is; the shared variants are copied
    /// into a freshly allocated vector.
    fn into_cow(self) -> Cow<'static, [u8]> {
        match self {
            Self::Cow(content) => content,
            Self::Bytes(shared) => Cow::Owned(shared.to_vec()),
            Self::Str(shared) => Cow::Owned(shared.as_bytes().to_vec()),
        }
    }
}
511
/// Internal representation behind [`Body`].
#[derive(Default)]
pub(crate) enum BodyType {
    /// No content; reads report end of stream immediately. This is the default.
    #[default]
    Empty,

    /// Content held in memory.
    Static {
        /// The bytes of the body.
        content: StaticContent,
        /// Offset of the next byte that `poll_read` will copy out.
        cursor: usize,
    },

    /// Content pulled from a [`BodySource`].
    Streaming {
        /// The boxed, pinned source, held inside a [`SyncWrapper`].
        async_read: SyncWrapper<Pin<Box<dyn BodySource>>>,
        /// Number of content bytes read from the source so far; chunk framing
        /// bytes are not counted.
        progress: u64,
        /// Total content length when known. `None` selects the unknown-length
        /// read path (chunk-framed or verbatim).
        len: Option<u64>,
        /// Set once the source has returned a zero-length read.
        done: bool,
        /// For unknown-length bodies, whether each read is wrapped in a chunk
        /// frame.
        chunked_framing: bool,
        /// When set, reaching the end of the source does not emit the
        /// terminating zero-length chunk.
        keep_open: bool,
    },
}
537
538impl Debug for BodyType {
539 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
540 match self {
541 Empty => f.debug_tuple("BodyType::Empty").finish(),
542 Static { content, cursor } => f
543 .debug_struct("BodyType::Static")
544 .field("content", &String::from_utf8_lossy(content))
545 .field("cursor", cursor)
546 .finish(),
547 Streaming {
548 len,
549 done,
550 progress,
551 ..
552 } => f
553 .debug_struct("BodyType::Streaming")
554 .field("async_read", &format_args!(".."))
555 .field("len", &len)
556 .field("done", &done)
557 .field("progress", &progress)
558 .finish(),
559 }
560 }
561}
562
563impl BodyType {
564 fn is_empty(&self) -> bool {
565 match *self {
566 Empty => true,
567 Static { ref content, .. } => content.is_empty(),
568 Streaming { len, .. } => len == Some(0),
569 }
570 }
571
572 fn len(&self) -> Option<u64> {
573 match *self {
574 Empty => Some(0),
575 Static { ref content, .. } => Some(content.len() as u64),
576 Streaming { len, .. } => len,
577 }
578 }
579
580 fn bytes_read(&self) -> u64 {
581 match *self {
582 Empty => 0,
583 Static { cursor, .. } => cursor as u64,
584 Streaming { progress, .. } => progress,
585 }
586 }
587}
588
589impl From<String> for Body {
590 fn from(s: String) -> Self {
591 s.into_bytes().into()
592 }
593}
594
595impl From<&'static str> for Body {
596 fn from(s: &'static str) -> Self {
597 s.as_bytes().into()
598 }
599}
600
601impl From<&'static [u8]> for Body {
602 fn from(content: &'static [u8]) -> Self {
603 Self::new_static(content)
604 }
605}
606
607impl From<Vec<u8>> for Body {
608 fn from(content: Vec<u8>) -> Self {
609 Self::new_static(content)
610 }
611}
612
613impl From<Cow<'static, [u8]>> for Body {
614 fn from(value: Cow<'static, [u8]>) -> Self {
615 Self::new_static(value)
616 }
617}
618
619impl From<Cow<'static, str>> for Body {
620 fn from(value: Cow<'static, str>) -> Self {
621 match value {
622 Cow::Borrowed(b) => b.into(),
623 Cow::Owned(o) => o.into(),
624 }
625 }
626}
627
628impl From<Arc<[u8]>> for Body {
629 fn from(content: Arc<[u8]>) -> Self {
630 Self(Static {
631 content: StaticContent::Bytes(content),
632 cursor: 0,
633 })
634 }
635}
636
637impl From<Arc<str>> for Body {
638 fn from(content: Arc<str>) -> Self {
639 Self(Static {
640 content: StaticContent::Str(content),
641 cursor: 0,
642 })
643 }
644}
645
#[cfg(test)]
mod test_shared_content {
    use super::Body;
    use futures_lite::future::block_on;
    use std::sync::Arc;

    /// A body built from `Arc<[u8]>` reports the right length and content, and
    /// leaves the caller's handle intact.
    #[test]
    fn arc_bytes_roundtrips() {
        let shared: Arc<[u8]> = Arc::from(&b"shared bytes"[..]);
        let body = Body::from(Arc::clone(&shared));

        assert_eq!(body.len(), Some(12));
        assert_eq!(body.static_bytes(), Some(&b"shared bytes"[..]));

        let collected = block_on(body.into_bytes()).unwrap();
        assert_eq!(collected.as_ref(), b"shared bytes");

        // The original handle is still usable after the body is consumed.
        assert_eq!(&*shared, b"shared bytes");
    }

    /// A body built from `Arc<str>` exposes the string's UTF-8 bytes.
    #[test]
    fn arc_str_roundtrips() {
        let shared: Arc<str> = Arc::from("shared str");
        let body = Body::from(shared);

        assert_eq!(body.len(), Some(10));
        assert_eq!(body.static_bytes(), Some(&b"shared str"[..]));

        let collected = block_on(body.into_bytes()).unwrap();
        assert_eq!(collected.as_ref(), b"shared str");
    }

    /// Cloning a shared body adds a reference to the same allocation instead
    /// of copying the bytes.
    #[cfg(feature = "unstable")]
    #[test]
    fn shared_body_clones_without_copying_the_arc() {
        let shared: Arc<[u8]> = Arc::from(&b"abc"[..]);
        let body = Body::from(Arc::clone(&shared));
        let clone = body.try_clone().expect("static bodies clone");

        assert_eq!(clone.static_bytes(), Some(&b"abc"[..]));
        // One count each for `shared`, `body`, and `clone`.
        assert_eq!(Arc::strong_count(&shared), 3);
    }
}
686
#[cfg(test)]
mod test_bytes_to_read {
    use super::max_bytes_to_read;

    /// Checks known buffer sizes, including those on either side of the points
    /// where the hex length gains a digit.
    #[test]
    fn simple_check_of_known_values() {
        // (buffer length, expected payload capacity)
        let cases = [
            (6, 1),
            (7, 2),
            (20, 15),
            (21, 15),
            (22, 16),
            (23, 17),
            (260, 254),
            (261, 254),
            (262, 255),
            (263, 256),
            (4100, 4093),
            (4101, 4093),
            (4102, 4094),
            (4103, 4095),
            (4104, 4096),
        ];

        for (input, expected) in cases {
            let actual = max_bytes_to_read(input);
            assert_eq!(
                actual, expected,
                "\n\nexpected max_bytes_to_read({input}) to be {expected}, but it was {actual}"
            );

            // Payload, two CRLFs, and the hex length must fill the buffer
            // exactly or leave a single byte unused.
            let hex_len = format!("{expected:X}").len();
            let used_bytes = expected + 4 + hex_len;
            assert!(
                [input, input - 1].contains(&used_bytes),
                "\n\nfor an input of {}, expected used bytes to be {} or {}, but was {}",
                input,
                input,
                input - 1,
                used_bytes
            );
        }
    }
}