1use crate::{Headers, h2::H2Body, h3::H3Body};
2use BodyType::{Empty, Static, Streaming};
3use futures_lite::{AsyncRead, AsyncReadExt, io::Cursor, ready};
4use pin_project_lite::pin_project;
5use std::{
6 borrow::Cow,
7 fmt::{self, Debug, Formatter},
8 io::{Error, Result},
9 pin::Pin,
10 sync::Arc,
11 task::{Context, Poll},
12};
13use sync_wrapper::SyncWrapper;
14
/// A source of body bytes that can also produce trailers.
///
/// Implementors are [`AsyncRead`] types that are `Send + 'static`, which lets them be boxed and
/// stored inside a streaming [`Body`].
pub trait BodySource: AsyncRead + Send + 'static {
    /// Returns the trailers for this body, if there are any.
    ///
    /// [`Body::trailers`] only calls this once the body has been read to end-of-stream.
    fn trailers(self: Pin<&mut Self>) -> Option<Headers>;
}
28
pin_project! {
    /// Adapter that lets any [`AsyncRead`] act as a [`BodySource`] that never has trailers.
    struct PlainBody<T> {
        // Pinned field: `poll_read` is forwarded to the wrapped reader through `project()`.
        #[pin]
        async_read: T,
    }
}
35
36impl<T: AsyncRead> AsyncRead for PlainBody<T> {
37 fn poll_read(
38 self: Pin<&mut Self>,
39 cx: &mut Context<'_>,
40 buf: &mut [u8],
41 ) -> Poll<Result<usize>> {
42 self.project().async_read.poll_read(cx, buf)
43 }
44}
45
46impl<T: AsyncRead + Send + 'static> BodySource for PlainBody<T> {
47 fn trailers(self: Pin<&mut Self>) -> Option<Headers> {
48 None
49 }
50}
51
/// A body, stored as one of the `BodyType` variants: empty, static in-memory content, or a
/// streaming [`BodySource`].
///
/// The derived `Default` is the empty body.
#[derive(Debug, Default)]
pub struct Body(pub(crate) BodyType);
57
58impl Body {
59 pub fn new_streaming(async_read: impl AsyncRead + Send + 'static, len: Option<u64>) -> Self {
63 Self::new_with_trailers(PlainBody { async_read }, len)
64 }
65
66 pub fn new_with_trailers(body: impl BodySource, len: Option<u64>) -> Self {
72 Self(Streaming {
73 async_read: SyncWrapper::new(Box::pin(body)),
74 len,
75 done: false,
76 progress: 0,
77 chunked_framing: true,
78 keep_open: false,
79 })
80 }
81
82 #[doc(hidden)]
89 #[cfg(feature = "unstable")]
90 #[must_use]
91 pub fn without_chunked_framing(mut self) -> Self {
92 if let Streaming {
93 ref mut chunked_framing,
94 ..
95 } = self.0
96 {
97 *chunked_framing = false;
98 }
99 self
100 }
101
102 #[doc(hidden)]
115 #[cfg(feature = "unstable")]
116 #[must_use]
117 pub fn keep_open(mut self) -> Self {
118 self.set_keep_open();
119 self
120 }
121
122 pub(crate) fn set_keep_open(&mut self) {
125 if matches!(self.0, Static { .. }) {
129 let reader = std::mem::take(self).into_reader();
130 *self = Self::new_streaming(reader, None);
131 }
132
133 if let Streaming {
134 ref mut len,
135 ref mut chunked_framing,
136 ref mut keep_open,
137 ..
138 } = self.0
139 {
140 *len = None;
141 *chunked_framing = true;
142 *keep_open = true;
143 }
144 }
145
146 pub(crate) fn set_chunked_framing(&mut self, on: bool) {
152 if let Streaming {
153 ref mut chunked_framing,
154 ..
155 } = self.0
156 {
157 *chunked_framing = on;
158 }
159 }
160
161 #[doc(hidden)]
167 pub fn trailers(&mut self) -> Option<Headers> {
168 match &mut self.0 {
169 Streaming {
170 async_read, done, ..
171 } if *done => async_read.get_mut().as_mut().trailers(),
172 _ => None,
173 }
174 }
175
176 pub fn new_static(content: impl Into<Cow<'static, [u8]>>) -> Self {
179 Self(Static {
180 content: StaticContent::Cow(content.into()),
181 cursor: 0,
182 })
183 }
184
185 pub fn static_bytes(&self) -> Option<&[u8]> {
189 match &self.0 {
190 Static { content, .. } => Some(content.as_ref()),
191 _ => None,
192 }
193 }
194
195 pub fn into_reader(self) -> Pin<Box<dyn AsyncRead + Send + Sync + 'static>> {
199 match self.0 {
200 Streaming { async_read, .. } => Box::pin(SyncAsyncReader(async_read)),
201 Static { content, .. } => Box::pin(Cursor::new(content)),
202 Empty => Box::pin(Cursor::new("")),
203 }
204 }
205
206 pub async fn into_bytes(self) -> Result<Cow<'static, [u8]>> {
215 match self.0 {
216 Static { content, .. } => Ok(content.into_cow()),
217
218 Streaming {
219 async_read,
220 len,
221 progress: 0,
222 done: false,
223 ..
224 } => {
225 let mut async_read = async_read.into_inner();
226 let mut buf = len
227 .and_then(|c| c.try_into().ok())
228 .map(Vec::with_capacity)
229 .unwrap_or_default();
230
231 async_read.read_to_end(&mut buf).await?;
232
233 Ok(Cow::Owned(buf))
234 }
235
236 Empty => Ok(Cow::Borrowed(b"")),
237
238 Streaming { .. } => Err(Error::other("body already read to completion")),
239 }
240 }
241
242 pub fn bytes_read(&self) -> u64 {
245 self.0.bytes_read()
246 }
247
248 pub fn len(&self) -> Option<u64> {
251 self.0.len()
252 }
253
254 pub fn is_empty(&self) -> bool {
256 self.0.is_empty()
257 }
258
259 pub fn is_static(&self) -> bool {
261 matches!(self.0, Static { .. })
262 }
263
264 pub fn is_streaming(&self) -> bool {
266 matches!(self.0, Streaming { .. })
267 }
268
269 #[doc(hidden)]
276 #[cfg(feature = "unstable")]
277 pub fn try_clone(&self) -> Option<Self> {
278 match &self.0 {
279 Empty => Some(Self::default()),
280 Static { content, .. } => Some(Self(Static {
281 content: content.clone(),
282 cursor: 0,
283 })),
284 Streaming { .. } => None,
285 }
286 }
287
    /// Wraps this body in an [`H3Body`].
    ///
    /// This is the public form of the method, compiled when the `unstable` feature is enabled.
    #[cfg(feature = "unstable")]
    pub fn into_h3(self) -> H3Body {
        H3Body::new(self)
    }
