Skip to main content

trillium_http/
body.rs

1use crate::{Headers, h2::H2Body, h3::H3Body};
2use BodyType::{Empty, Static, Streaming};
3use futures_lite::{AsyncRead, AsyncReadExt, io::Cursor, ready};
4use pin_project_lite::pin_project;
5use std::{
6    borrow::Cow,
7    fmt::{self, Debug, Formatter},
8    io::{Error, Result},
9    pin::Pin,
10    sync::Arc,
11    task::{Context, Poll},
12};
13use sync_wrapper::SyncWrapper;
14
15/// Streaming body source that can optionally produce trailers.
16///
17/// Implement this on types that compute trailer headers dynamically as the body
18/// is read — for example, a hashing wrapper that produces a `Digest` trailer
19/// after all bytes have been streamed. For plain [`AsyncRead`] sources with no
20/// trailers, [`Body::new_streaming`] is simpler.
21pub trait BodySource: AsyncRead + Send + 'static {
22    /// Returns the trailers for this body, called after the body has been fully read.
23    ///
24    /// Implementations may clear internal state on this call; the result is
25    /// only meaningful after [`AsyncRead::poll_read`] has returned `Ok(0)`.
26    fn trailers(self: Pin<&mut Self>) -> Option<Headers>;
27}
28
29pin_project! {
30    struct PlainBody<T> {
31        #[pin]
32        async_read: T,
33    }
34}
35
36impl<T: AsyncRead> AsyncRead for PlainBody<T> {
37    fn poll_read(
38        self: Pin<&mut Self>,
39        cx: &mut Context<'_>,
40        buf: &mut [u8],
41    ) -> Poll<Result<usize>> {
42        self.project().async_read.poll_read(cx, buf)
43    }
44}
45
46impl<T: AsyncRead + Send + 'static> BodySource for PlainBody<T> {
47    fn trailers(self: Pin<&mut Self>) -> Option<Headers> {
48        None
49    }
50}
51
52/// The trillium representation of a http body. This can contain
53/// either `&'static [u8]` content, `Vec<u8>` content, or a boxed
54/// [`AsyncRead`]/[`BodySource`] type.
55#[derive(Debug, Default)]
56pub struct Body(pub(crate) BodyType);
57
58impl Body {
59    /// Construct a new body from a streaming [`AsyncRead`] source. If
60    /// you have the body content in memory already, prefer
61    /// [`Body::new_static`] or one of the From conversions.
62    pub fn new_streaming(async_read: impl AsyncRead + Send + 'static, len: Option<u64>) -> Self {
63        Self::new_with_trailers(PlainBody { async_read }, len)
64    }
65
66    /// Construct a new body from a [`BodySource`] that can produce trailers after
67    /// the body has been fully read.
68    ///
69    /// Use this when trailers must be computed dynamically from the body bytes,
70    /// for example to append a content hash.
71    pub fn new_with_trailers(body: impl BodySource, len: Option<u64>) -> Self {
72        Self(Streaming {
73            async_read: SyncWrapper::new(Box::pin(body)),
74            len,
75            done: false,
76            progress: 0,
77            chunked_framing: true,
78            keep_open: false,
79        })
80    }
81
82    /// Disable chunked-encoding framing emitted by [`AsyncRead`] for streaming bodies
83    /// of unknown length.
84    ///
85    /// By default, when a streaming body has no known length, this type's [`AsyncRead`]
86    /// implementation emits chunked framing so the h1 codec can write its bytes directly.
87    /// That framing is wrong for any consumer that wants raw body bytes.
88    #[doc(hidden)]
89    #[cfg(feature = "unstable")]
90    #[must_use]
91    pub fn without_chunked_framing(mut self) -> Self {
92        if let Streaming {
93            ref mut chunked_framing,
94            ..
95        } = self.0
96        {
97            *chunked_framing = false;
98        }
99        self
100    }
101
102    /// Normalize this body to an open, chunk-framed stream: its content goes out as
103    /// chunked-transfer chunks with **no** terminating `0\r\n`, leaving the outbound
104    /// stream open for a following upgrade to continue and eventually close.
105    ///
106    /// Fixed-length content (`Static`, or a streaming body with a known length) is
107    /// re-sourced through the chunked path so it too flows as ordinary chunks rather than
108    /// raw bytes — the caller doesn't have to hand-wrap it in a length-less streaming body.
109    /// An empty body stays empty (it contributes no bytes; the upgrade owns the whole
110    /// stream).
111    ///
112    /// The send site that consumes the body is responsible for *not* writing the
113    /// trailer-section terminator either; trailers (if any) ride onto the upgrade.
114    #[doc(hidden)]
115    #[cfg(feature = "unstable")]
116    #[must_use]
117    pub fn keep_open(mut self) -> Self {
118        self.set_keep_open();
119        self
120    }
121
122    /// In-crate counterpart to [`keep_open`](Self::keep_open) — the server send path sets
123    /// this from `should_upgrade()` and can't reach the `unstable`-gated public builder.
124    pub(crate) fn set_keep_open(&mut self) {
125        // Re-source fixed content through the chunked streaming path so it goes out as
126        // chunks instead of raw bytes. Streaming bodies are left in place (preserving their
127        // `BodySource`, hence any trailers) and just have their framing flags flipped below.
128        if matches!(self.0, Static { .. }) {
129            let reader = std::mem::take(self).into_reader();
130            *self = Self::new_streaming(reader, None);
131        }
132
133        if let Streaming {
134            ref mut len,
135            ref mut chunked_framing,
136            ref mut keep_open,
137            ..
138        } = self.0
139        {
140            *len = None;
141            *chunked_framing = true;
142            *keep_open = true;
143        }
144    }
145
146    pub(crate) fn ensure_chunked_framing(&mut self) -> &mut Self {
147        if let Streaming {
148            ref mut chunked_framing,
149            ..
150        } = self.0
151        {
152            *chunked_framing = true;
153        }
154
155        self
156    }
157
158    /// Returns trailers from the body source, if any.
159    ///
160    /// Only meaningful after the body has been fully read (i.e., [`AsyncRead::poll_read`]
161    /// has returned `Ok(0)`). Returns `None` for bodies constructed with
162    /// [`Body::new_streaming`] or [`Body::new_static`].
163    #[doc(hidden)]
164    pub fn trailers(&mut self) -> Option<Headers> {
165        match &mut self.0 {
166            Streaming {
167                async_read, done, ..
168            } if *done => async_read.get_mut().as_mut().trailers(),
169            _ => None,
170        }
171    }
172
173    /// Construct a fixed-length Body from a `Vec<u8>` or `&'static
174    /// [u8]`.
175    pub fn new_static(content: impl Into<Cow<'static, [u8]>>) -> Self {
176        Self(Static {
177            content: StaticContent::Cow(content.into()),
178            cursor: 0,
179        })
180    }
181
182    /// Retrieve a borrow of the static content in this body. If this
183    /// body is a streaming body or an empty body, this will return
184    /// None.
185    pub fn static_bytes(&self) -> Option<&[u8]> {
186        match &self.0 {
187            Static { content, .. } => Some(content.as_ref()),
188            _ => None,
189        }
190    }
191
192    /// Transform this Body into a dyn [`AsyncRead`], wrapping static content in
193    /// a [`Cursor`]. Unlike reading from the Body directly, this does not apply
194    /// chunked encoding.
195    pub fn into_reader(self) -> Pin<Box<dyn AsyncRead + Send + Sync + 'static>> {
196        match self.0 {
197            Streaming { async_read, .. } => Box::pin(SyncAsyncReader(async_read)),
198            Static { content, .. } => Box::pin(Cursor::new(content)),
199            Empty => Box::pin(Cursor::new("")),
200        }
201    }
202
203    /// Consume this body and return the full content. If the body was constructed
204    /// with [`Body::new_streaming`], this will read the entire streaming body into
205    /// memory, awaiting the streaming source's completion.
206    ///
207    /// # Errors
208    ///
209    /// Returns an error if the underlying transport errors, or if a streaming body
210    /// has already been partially or fully read.
211    pub async fn into_bytes(self) -> Result<Cow<'static, [u8]>> {
212        match self.0 {
213            Static { content, .. } => Ok(content.into_cow()),
214
215            Streaming {
216                async_read,
217                len,
218                progress: 0,
219                done: false,
220                ..
221            } => {
222                let mut async_read = async_read.into_inner();
223                let mut buf = len
224                    .and_then(|c| c.try_into().ok())
225                    .map(Vec::with_capacity)
226                    .unwrap_or_default();
227
228                async_read.read_to_end(&mut buf).await?;
229
230                Ok(Cow::Owned(buf))
231            }
232
233            Empty => Ok(Cow::Borrowed(b"")),
234
235            Streaming { .. } => Err(Error::other("body already read to completion")),
236        }
237    }
238
239    /// Retrieve the number of bytes that have been read from this
240    /// body
241    pub fn bytes_read(&self) -> u64 {
242        self.0.bytes_read()
243    }
244
245    /// returns the content length of this body, if known and
246    /// available.
247    pub fn len(&self) -> Option<u64> {
248        self.0.len()
249    }
250
251    /// determine if the this body represents no data
252    pub fn is_empty(&self) -> bool {
253        self.0.is_empty()
254    }
255
256    /// determine if the this body represents static content
257    pub fn is_static(&self) -> bool {
258        matches!(self.0, Static { .. })
259    }
260
261    /// determine if the this body represents streaming content
262    pub fn is_streaming(&self) -> bool {
263        matches!(self.0, Streaming { .. })
264    }
265
266    /// Attempt to clone this body. Returns `None` for streaming bodies, which are one-shot.
267    ///
268    /// Static bodies clone cheaply — a `Cow` clone, which is a pointer copy for borrowed
269    /// `&'static` content and a `Vec` clone for owned content. The clone resets read
270    /// progress, so it can be sent again from the beginning. Empty bodies always clone
271    /// successfully.
272    #[doc(hidden)]
273    #[cfg(feature = "unstable")]
274    pub fn try_clone(&self) -> Option<Self> {
275        match &self.0 {
276            Empty => Some(Self::default()),
277            Static { content, .. } => Some(Self(Static {
278                content: content.clone(),
279                cursor: 0,
280            })),
281            Streaming { .. } => None,
282        }
283    }
284
285    /// Convert this body into an `H3Body` for reading
286    #[cfg(feature = "unstable")]
287    pub fn into_h3(self) -> H3Body {
288        H3Body::new(self)
289    }
290
291    /// Convert this body into an `H3Body` for reading
292    #[cfg(not(feature = "unstable"))]
293    pub(crate) fn into_h3(self) -> H3Body {
294        H3Body::new(self)
295    }
296
297    /// Convert this body into an [`H2Body`] for reading by the h2 send pump.
298    ///
299    /// h2 frames DATA at the connection layer, so the body bytes that reach the send pump
300    /// must be plain payload — not chunk-encoded. [`H2Body`] strips the chunked-transfer
301    /// wrapping that [`Body::poll_read`] applies for the h1 path on streaming bodies of
302    /// unknown length, and forwards trailers so the send pump can emit trailing HEADERS.
303    pub(crate) fn into_h2(self) -> H2Body {
304        H2Body::new(self)
305    }
306}
307
/// Returns the largest payload that can be read into a buffer of `buf_len` bytes and
/// still leave room to wrap it in place as one chunked-transfer chunk:
/// `<hex length>\r\n<payload>\r\n`.
///
/// Four bytes are reserved for the two `\r\n` pairs. The remainder is split between
/// the payload and the hex digits of its length. The digit count is the smallest `k`
/// with `16^k >= remainder`, which is deliberately conservative: just past a hex
/// order-of-magnitude boundary (F -> 10, FF -> 100, ...) one buffer byte is left
/// unused, because growing the payload by one byte would grow the frame by two.
///
/// The computation is exact integer arithmetic, so it does not depend on
/// floating-point rounding or lossy casts.
///
/// # Panics
///
/// Panics if `buf_len` is less than 6, the smallest buffer that can hold a one-byte
/// chunk with its framing.
fn max_bytes_to_read(buf_len: usize) -> usize {
    assert!(
        buf_len >= 6,
        "buffers of length {buf_len} are too small for this implementation.
            if this is a problem for you, please open an issue"
    );

    let bytes_remaining_after_two_cr_lns = buf_len - 4;

    // The smallest k with 16^k >= n equals the number of hex digits needed to write
    // n - 1. `buf_len >= 6` guarantees n >= 2, so the subtraction cannot underflow.
    let significant_bits = usize::BITS - (bytes_remaining_after_two_cr_lns - 1).leading_zeros();
    // At most 16 on a 64-bit target, so widening to usize is lossless.
    let max_bytes_of_hex_framing = significant_bits.div_ceil(4) as usize;

    bytes_remaining_after_two_cr_lns - max_bytes_of_hex_framing
}
327
328impl AsyncRead for Body {
329    fn poll_read(
330        mut self: Pin<&mut Self>,
331        cx: &mut Context<'_>,
332        buf: &mut [u8],
333    ) -> Poll<Result<usize>> {
334        match &mut self.0 {
335            Empty => Poll::Ready(Ok(0)),
336            Static { content, cursor } => {
337                let length = content.len();
338                if length == *cursor {
339                    return Poll::Ready(Ok(0));
340                }
341                let bytes = (length - *cursor).min(buf.len());
342                buf[0..bytes].copy_from_slice(&content[*cursor..*cursor + bytes]);
343                *cursor += bytes;
344                Poll::Ready(Ok(bytes))
345            }
346
347            Streaming {
348                async_read,
349                len: Some(len),
350                done,
351                progress,
352                ..
353            } => {
354                if *done {
355                    return Poll::Ready(Ok(0));
356                }
357
358                let max_bytes_to_read = (*len - *progress)
359                    .try_into()
360                    .unwrap_or(buf.len())
361                    .min(buf.len());
362
363                let bytes = ready!(
364                    async_read
365                        .get_mut()
366                        .as_mut()
367                        .poll_read(cx, &mut buf[..max_bytes_to_read])
368                )?;
369
370                if bytes == 0 {
371                    *done = true;
372                } else {
373                    *progress += bytes as u64;
374                }
375
376                Poll::Ready(Ok(bytes))
377            }
378
379            Streaming {
380                async_read,
381                len: None,
382                done,
383                progress,
384                chunked_framing,
385                keep_open,
386            } => {
387                if *done {
388                    return Poll::Ready(Ok(0));
389                }
390
391                if !*chunked_framing {
392                    let bytes = ready!(async_read.get_mut().as_mut().poll_read(cx, buf))?;
393                    if bytes == 0 {
394                        *done = true;
395                    } else {
396                        *progress += bytes as u64;
397                    }
398                    return Poll::Ready(Ok(bytes));
399                }
400
401                let max_bytes_to_read = max_bytes_to_read(buf.len());
402
403                let bytes = ready!(
404                    async_read
405                        .get_mut()
406                        .as_mut()
407                        .poll_read(cx, &mut buf[..max_bytes_to_read])
408                )?;
409
410                if bytes == 0 {
411                    *done = true;
412                    if *keep_open {
413                        // The outbound stream continues into an upgrade; the upgrade owns
414                        // the terminator. Emit no last-chunk marker.
415                        return Poll::Ready(Ok(0));
416                    }
417                    // Last-chunk marker only; the caller emits the trailer-section
418                    // (possibly empty) followed by the terminating `\r\n`. Trailers come
419                    // from `BodySource::trailers()` as structured `Headers`, not bytes,
420                    // and the caller writes them in one shot so this path doesn't need
421                    // a multi-poll state machine spanning buffers.
422                    buf[..3].copy_from_slice(b"0\r\n");
423                    return Poll::Ready(Ok(3));
424                }
425
426                *progress += bytes as u64;
427
428                let start = format!("{bytes:X}\r\n");
429                let start_length = start.len();
430                let total = bytes + start_length + 2;
431                buf.copy_within(..bytes, start_length);
432                buf[..start_length].copy_from_slice(start.as_bytes());
433                buf[total - 2..total].copy_from_slice(b"\r\n");
434                Poll::Ready(Ok(total))
435            }
436        }
437    }
438}
439
440struct SyncAsyncReader(SyncWrapper<Pin<Box<dyn BodySource>>>);
441impl Debug for SyncAsyncReader {
442    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
443        f.debug_struct("SyncAsyncReader").finish()
444    }
445}
446impl AsyncRead for SyncAsyncReader {
447    fn poll_read(
448        self: Pin<&mut Self>,
449        cx: &mut Context<'_>,
450        buf: &mut [u8],
451    ) -> Poll<Result<usize>> {
452        self.get_mut().0.get_mut().as_mut().poll_read(cx, buf)
453    }
454}
455
/// Fixed-length body content held in memory.
///
/// Every variant is cheap to clone: the borrowed and shared (`Arc`) forms copy a
/// pointer, and only an owned `Cow` clones its `Vec`.
#[derive(Clone)]
pub(crate) enum StaticContent {
    /// Borrowed `&'static [u8]` or owned `Vec<u8>` bytes.
    Cow(Cow<'static, [u8]>),
    /// Reference-counted shared bytes.
    Bytes(Arc<[u8]>),
    /// Reference-counted shared text, served as its UTF-8 bytes.
    Str(Arc<str>),
}
464
465impl std::ops::Deref for StaticContent {
466    type Target = [u8];
467
468    fn deref(&self) -> &[u8] {
469        match self {
470            StaticContent::Cow(content) => content,
471            StaticContent::Bytes(content) => content,
472            StaticContent::Str(content) => content.as_bytes(),
473        }
474    }
475}
476
477impl AsRef<[u8]> for StaticContent {
478    fn as_ref(&self) -> &[u8] {
479        self
480    }
481}
482
483impl StaticContent {
484    /// Materialize as an owned `Cow`. The `Cow` variant passes through without copying;
485    /// the shared variants copy their bytes into a `Vec`.
486    fn into_cow(self) -> Cow<'static, [u8]> {
487        match self {
488            StaticContent::Cow(content) => content,
489            other => Cow::Owned(other.to_vec()),
490        }
491    }
492}
493
494#[derive(Default)]
495pub(crate) enum BodyType {
496    #[default]
497    Empty,
498
499    Static {
500        content: StaticContent,
501        cursor: usize,
502    },
503
504    Streaming {
505        async_read: SyncWrapper<Pin<Box<dyn BodySource>>>,
506        progress: u64,
507        len: Option<u64>,
508        done: bool,
509        /// When true (the default), [`Body`]'s [`AsyncRead`] impl emits chunked
510        /// framing for the `len: None` case; when false (via
511        /// [`Body::without_chunked_framing`]), it passes through raw bytes.
512        chunked_framing: bool,
513        /// When true (via [`Body::keep_open`]), the chunked `len: None` read does not
514        /// emit the `0\r\n` last-chunk marker at EOF — the outbound stream is left open
515        /// for a following upgrade to terminate. Default false.
516        keep_open: bool,
517    },
518}
519
520impl Debug for BodyType {
521    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
522        match self {
523            Empty => f.debug_tuple("BodyType::Empty").finish(),
524            Static { content, cursor } => f
525                .debug_struct("BodyType::Static")
526                .field("content", &String::from_utf8_lossy(content))
527                .field("cursor", cursor)
528                .finish(),
529            Streaming {
530                len,
531                done,
532                progress,
533                ..
534            } => f
535                .debug_struct("BodyType::Streaming")
536                .field("async_read", &format_args!(".."))
537                .field("len", &len)
538                .field("done", &done)
539                .field("progress", &progress)
540                .finish(),
541        }
542    }
543}
544
545impl BodyType {
546    fn is_empty(&self) -> bool {
547        match *self {
548            Empty => true,
549            Static { ref content, .. } => content.is_empty(),
550            Streaming { len, .. } => len == Some(0),
551        }
552    }
553
554    fn len(&self) -> Option<u64> {
555        match *self {
556            Empty => Some(0),
557            Static { ref content, .. } => Some(content.len() as u64),
558            Streaming { len, .. } => len,
559        }
560    }
561
562    fn bytes_read(&self) -> u64 {
563        match *self {
564            Empty => 0,
565            Static { cursor, .. } => cursor as u64,
566            Streaming { progress, .. } => progress,
567        }
568    }
569}
570
571impl From<String> for Body {
572    fn from(s: String) -> Self {
573        s.into_bytes().into()
574    }
575}
576
577impl From<&'static str> for Body {
578    fn from(s: &'static str) -> Self {
579        s.as_bytes().into()
580    }
581}
582
583impl From<&'static [u8]> for Body {
584    fn from(content: &'static [u8]) -> Self {
585        Self::new_static(content)
586    }
587}
588
589impl From<Vec<u8>> for Body {
590    fn from(content: Vec<u8>) -> Self {
591        Self::new_static(content)
592    }
593}
594
595impl From<Cow<'static, [u8]>> for Body {
596    fn from(value: Cow<'static, [u8]>) -> Self {
597        Self::new_static(value)
598    }
599}
600
601impl From<Cow<'static, str>> for Body {
602    fn from(value: Cow<'static, str>) -> Self {
603        match value {
604            Cow::Borrowed(b) => b.into(),
605            Cow::Owned(o) => o.into(),
606        }
607    }
608}
609
610impl From<Arc<[u8]>> for Body {
611    fn from(content: Arc<[u8]>) -> Self {
612        Self(Static {
613            content: StaticContent::Bytes(content),
614            cursor: 0,
615        })
616    }
617}
618
619impl From<Arc<str>> for Body {
620    fn from(content: Arc<str>) -> Self {
621        Self(Static {
622            content: StaticContent::Str(content),
623            cursor: 0,
624        })
625    }
626}
627
#[cfg(test)]
mod test_shared_content {
    use super::Body;
    use futures_lite::future::block_on;
    use std::sync::Arc;

