1use crate::{Headers, h2::H2Body, h3::H3Body};
2use BodyType::{Empty, Static, Streaming};
3use futures_lite::{AsyncRead, AsyncReadExt, io::Cursor, ready};
4use pin_project_lite::pin_project;
5use std::{
6 borrow::Cow,
7 fmt::{self, Debug, Formatter},
8 io::{Error, Result},
9 pin::Pin,
10 sync::Arc,
11 task::{Context, Poll},
12};
13use sync_wrapper::SyncWrapper;
14
/// A streaming body source: an [`AsyncRead`] that can additionally report trailers.
///
/// Implementors must be `Send + 'static` so that they can be boxed and stored inside a
/// [`Body`].
pub trait BodySource: AsyncRead + Send + 'static {
    /// Returns the trailers for this body, or `None` when there are none.
    ///
    /// Within this file, [`Body::trailers`] only forwards to this method once the body has
    /// reported end-of-stream.
    fn trailers(self: Pin<&mut Self>) -> Option<Headers>;
}
28
pin_project! {
    // Adapter that lets any plain `AsyncRead` be used as a `BodySource` that never
    // produces trailers.
    struct PlainBody<T> {
        // The wrapped reader; structurally pinned so reads can be forwarded to it.
        #[pin]
        async_read: T,
    }
}
35
36impl<T: AsyncRead> AsyncRead for PlainBody<T> {
37 fn poll_read(
38 self: Pin<&mut Self>,
39 cx: &mut Context<'_>,
40 buf: &mut [u8],
41 ) -> Poll<Result<usize>> {
42 self.project().async_read.poll_read(cx, buf)
43 }
44}
45
46impl<T: AsyncRead + Send + 'static> BodySource for PlainBody<T> {
47 fn trailers(self: Pin<&mut Self>) -> Option<Headers> {
48 None
49 }
50}
51
/// A message body.
///
/// A `Body` is a thin wrapper around a `BodyType`, which is either empty, in-memory static
/// content, or a streaming reader. The default body is empty.
#[derive(Debug, Default)]
pub struct Body(pub(crate) BodyType);
57
58impl Body {
59 pub fn new_streaming(async_read: impl AsyncRead + Send + 'static, len: Option<u64>) -> Self {
63 Self::new_with_trailers(PlainBody { async_read }, len)
64 }
65
66 pub fn new_with_trailers(body: impl BodySource, len: Option<u64>) -> Self {
72 Self(Streaming {
73 async_read: SyncWrapper::new(Box::pin(body)),
74 len,
75 done: false,
76 progress: 0,
77 chunked_framing: true,
78 keep_open: false,
79 })
80 }
81
82 #[doc(hidden)]
89 #[cfg(feature = "unstable")]
90 #[must_use]
91 pub fn without_chunked_framing(mut self) -> Self {
92 if let Streaming {
93 ref mut chunked_framing,
94 ..
95 } = self.0
96 {
97 *chunked_framing = false;
98 }
99 self
100 }
101
102 #[doc(hidden)]
115 #[cfg(feature = "unstable")]
116 #[must_use]
117 pub fn keep_open(mut self) -> Self {
118 self.set_keep_open();
119 self
120 }
121
122 pub(crate) fn set_keep_open(&mut self) {
125 if matches!(self.0, Static { .. }) {
129 let reader = std::mem::take(self).into_reader();
130 *self = Self::new_streaming(reader, None);
131 }
132
133 if let Streaming {
134 ref mut len,
135 ref mut chunked_framing,
136 ref mut keep_open,
137 ..
138 } = self.0
139 {
140 *len = None;
141 *chunked_framing = true;
142 *keep_open = true;
143 }
144 }
145
146 pub(crate) fn ensure_chunked_framing(&mut self) -> &mut Self {
147 if let Streaming {
148 ref mut chunked_framing,
149 ..
150 } = self.0
151 {
152 *chunked_framing = true;
153 }
154
155 self
156 }
157
158 #[doc(hidden)]
164 pub fn trailers(&mut self) -> Option<Headers> {
165 match &mut self.0 {
166 Streaming {
167 async_read, done, ..
168 } if *done => async_read.get_mut().as_mut().trailers(),
169 _ => None,
170 }
171 }
172
173 pub fn new_static(content: impl Into<Cow<'static, [u8]>>) -> Self {
176 Self(Static {
177 content: StaticContent::Cow(content.into()),
178 cursor: 0,
179 })
180 }
181
182 pub fn static_bytes(&self) -> Option<&[u8]> {
186 match &self.0 {
187 Static { content, .. } => Some(content.as_ref()),
188 _ => None,
189 }
190 }
191
192 pub fn into_reader(self) -> Pin<Box<dyn AsyncRead + Send + Sync + 'static>> {
196 match self.0 {
197 Streaming { async_read, .. } => Box::pin(SyncAsyncReader(async_read)),
198 Static { content, .. } => Box::pin(Cursor::new(content)),
199 Empty => Box::pin(Cursor::new("")),
200 }
201 }
202
203 pub async fn into_bytes(self) -> Result<Cow<'static, [u8]>> {
212 match self.0 {
213 Static { content, .. } => Ok(content.into_cow()),
214
215 Streaming {
216 async_read,
217 len,
218 progress: 0,
219 done: false,
220 ..
221 } => {
222 let mut async_read = async_read.into_inner();
223 let mut buf = len
224 .and_then(|c| c.try_into().ok())
225 .map(Vec::with_capacity)
226 .unwrap_or_default();
227
228 async_read.read_to_end(&mut buf).await?;
229
230 Ok(Cow::Owned(buf))
231 }
232
233 Empty => Ok(Cow::Borrowed(b"")),
234
235 Streaming { .. } => Err(Error::other("body already read to completion")),
236 }
237 }
238
239 pub fn bytes_read(&self) -> u64 {
242 self.0.bytes_read()
243 }
244
245 pub fn len(&self) -> Option<u64> {
248 self.0.len()
249 }
250
251 pub fn is_empty(&self) -> bool {
253 self.0.is_empty()
254 }
255
256 pub fn is_static(&self) -> bool {
258 matches!(self.0, Static { .. })
259 }
260
261 pub fn is_streaming(&self) -> bool {
263 matches!(self.0, Streaming { .. })
264 }
265
266 #[doc(hidden)]
273 #[cfg(feature = "unstable")]
274 pub fn try_clone(&self) -> Option<Self> {
275 match &self.0 {
276 Empty => Some(Self::default()),
277 Static { content, .. } => Some(Self(Static {
278 content: content.clone(),
279 cursor: 0,
280 })),
281 Streaming { .. } => None,
282 }
283 }
284
285 #[cfg(feature = "unstable")]
287 pub fn into_h3(self) -> H3Body {
288 H3Body::new(self)
289 }
290
291 #[cfg(not(feature = "unstable"))]
293 pub(crate) fn into_h3(self) -> H3Body {
294 H3Body::new(self)
295 }
296
297 pub(crate) fn into_h2(self) -> H2Body {
304 H2Body::new(self)
305 }
306}
307
/// Returns the largest payload that fits in a `buf_len`-byte buffer once chunk framing is added.
///
/// A framed chunk is laid out as `<hex length>\r\n<payload>\r\n`, so the payload must leave room
/// for four bytes of line endings plus the hex digits of its own length.
///
/// The digit count is computed with exact integer arithmetic rather than `f64::log2`, so the
/// result is correct for every `usize` (including values too large for `f64` to represent
/// exactly) and needs no lossy numeric casts.
///
/// # Panics
///
/// Panics if `buf_len` is less than 6, the smallest buffer that can hold a one-byte chunk.
fn max_bytes_to_read(buf_len: usize) -> usize {
    assert!(
        buf_len >= 6,
        "buffers of length {buf_len} are too small for this implementation.\n if this is a problem for you, please open an issue"
    );

    // Room left for the hex length and the payload after reserving both `\r\n` pairs.
    // The assertion above guarantees this is at least 2.
    let bytes_remaining_after_two_cr_lns = buf_len - 4;

    // Smallest digit count `d` such that `16^d >= remaining`, i.e. `ceil(log16(remaining))`.
    // Any payload that fits alongside `d` digits is below `16^d`, so `d` digits always suffice.
    let max_bytes_of_hex_framing = (bytes_remaining_after_two_cr_lns - 1).ilog2() / 4 + 1;

    // The digit count is at most `usize::BITS / 4`, so widening it to `usize` is lossless.
    bytes_remaining_after_two_cr_lns - max_bytes_of_hex_framing as usize
}
327
impl AsyncRead for Body {
    /// Reads body content into `buf`, applying framing where required.
    ///
    /// * `Empty` bodies report end-of-stream immediately.
    /// * `Static` bodies copy from their in-memory content and advance the cursor.
    /// * `Streaming` bodies with a known length read at most the remaining number of bytes
    ///   from the source and pass them through unframed.
    /// * `Streaming` bodies without a known length either pass bytes through unframed (when
    ///   chunked framing is disabled) or wrap each read as `<hex length>\r\n<payload>\r\n`.
    ///
    /// # Panics
    ///
    /// In chunked mode, panics if `buf` is shorter than 6 bytes (see `max_bytes_to_read`).
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize>> {
        match &mut self.0 {
            Empty => Poll::Ready(Ok(0)),
            Static { content, cursor } => {
                let length = content.len();
                // Everything has already been handed out.
                if length == *cursor {
                    return Poll::Ready(Ok(0));
                }
                // Copy as much of the unread tail as the caller's buffer can hold.
                let bytes = (length - *cursor).min(buf.len());
                buf[0..bytes].copy_from_slice(&content[*cursor..*cursor + bytes]);
                *cursor += bytes;
                Poll::Ready(Ok(bytes))
            }

            // Known length: pass bytes through unframed, never reading past `len`.
            Streaming {
                async_read,
                len: Some(len),
                done,
                progress,
                ..
            } => {
                if *done {
                    return Poll::Ready(Ok(0));
                }

                // Cap the read at the bytes still owed; if the remainder does not fit in a
                // `usize`, the buffer length is the effective limit.
                let max_bytes_to_read = (*len - *progress)
                    .try_into()
                    .unwrap_or(buf.len())
                    .min(buf.len());

                // NOTE(review): once `progress == len` (or if `buf` is empty) the source is
                // polled with a zero-length buffer; a 0 result then marks the body done.
                // Confirm callers never pass an empty buffer while content remains.
                let bytes = ready!(
                    async_read
                        .get_mut()
                        .as_mut()
                        .poll_read(cx, &mut buf[..max_bytes_to_read])
                )?;

                if bytes == 0 {
                    *done = true;
                } else {
                    *progress += bytes as u64;
                }

                Poll::Ready(Ok(bytes))
            }

            // Unknown length: either raw passthrough or chunk framing.
            Streaming {
                async_read,
                len: None,
                done,
                progress,
                chunked_framing,
                keep_open,
            } => {
                if *done {
                    return Poll::Ready(Ok(0));
                }

                // Framing disabled: hand the source's bytes through untouched.
                if !*chunked_framing {
                    let bytes = ready!(async_read.get_mut().as_mut().poll_read(cx, buf))?;
                    if bytes == 0 {
                        *done = true;
                    } else {
                        *progress += bytes as u64;
                    }
                    return Poll::Ready(Ok(bytes));
                }

                // Leave room in `buf` for the hex length line and the trailing `\r\n`.
                let max_bytes_to_read = max_bytes_to_read(buf.len());

                // The payload is read into the front of `buf` and shifted into place below.
                let bytes = ready!(
                    async_read
                        .get_mut()
                        .as_mut()
                        .poll_read(cx, &mut buf[..max_bytes_to_read])
                )?;

                if bytes == 0 {
                    *done = true;
                    // A keep-open body ends without writing the zero-length chunk line.
                    if *keep_open {
                        return Poll::Ready(Ok(0));
                    }
                    // Only the `0\r\n` last-chunk line is written here.
                    // NOTE(review): the trailer section and final `\r\n` are presumably
                    // written by the caller -- confirm.
                    buf[..3].copy_from_slice(b"0\r\n");
                    return Poll::Ready(Ok(3));
                }

                // `progress` counts payload bytes only, not framing bytes.
                *progress += bytes as u64;

                // Frame in place: shift the payload right to make room for the length line,
                // write the length line in front of it, then append the closing `\r\n`.
                let start = format!("{bytes:X}\r\n");
                let start_length = start.len();
                let total = bytes + start_length + 2;
                buf.copy_within(..bytes, start_length);
                buf[..start_length].copy_from_slice(start.as_bytes());
                buf[total - 2..total].copy_from_slice(b"\r\n");
                Poll::Ready(Ok(total))
            }
        }
    }
}
439
/// Reader handed out by `Body::into_reader` for streaming bodies; reads go straight to the
/// wrapped `BodySource`.
struct SyncAsyncReader(SyncWrapper<Pin<Box<dyn BodySource>>>);
441impl Debug for SyncAsyncReader {
442 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
443 f.debug_struct("SyncAsyncReader").finish()
444 }
445}
446impl AsyncRead for SyncAsyncReader {
447 fn poll_read(
448 self: Pin<&mut Self>,
449 cx: &mut Context<'_>,
450 buf: &mut [u8],
451 ) -> Poll<Result<usize>> {
452 self.get_mut().0.get_mut().as_mut().poll_read(cx, buf)
453 }
454}
455
/// In-memory body content, held either as a `Cow` or as a shared `Arc`.
///
/// All variants dereference to the underlying bytes.
#[derive(Clone)]
pub(crate) enum StaticContent {
    /// Borrowed `'static` bytes or an owned buffer.
    Cow(Cow<'static, [u8]>),
    /// Reference-counted bytes.
    Bytes(Arc<[u8]>),
    /// Reference-counted string data, exposed as its UTF-8 bytes.
    Str(Arc<str>),
}
464
465impl std::ops::Deref for StaticContent {
466 type Target = [u8];
467
468 fn deref(&self) -> &[u8] {
469 match self {
470 StaticContent::Cow(content) => content,
471 StaticContent::Bytes(content) => content,
472 StaticContent::Str(content) => content.as_bytes(),
473 }
474 }
475}
476
477impl AsRef<[u8]> for StaticContent {
478 fn as_ref(&self) -> &[u8] {
479 self
480 }
481}
482
483impl StaticContent {
484 fn into_cow(self) -> Cow<'static, [u8]> {
487 match self {
488 StaticContent::Cow(content) => content,
489 other => Cow::Owned(other.to_vec()),
490 }
491 }
492}
493
/// The storage behind a `Body`.
#[derive(Default)]
pub(crate) enum BodyType {
    /// No content; reads report end-of-stream immediately.
    #[default]
    Empty,