293
    /// Wraps this body in an `H3Body`.
    ///
    /// This is the crate-private form of the method, compiled when the `unstable` feature is
    /// not enabled.
    #[cfg(not(feature = "unstable"))]
    pub(crate) fn into_h3(self) -> H3Body {
        H3Body::new(self)
    }
299
    /// Wraps this body in an `H2Body`.
    pub(crate) fn into_h2(self) -> H2Body {
        H2Body::new(self)
    }
309}
310
/// Returns how many payload bytes may be read into a buffer of `buf_len` bytes so that the chunk
/// framing written around them is guaranteed to fit in the same buffer.
///
/// A framed chunk is laid out as `<hex size>\r\n<payload>\r\n`, so four bytes are set aside for
/// the two CRLFs and the rest is shared between the hex size and the payload. The size line is
/// budgeted conservatively: enough hex digits are reserved to print the largest value smaller
/// than the space left after the CRLFs, which can leave one byte of the buffer unused.
///
/// Only integer arithmetic is used, so the result is exact for every `usize` and no lossy
/// float conversions are involved.
///
/// # Panics
///
/// Panics if `buf_len` is less than 6, the smallest buffer that can hold a one-byte chunk.
fn max_bytes_to_read(buf_len: usize) -> usize {
    assert!(
        buf_len >= 6,
        "buffers of length {buf_len} are too small for this implementation.\n \
         if this is a problem for you, please open an issue"
    );

    // Space left once both CRLFs are accounted for; at least 2 thanks to the assertion above.
    let bytes_remaining_after_two_cr_lns = buf_len - 4;

    // Count the hex digits of `bytes_remaining_after_two_cr_lns - 1`. For any value of at
    // least 2 this equals `ceil(log2(bytes_remaining_after_two_cr_lns) / 4)`.
    let mut max_bytes_of_hex_framing = 0;
    let mut value = bytes_remaining_after_two_cr_lns - 1;
    while value > 0 {
        max_bytes_of_hex_framing += 1;
        value >>= 4;
    }

    bytes_remaining_after_two_cr_lns - max_bytes_of_hex_framing
}
330
331impl AsyncRead for Body {
332 fn poll_read(
333 mut self: Pin<&mut Self>,
334 cx: &mut Context<'_>,
335 buf: &mut [u8],
336 ) -> Poll<Result<usize>> {
337 match &mut self.0 {
338 Empty => Poll::Ready(Ok(0)),
339 Static { content, cursor } => {
340 let length = content.len();
341 if length == *cursor {
342 return Poll::Ready(Ok(0));
343 }
344 let bytes = (length - *cursor).min(buf.len());
345 buf[0..bytes].copy_from_slice(&content[*cursor..*cursor + bytes]);
346 *cursor += bytes;
347 Poll::Ready(Ok(bytes))
348 }
349
350 Streaming {
351 async_read,
352 len: Some(len),
353 done,
354 progress,
355 ..
356 } => {
357 if *done {
358 return Poll::Ready(Ok(0));
359 }
360
361 let max_bytes_to_read = (*len - *progress)
362 .try_into()
363 .unwrap_or(buf.len())
364 .min(buf.len());
365
366 let bytes = ready!(
367 async_read
368 .get_mut()
369 .as_mut()
370 .poll_read(cx, &mut buf[..max_bytes_to_read])
371 )?;
372
373 if bytes == 0 {
374 *done = true;
375 } else {
376 *progress += bytes as u64;
377 }
378
379 Poll::Ready(Ok(bytes))
380 }
381
382 Streaming {
383 async_read,
384 len: None,
385 done,
386 progress,
387 chunked_framing,
388 keep_open,
389 } => {
390 if *done {
391 return Poll::Ready(Ok(0));
392 }
393
394 if !*chunked_framing {
395 let bytes = ready!(async_read.get_mut().as_mut().poll_read(cx, buf))?;
396 if bytes == 0 {
397 *done = true;
398 } else {
399 *progress += bytes as u64;
400 }
401 return Poll::Ready(Ok(bytes));
402 }
403
404 let max_bytes_to_read = max_bytes_to_read(buf.len());
405
406 let bytes = ready!(
407 async_read
408 .get_mut()
409 .as_mut()
410 .poll_read(cx, &mut buf[..max_bytes_to_read])
411 )?;
412
413 if bytes == 0 {
414 *done = true;
415 if *keep_open {
416 return Poll::Ready(Ok(0));
419 }
420 buf[..3].copy_from_slice(b"0\r\n");
426 return Poll::Ready(Ok(3));
427 }
428
429 *progress += bytes as u64;
430
431 let start = format!("{bytes:X}\r\n");
432 let start_length = start.len();
433 let total = bytes + start_length + 2;
434 buf.copy_within(..bytes, start_length);
435 buf[..start_length].copy_from_slice(start.as_bytes());
436 buf[total - 2..total].copy_from_slice(b"\r\n");
437 Poll::Ready(Ok(total))
438 }
439 }
440 }
441}
442
/// The reader returned by `Body::into_reader` for streaming bodies; it owns the boxed source.
struct SyncAsyncReader(SyncWrapper<Pin<Box<dyn BodySource>>>);
444impl Debug for SyncAsyncReader {
445 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
446 f.debug_struct("SyncAsyncReader").finish()
447 }
448}
449impl AsyncRead for SyncAsyncReader {
450 fn poll_read(
451 self: Pin<&mut Self>,
452 cx: &mut Context<'_>,
453 buf: &mut [u8],
454 ) -> Poll<Result<usize>> {
455 self.get_mut().0.get_mut().as_mut().poll_read(cx, buf)
456 }
457}
458
/// The in-memory content held by a static body.
#[derive(Clone)]
pub(crate) enum StaticContent {
    /// Borrowed `'static` bytes or an owned buffer.
    Cow(Cow<'static, [u8]>),
    /// Reference-counted bytes.
    Bytes(Arc<[u8]>),
    /// A reference-counted string, exposed as its UTF-8 bytes.
    Str(Arc<str>),
}
467
468impl std::ops::Deref for StaticContent {
469 type Target = [u8];
470
471 fn deref(&self) -> &[u8] {
472 match self {
473 StaticContent::Cow(content) => content,
474 StaticContent::Bytes(content) => content,
475 StaticContent::Str(content) => content.as_bytes(),
476 }
477 }
478}
479
480impl AsRef<[u8]> for StaticContent {
481 fn as_ref(&self) -> &[u8] {
482 self
483 }
484}
485
486impl StaticContent {
487 fn into_cow(self) -> Cow<'static, [u8]> {
490 match self {
491 StaticContent::Cow(content) => content,
492 other => Cow::Owned(other.to_vec()),
493 }
494 }
495}
496
/// The internal representation of a `Body`.
#[derive(Default)]
pub(crate) enum BodyType {
    /// No content; this is the default.
    #[default]
    Empty,