    /// A body built from `Arc<[u8]>` reports its length, exposes its bytes, and
    /// returns them from `into_bytes`, while the caller's `Arc` stays usable.
    #[test]
    fn arc_bytes_roundtrips() {
        let shared: Arc<[u8]> = Arc::from(&b"shared bytes"[..]);
        let body = Body::from(Arc::clone(&shared));

        assert_eq!(body.len(), Some(12));
        assert_eq!(body.static_bytes(), Some(&b"shared bytes"[..]));

        let collected = block_on(body.into_bytes()).unwrap();
        assert_eq!(collected.as_ref(), b"shared bytes");

        // the body shared the buffer rather than consuming it
        assert_eq!(&*shared, b"shared bytes");
    }

    /// A body built from `Arc<str>` serves the text's UTF-8 bytes.
    #[test]
    fn arc_str_roundtrips() {
        let shared: Arc<str> = Arc::from("shared str");
        let body = Body::from(shared);

        assert_eq!(body.len(), Some(10));
        assert_eq!(body.static_bytes(), Some(&b"shared str"[..]));

        let collected = block_on(body.into_bytes()).unwrap();
        assert_eq!(collected.as_ref(), b"shared str");
    }

    /// Cloning a shared body bumps the reference count instead of copying bytes.
    #[cfg(feature = "unstable")]
    #[test]
    fn shared_body_clones_without_copying_the_arc() {
        let shared: Arc<[u8]> = Arc::from(&b"abc"[..]);
        let body = Body::from(Arc::clone(&shared));

        let duplicate = body.try_clone().expect("static bodies clone");
        assert_eq!(duplicate.static_bytes(), Some(&b"abc"[..]));

        // the local handle, the body, and its clone all point at one allocation
        assert_eq!(Arc::strong_count(&shared), 3);
    }
}
668
#[cfg(test)]
mod test_bytes_to_read {
    /// Pins `max_bytes_to_read` against hand-checked `(buffer length, payload)` pairs.
    #[test]
    fn simple_check_of_known_values() {
        // The rows marked `<-` are the point of this test. They capture a nonobvious
        // but intentional consequence of the implementation: to avoid overflowing the
        // buffer we sometimes have to use one byte fewer than is available, because
        // growing the read by one byte increases the framed size by two. This happens
        // where the hex representation of the payload length is about to gain a
        // digit (F -> 10, FF -> 100, FFF -> 1000, etc).
        let cases = [
            (6, 1),       // 1
            (7, 2),       // 2
            (20, 15),     // F
            (21, 15),     // F <-
            (22, 16),     // 10
            (23, 17),     // 11
            (260, 254),   // FE
            (261, 254),   // FE <-
            (262, 255),   // FF <-
            (263, 256),   // 100
            (4100, 4093), // FFD
            (4101, 4093), // FFD <-
            (4102, 4094), // FFE <-
            (4103, 4095), // FFF <-
            (4104, 4096), // 1000
        ];

        for (input, expected) in cases {
            let actual = super::max_bytes_to_read(input);
            assert_eq!(
                actual, expected,
                "\n\nexpected max_bytes_to_read({input}) to be {expected}, but it was {actual}"
            );

            // Sanity-check the table itself: payload, two CRLFs and the hex length
            // must fill the buffer exactly or fall one byte short.
            let hex_digits = format!("{expected:X}").len();
            let used_bytes = expected + 4 + hex_digits;
            assert!(
                used_bytes == input || used_bytes == input - 1,
                "\n\nfor an input of {}, expected used bytes to be {} or {}, but was {}",
                input,
                input,
                input - 1,
                used_bytes
            );
        }
    }
}