    /// In-memory content served directly from a buffer.
    Static {
        /// The bytes to serve.
        content: StaticContent,
        /// Offset of the next unread byte within `content`.
        cursor: usize,
    },

    /// Content pulled from a boxed `BodySource`.
    Streaming {
        /// The underlying source.
        async_read: SyncWrapper<Pin<Box<dyn BodySource>>>,
        /// Number of content bytes read from the source so far (framing bytes excluded).
        progress: u64,
        /// The content length, when known.
        len: Option<u64>,
        /// Set once the source has reported end-of-stream.
        done: bool,
        /// When `len` is `None`, whether reads are wrapped in chunk framing.
        chunked_framing: bool,
        /// When set, end-of-stream in chunked mode does not write the `0\r\n` line.
        keep_open: bool,
    },
}
519
520impl Debug for BodyType {
521 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
522 match self {
523 Empty => f.debug_tuple("BodyType::Empty").finish(),
524 Static { content, cursor } => f
525 .debug_struct("BodyType::Static")
526 .field("content", &String::from_utf8_lossy(content))
527 .field("cursor", cursor)
528 .finish(),
529 Streaming {
530 len,
531 done,
532 progress,
533 ..
534 } => f
535 .debug_struct("BodyType::Streaming")
536 .field("async_read", &format_args!(".."))
537 .field("len", &len)
538 .field("done", &done)
539 .field("progress", &progress)
540 .finish(),
541 }
542 }
543}
544
545impl BodyType {
546 fn is_empty(&self) -> bool {
547 match *self {
548 Empty => true,
549 Static { ref content, .. } => content.is_empty(),
550 Streaming { len, .. } => len == Some(0),
551 }
552 }
553
554 fn len(&self) -> Option<u64> {
555 match *self {
556 Empty => Some(0),
557 Static { ref content, .. } => Some(content.len() as u64),
558 Streaming { len, .. } => len,
559 }
560 }
561
562 fn bytes_read(&self) -> u64 {
563 match *self {
564 Empty => 0,
565 Static { cursor, .. } => cursor as u64,
566 Streaming { progress, .. } => progress,
567 }
568 }
569}
570
571impl From<String> for Body {
572 fn from(s: String) -> Self {
573 s.into_bytes().into()
574 }
575}
576
577impl From<&'static str> for Body {
578 fn from(s: &'static str) -> Self {
579 s.as_bytes().into()
580 }
581}
582
583impl From<&'static [u8]> for Body {
584 fn from(content: &'static [u8]) -> Self {
585 Self::new_static(content)
586 }
587}
588
589impl From<Vec<u8>> for Body {
590 fn from(content: Vec<u8>) -> Self {
591 Self::new_static(content)
592 }
593}
594
595impl From<Cow<'static, [u8]>> for Body {
596 fn from(value: Cow<'static, [u8]>) -> Self {
597 Self::new_static(value)
598 }
599}
600
601impl From<Cow<'static, str>> for Body {
602 fn from(value: Cow<'static, str>) -> Self {
603 match value {
604 Cow::Borrowed(b) => b.into(),
605 Cow::Owned(o) => o.into(),
606 }
607 }
608}
609
610impl From<Arc<[u8]>> for Body {
611 fn from(content: Arc<[u8]>) -> Self {
612 Self(Static {
613 content: StaticContent::Bytes(content),
614 cursor: 0,
615 })
616 }
617}
618
619impl From<Arc<str>> for Body {
620 fn from(content: Arc<str>) -> Self {
621 Self(Static {
622 content: StaticContent::Str(content),
623 cursor: 0,
624 })
625 }
626}
627
#[cfg(test)]
mod test_shared_content {
    use super::Body;
    use futures_lite::future::block_on;
    use std::sync::Arc;