    /// In-memory content.
    Static {
        /// The bytes of the body.
        content: StaticContent,
        /// How many bytes of `content` have been read so far.
        cursor: usize,
    },

    /// Content that is read incrementally from a boxed `BodySource`.
    Streaming {
        /// The source of bytes and trailers.
        async_read: SyncWrapper<Pin<Box<dyn BodySource>>>,
        /// How many bytes have been read from the source so far, excluding chunk framing.
        progress: u64,
        /// The total length of the content, when known.
        len: Option<u64>,
        /// Set once the source has reported end-of-stream.
        done: bool,
        /// Whether reads of a body with no known length are wrapped in chunk framing.
        chunked_framing: bool,
        /// When set, no terminating zero-size chunk is written at end-of-stream.
        keep_open: bool,
    },
}
522
523impl Debug for BodyType {
524 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
525 match self {
526 Empty => f.debug_tuple("BodyType::Empty").finish(),
527 Static { content, cursor } => f
528 .debug_struct("BodyType::Static")
529 .field("content", &String::from_utf8_lossy(content))
530 .field("cursor", cursor)
531 .finish(),
532 Streaming {
533 len,
534 done,
535 progress,
536 ..
537 } => f
538 .debug_struct("BodyType::Streaming")
539 .field("async_read", &format_args!(".."))
540 .field("len", &len)
541 .field("done", &done)
542 .field("progress", &progress)
543 .finish(),
544 }
545 }
546}
547
548impl BodyType {
549 fn is_empty(&self) -> bool {
550 match *self {
551 Empty => true,
552 Static { ref content, .. } => content.is_empty(),
553 Streaming { len, .. } => len == Some(0),
554 }
555 }
556
557 fn len(&self) -> Option<u64> {
558 match *self {
559 Empty => Some(0),
560 Static { ref content, .. } => Some(content.len() as u64),
561 Streaming { len, .. } => len,
562 }
563 }
564
565 fn bytes_read(&self) -> u64 {
566 match *self {
567 Empty => 0,
568 Static { cursor, .. } => cursor as u64,
569 Streaming { progress, .. } => progress,
570 }
571 }
572}
573
574impl From<String> for Body {
575 fn from(s: String) -> Self {
576 s.into_bytes().into()
577 }
578}
579
580impl From<&'static str> for Body {
581 fn from(s: &'static str) -> Self {
582 s.as_bytes().into()
583 }
584}
585
586impl From<&'static [u8]> for Body {
587 fn from(content: &'static [u8]) -> Self {
588 Self::new_static(content)
589 }
590}
591
592impl From<Vec<u8>> for Body {
593 fn from(content: Vec<u8>) -> Self {
594 Self::new_static(content)
595 }
596}
597
598impl From<Cow<'static, [u8]>> for Body {
599 fn from(value: Cow<'static, [u8]>) -> Self {
600 Self::new_static(value)
601 }
602}
603
604impl From<Cow<'static, str>> for Body {
605 fn from(value: Cow<'static, str>) -> Self {
606 match value {
607 Cow::Borrowed(b) => b.into(),
608 Cow::Owned(o) => o.into(),
609 }
610 }
611}
612
613impl From<Arc<[u8]>> for Body {
614 fn from(content: Arc<[u8]>) -> Self {
615 Self(Static {
616 content: StaticContent::Bytes(content),
617 cursor: 0,
618 })
619 }
620}
621
622impl From<Arc<str>> for Body {
623 fn from(content: Arc<str>) -> Self {
624 Self(Static {
625 content: StaticContent::Str(content),
626 cursor: 0,
627 })
628 }
629}
630
/// Tests for bodies built from reference-counted content.
#[cfg(test)]
mod test_shared_content {
    use super::Body;
    use futures_lite::future::block_on;
    use std::sync::Arc;