    /// A body built from `Arc<[u8]>` reports and returns the shared bytes.
    #[test]
    fn arc_bytes_roundtrips() {
        const EXPECTED: &[u8] = b"shared bytes";

        let shared: Arc<[u8]> = Arc::from(EXPECTED);
        let body = Body::from(Arc::clone(&shared));
        assert_eq!(body.len(), Some(12));
        assert_eq!(body.static_bytes(), Some(EXPECTED));

        let collected = block_on(body.into_bytes()).unwrap();
        assert_eq!(collected.as_ref(), EXPECTED);

        // The caller's handle still sees the same bytes afterwards.
        assert_eq!(&*shared, EXPECTED);
    }

    /// A body built from `Arc<str>` reports and returns the string's bytes.
    #[test]
    fn arc_str_roundtrips() {
        const EXPECTED: &[u8] = b"shared str";

        let shared: Arc<str> = Arc::from("shared str");
        let body = Body::from(shared);
        assert_eq!(body.len(), Some(10));
        assert_eq!(body.static_bytes(), Some(EXPECTED));

        let collected = block_on(body.into_bytes()).unwrap();
        assert_eq!(collected.as_ref(), EXPECTED);
    }

    /// Cloning a shared body adds a reference to the same allocation.
    #[cfg(feature = "unstable")]
    #[test]
    fn shared_body_clones_without_copying_the_arc() {
        const EXPECTED: &[u8] = b"abc";

        let shared: Arc<[u8]> = Arc::from(EXPECTED);
        let body = Body::from(Arc::clone(&shared));
        let duplicate = body.try_clone().expect("static bodies clone");
        assert_eq!(duplicate.static_bytes(), Some(EXPECTED));

        // One reference each for `shared`, `body`, and `duplicate`.
        assert_eq!(Arc::strong_count(&shared), 3);
    }
}
668
#[cfg(test)]
mod test_bytes_to_read {
    /// Checks `max_bytes_to_read` against hand-computed values and confirms that each framed
    /// chunk either fills the buffer exactly or leaves a single byte unused.
    #[test]
    fn simple_check_of_known_values() {
        // (buffer length, expected maximum payload length)
        let known_values = [
            (6, 1),
            (7, 2),
            (20, 15),
            (21, 15),
            (22, 16),
            (23, 17),
            (260, 254),
            (261, 254),
            (262, 255),
            (263, 256),
            (4100, 4093),
            (4101, 4093),
            (4102, 4094),
            (4103, 4095),
            (4104, 4096),
        ];

        for (input, expected) in known_values {
            let actual = super::max_bytes_to_read(input);
            assert_eq!(
                actual, expected,
                "\n\nexpected max_bytes_to_read({input}) to be {expected}, but it was {actual}"
            );

            // Payload, plus two `\r\n` pairs, plus the hex digits of the payload length.
            let hex_digits = format!("{expected:X}").len();
            let used_bytes = expected + 4 + hex_digits;
            let fills_buffer = used_bytes == input;
            let leaves_one_byte = used_bytes == input - 1;
            assert!(
                fills_buffer || leaves_one_byte,
                "\n\nfor an input of {}, expected used bytes to be {} or {}, but was {}",
                input,
                input,
                input - 1,
                used_bytes
            );
        }
    }
}