    #[test]
    fn arc_bytes_roundtrips() {
        const CONTENT: &[u8] = b"shared bytes";

        let shared: Arc<[u8]> = Arc::from(CONTENT);
        let body = Body::from(Arc::clone(&shared));
        assert_eq!(body.len(), Some(12));
        assert_eq!(body.static_bytes(), Some(CONTENT));

        let bytes = block_on(body.into_bytes()).unwrap();
        assert_eq!(bytes.as_ref(), CONTENT);

        // The caller's handle still sees the same bytes after the body is consumed.
        assert_eq!(&*shared, CONTENT);
    }

    #[test]
    fn arc_str_roundtrips() {
        const TEXT: &str = "shared str";

        let shared: Arc<str> = Arc::from(TEXT);
        let body = Body::from(shared);
        assert_eq!(body.len(), Some(10));
        assert_eq!(body.static_bytes(), Some(TEXT.as_bytes()));

        let bytes = block_on(body.into_bytes()).unwrap();
        assert_eq!(bytes.as_ref(), TEXT.as_bytes());
    }

    #[cfg(feature = "unstable")]
    #[test]
    fn shared_body_clones_without_copying_the_arc() {
        let shared: Arc<[u8]> = Arc::from(&b"abc"[..]);
        let original = Body::from(Arc::clone(&shared));
        let duplicate = original.try_clone().expect("static bodies clone");
        assert_eq!(duplicate.static_bytes(), Some(&b"abc"[..]));

        // `shared`, `original`, and `duplicate` each hold one reference to the allocation.
        assert_eq!(Arc::strong_count(&shared), 3);
    }
}
671
/// Tests for the payload limit used when chunk-framing a read.
#[cfg(test)]
mod test_bytes_to_read {
    use super::max_bytes_to_read;

    /// `(buffer length, expected payload limit)` pairs, clustered around the buffer lengths at
    /// which the hex size line grows by one digit.
    const KNOWN_VALUES: [(usize, usize); 15] = [
        (6, 1),
        (7, 2),
        (20, 15),
        (21, 15),
        (22, 16),
        (23, 17),
        (260, 254),
        (261, 254),
        (262, 255),
        (263, 256),
        (4100, 4093),
        (4101, 4093),
        (4102, 4094),
        (4103, 4095),
        (4104, 4096),
    ];

    #[test]
    fn simple_check_of_known_values() {
        for (input, expected) in KNOWN_VALUES {
            let actual = max_bytes_to_read(input);
            assert_eq!(
                actual, expected,
                "\n\nexpected max_bytes_to_read({input}) to be {expected}, but it was {actual}"
            );

            // Payload, plus two CRLFs, plus the hex size must fill the buffer or leave one byte.
            let hex_digits = format!("{expected:X}").len();
            let used_bytes = expected + 4 + hex_digits;
            assert!(
                [input, input - 1].contains(&used_bytes),
                "\n\nfor an input of {}, expected used bytes to be {} or {}, but was {}",
                input,
                input,
                input - 1,
                used_bytes
            );
        }
    }
}