Skip to main content

trillium_http/
body.rs

1use crate::{Headers, h2::H2Body, h3::H3Body};
2use BodyType::{Empty, Static, Streaming};
3use futures_lite::{AsyncRead, AsyncReadExt, io::Cursor, ready};
4use pin_project_lite::pin_project;
5use std::{
6    borrow::Cow,
7    fmt::{self, Debug, Formatter},
8    io::{Error, Result},
9    pin::Pin,
10    sync::Arc,
11    task::{Context, Poll},
12};
13use sync_wrapper::SyncWrapper;
14
/// Streaming body source that can optionally produce trailers.
///
/// Implement this on types that compute trailer headers dynamically as the body
/// is read — for example, a hashing wrapper that produces a `Digest` trailer
/// after all bytes have been streamed. For plain [`AsyncRead`] sources with no
/// trailers, [`Body::new_streaming`] is simpler.
///
/// The `Send + 'static` supertraits let a source be stored inside a [`Body`] as a
/// boxed trait object.
pub trait BodySource: AsyncRead + Send + 'static {
    /// Returns the trailers for this body, called after the body has been fully read.
    ///
    /// Implementations may clear internal state on this call; the result is
    /// only meaningful after [`AsyncRead::poll_read`] has returned `Ok(0)`.
    /// Return `None` when there are no trailers to send.
    fn trailers(self: Pin<&mut Self>) -> Option<Headers>;
}
28
// Adapter that lets any plain `AsyncRead` act as a `BodySource` that never has trailers.
// `Body::new_streaming` wraps its reader in this type.
pin_project! {
    struct PlainBody<T> {
        // Pinned structurally so `poll_read` can be forwarded to the wrapped reader.
        #[pin]
        async_read: T,
    }
}
35
36impl<T: AsyncRead> AsyncRead for PlainBody<T> {
37    fn poll_read(
38        self: Pin<&mut Self>,
39        cx: &mut Context<'_>,
40        buf: &mut [u8],
41    ) -> Poll<Result<usize>> {
42        self.project().async_read.poll_read(cx, buf)
43    }
44}
45
46impl<T: AsyncRead + Send + 'static> BodySource for PlainBody<T> {
47    fn trailers(self: Pin<&mut Self>) -> Option<Headers> {
48        None
49    }
50}
51
/// The trillium representation of a http body. This can contain
/// either `&'static [u8]` content, `Vec<u8>` content, shared `Arc<[u8]>` /
/// `Arc<str>` content, or a boxed [`AsyncRead`]/[`BodySource`] type.
///
/// The default body is empty.
#[derive(Debug, Default)]
pub struct Body(pub(crate) BodyType);
57
58impl Body {
59    /// Construct a new body from a streaming [`AsyncRead`] source. If
60    /// you have the body content in memory already, prefer
61    /// [`Body::new_static`] or one of the From conversions.
62    pub fn new_streaming(async_read: impl AsyncRead + Send + 'static, len: Option<u64>) -> Self {
63        Self::new_with_trailers(PlainBody { async_read }, len)
64    }
65
66    /// Construct a new body from a [`BodySource`] that can produce trailers after
67    /// the body has been fully read.
68    ///
69    /// Use this when trailers must be computed dynamically from the body bytes,
70    /// for example to append a content hash.
71    pub fn new_with_trailers(body: impl BodySource, len: Option<u64>) -> Self {
72        Self(Streaming {
73            async_read: SyncWrapper::new(Box::pin(body)),
74            len,
75            done: false,
76            progress: 0,
77            chunked_framing: true,
78            keep_open: false,
79        })
80    }
81
82    /// Disable chunked-encoding framing emitted by [`AsyncRead`] for streaming bodies
83    /// of unknown length.
84    ///
85    /// By default, when a streaming body has no known length, this type's [`AsyncRead`]
86    /// implementation emits chunked framing so the h1 codec can write its bytes directly.
87    /// That framing is wrong for any consumer that wants raw body bytes.
88    #[doc(hidden)]
89    #[cfg(feature = "unstable")]
90    #[must_use]
91    pub fn without_chunked_framing(mut self) -> Self {
92        if let Streaming {
93            ref mut chunked_framing,
94            ..
95        } = self.0
96        {
97            *chunked_framing = false;
98        }
99        self
100    }
101
102    /// Normalize this body to an open, chunk-framed stream: its content goes out as
103    /// chunked-transfer chunks with **no** terminating `0\r\n`, leaving the outbound
104    /// stream open for a following upgrade to continue and eventually close.
105    ///
106    /// Fixed-length content (`Static`, or a streaming body with a known length) is
107    /// re-sourced through the chunked path so it too flows as ordinary chunks rather than
108    /// raw bytes — the caller doesn't have to hand-wrap it in a length-less streaming body.
109    /// An empty body stays empty (it contributes no bytes; the upgrade owns the whole
110    /// stream).
111    ///
112    /// The send site that consumes the body is responsible for *not* writing the
113    /// trailer-section terminator either; trailers (if any) ride onto the upgrade.
114    #[doc(hidden)]
115    #[cfg(feature = "unstable")]
116    #[must_use]
117    pub fn keep_open(mut self) -> Self {
118        self.set_keep_open();
119        self
120    }
121
122    /// In-crate counterpart to [`keep_open`](Self::keep_open) — the server send path sets
123    /// this from `should_upgrade()` and can't reach the `unstable`-gated public builder.
124    pub(crate) fn set_keep_open(&mut self) {
125        // Re-source fixed content through the chunked streaming path so it goes out as
126        // chunks instead of raw bytes. Streaming bodies are left in place (preserving their
127        // `BodySource`, hence any trailers) and just have their framing flags flipped below.
128        if matches!(self.0, Static { .. }) {
129            let reader = std::mem::take(self).into_reader();
130            *self = Self::new_streaming(reader, None);
131        }
132
133        if let Streaming {
134            ref mut len,
135            ref mut chunked_framing,
136            ref mut keep_open,
137            ..
138        } = self.0
139        {
140            *len = None;
141            *chunked_framing = true;
142            *keep_open = true;
143        }
144    }
145
146    /// Set whether this body's [`AsyncRead`] impl emits chunked framing for the
147    /// `len: None` case. The h1 send path drives this from the finalized response
148    /// headers: `chunked` when `Transfer-Encoding: chunked` is present, raw passthrough
149    /// (close-delimited) when neither it nor `Content-Length` is. Has no effect on
150    /// fixed-length or static bodies, which never chunk.
151    pub(crate) fn set_chunked_framing(&mut self, on: bool) {
152        if let Streaming {
153            ref mut chunked_framing,
154            ..
155        } = self.0
156        {
157            *chunked_framing = on;
158        }
159    }
160
161    /// Returns trailers from the body source, if any.
162    ///
163    /// Only meaningful after the body has been fully read (i.e., [`AsyncRead::poll_read`]
164    /// has returned `Ok(0)`). Returns `None` for bodies constructed with
165    /// [`Body::new_streaming`] or [`Body::new_static`].
166    #[doc(hidden)]
167    pub fn trailers(&mut self) -> Option<Headers> {
168        match &mut self.0 {
169            Streaming {
170                async_read, done, ..
171            } if *done => async_read.get_mut().as_mut().trailers(),
172            _ => None,
173        }
174    }
175
176    /// Construct a fixed-length Body from a `Vec<u8>` or `&'static
177    /// [u8]`.
178    pub fn new_static(content: impl Into<Cow<'static, [u8]>>) -> Self {
179        Self(Static {
180            content: StaticContent::Cow(content.into()),
181            cursor: 0,
182        })
183    }
184
185    /// Retrieve a borrow of the static content in this body. If this
186    /// body is a streaming body or an empty body, this will return
187    /// None.
188    pub fn static_bytes(&self) -> Option<&[u8]> {
189        match &self.0 {
190            Static { content, .. } => Some(content.as_ref()),
191            _ => None,
192        }
193    }
194
195    /// Transform this Body into a dyn [`AsyncRead`], wrapping static content in
196    /// a [`Cursor`]. Unlike reading from the Body directly, this does not apply
197    /// chunked encoding.
198    pub fn into_reader(self) -> Pin<Box<dyn AsyncRead + Send + Sync + 'static>> {
199        match self.0 {
200            Streaming { async_read, .. } => Box::pin(SyncAsyncReader(async_read)),
201            Static { content, .. } => Box::pin(Cursor::new(content)),
202            Empty => Box::pin(Cursor::new("")),
203        }
204    }
205
206    /// Consume this body and return the full content. If the body was constructed
207    /// with [`Body::new_streaming`], this will read the entire streaming body into
208    /// memory, awaiting the streaming source's completion.
209    ///
210    /// # Errors
211    ///
212    /// Returns an error if the underlying transport errors, or if a streaming body
213    /// has already been partially or fully read.
214    pub async fn into_bytes(self) -> Result<Cow<'static, [u8]>> {
215        match self.0 {
216            Static { content, .. } => Ok(content.into_cow()),
217
218            Streaming {
219                async_read,
220                len,
221                progress: 0,
222                done: false,
223                ..
224            } => {
225                let mut async_read = async_read.into_inner();
226                let mut buf = len
227                    .and_then(|c| c.try_into().ok())
228                    .map(Vec::with_capacity)
229                    .unwrap_or_default();
230
231                async_read.read_to_end(&mut buf).await?;
232
233                Ok(Cow::Owned(buf))
234            }
235
236            Empty => Ok(Cow::Borrowed(b"")),
237
238            Streaming { .. } => Err(Error::other("body already read to completion")),
239        }
240    }
241
242    /// Retrieve the number of bytes that have been read from this
243    /// body
244    pub fn bytes_read(&self) -> u64 {
245        self.0.bytes_read()
246    }
247
248    /// returns the content length of this body, if known and
249    /// available.
250    pub fn len(&self) -> Option<u64> {
251        self.0.len()
252    }
253
254    /// determine if the this body represents no data
255    pub fn is_empty(&self) -> bool {
256        self.0.is_empty()
257    }
258
259    /// determine if the this body represents static content
260    pub fn is_static(&self) -> bool {
261        matches!(self.0, Static { .. })
262    }
263
264    /// determine if the this body represents streaming content
265    pub fn is_streaming(&self) -> bool {
266        matches!(self.0, Streaming { .. })
267    }
268
269    /// Attempt to clone this body. Returns `None` for streaming bodies, which are one-shot.
270    ///
271    /// Static bodies clone cheaply — a `Cow` clone, which is a pointer copy for borrowed
272    /// `&'static` content and a `Vec` clone for owned content. The clone resets read
273    /// progress, so it can be sent again from the beginning. Empty bodies always clone
274    /// successfully.
275    #[doc(hidden)]
276    #[cfg(feature = "unstable")]
277    pub fn try_clone(&self) -> Option<Self> {
278        match &self.0 {
279            Empty => Some(Self::default()),
280            Static { content, .. } => Some(Self(Static {
281                content: content.clone(),
282                cursor: 0,
283            })),
284            Streaming { .. } => None,
285        }
286    }
287
    /// Convert this body into an `H3Body` for reading.
    ///
    /// This variant is compiled, and public, when the `unstable` feature is enabled.
    #[cfg(feature = "unstable")]
    pub fn into_h3(self) -> H3Body {
        H3Body::new(self)
    }
293
    /// Convert this body into an `H3Body` for reading.
    ///
    /// This variant is compiled, and crate-private, when the `unstable` feature is
    /// disabled.
    #[cfg(not(feature = "unstable"))]
    pub(crate) fn into_h3(self) -> H3Body {
        H3Body::new(self)
    }
299
    /// Convert this body into an [`H2Body`] for reading by the h2 send pump.
    ///
    /// h2 frames DATA at the connection layer, so the body bytes that reach the send pump
    /// must be plain payload — not chunk-encoded. [`H2Body`] strips the chunked-transfer
    /// wrapping that [`Body::poll_read`] applies for the h1 path on streaming bodies of
    /// unknown length, and forwards trailers so the send pump can emit trailing HEADERS.
    ///
    /// This method only wraps `self`; the behavior described above lives in [`H2Body`].
    pub(crate) fn into_h2(self) -> H2Body {
        H2Body::new(self)
    }
309}
310
/// Largest payload that can be read into a buffer of `buf_len` bytes and still leave
/// room for chunked framing around it: the uppercase-hex size line (`{len:X}\r\n`)
/// before the payload and the closing `\r\n` after it.
///
/// The hex size line is budgeted from the space left after the two `\r\n` pairs rather
/// than from the payload itself, so near a hex order-of-magnitude boundary (F -> 10,
/// FF -> 100, ...) the result may leave one buffer byte unused; it never overflows.
///
/// # Panics
///
/// Panics if `buf_len` is less than 6, the smallest buffer that can hold one payload
/// byte plus its framing.
fn max_bytes_to_read(buf_len: usize) -> usize {
    assert!(
        buf_len >= 6,
        "buffers of length {buf_len} are too small for this implementation.
            if this is a problem for you, please open an issue"
    );

    let bytes_remaining_after_two_cr_lns = buf_len - 4;

    // Maximum number of bytes the hex representation of the remaining bytes might take:
    // the smallest `k` with `16^k >= remaining`. Computed with integers so the result is
    // exact for every `usize` and needs no float round-trip or lossy casts.
    let mut max_bytes_of_hex_framing = 0_usize;
    let mut representable = 1_usize;
    while representable < bytes_remaining_after_two_cr_lns {
        // Saturation only happens at `usize::MAX`, which ends the loop.
        representable = representable.saturating_mul(16);
        max_bytes_of_hex_framing += 1;
    }

    bytes_remaining_after_two_cr_lns - max_bytes_of_hex_framing
}
330
331impl AsyncRead for Body {
332    fn poll_read(
333        mut self: Pin<&mut Self>,
334        cx: &mut Context<'_>,
335        buf: &mut [u8],
336    ) -> Poll<Result<usize>> {
337        match &mut self.0 {
338            Empty => Poll::Ready(Ok(0)),
339            Static { content, cursor } => {
340                let length = content.len();
341                if length == *cursor {
342                    return Poll::Ready(Ok(0));
343                }
344                let bytes = (length - *cursor).min(buf.len());
345                buf[0..bytes].copy_from_slice(&content[*cursor..*cursor + bytes]);
346                *cursor += bytes;
347                Poll::Ready(Ok(bytes))
348            }
349
350            Streaming {
351                async_read,
352                len: Some(len),
353                done,
354                progress,
355                ..
356            } => {
357                if *done {
358                    return Poll::Ready(Ok(0));
359                }
360
361                let max_bytes_to_read = (*len - *progress)
362                    .try_into()
363                    .unwrap_or(buf.len())
364                    .min(buf.len());
365
366                let bytes = ready!(
367                    async_read
368                        .get_mut()
369                        .as_mut()
370                        .poll_read(cx, &mut buf[..max_bytes_to_read])
371                )?;
372
373                if bytes == 0 {
374                    *done = true;
375                } else {
376                    *progress += bytes as u64;
377                }
378
379                Poll::Ready(Ok(bytes))
380            }
381
382            Streaming {
383                async_read,
384                len: None,
385                done,
386                progress,
387                chunked_framing,
388                keep_open,
389            } => {
390                if *done {
391                    return Poll::Ready(Ok(0));
392                }
393
394                if !*chunked_framing {
395                    let bytes = ready!(async_read.get_mut().as_mut().poll_read(cx, buf))?;
396                    if bytes == 0 {
397                        *done = true;
398                    } else {
399                        *progress += bytes as u64;
400                    }
401                    return Poll::Ready(Ok(bytes));
402                }
403
404                let max_bytes_to_read = max_bytes_to_read(buf.len());
405
406                let bytes = ready!(
407                    async_read
408                        .get_mut()
409                        .as_mut()
410                        .poll_read(cx, &mut buf[..max_bytes_to_read])
411                )?;
412
413                if bytes == 0 {
414                    *done = true;
415                    if *keep_open {
416                        // The outbound stream continues into an upgrade; the upgrade owns
417                        // the terminator. Emit no last-chunk marker.
418                        return Poll::Ready(Ok(0));
419                    }
420                    // Last-chunk marker only; the caller emits the trailer-section
421                    // (possibly empty) followed by the terminating `\r\n`. Trailers come
422                    // from `BodySource::trailers()` as structured `Headers`, not bytes,
423                    // and the caller writes them in one shot so this path doesn't need
424                    // a multi-poll state machine spanning buffers.
425                    buf[..3].copy_from_slice(b"0\r\n");
426                    return Poll::Ready(Ok(3));
427                }
428
429                *progress += bytes as u64;
430
431                let start = format!("{bytes:X}\r\n");
432                let start_length = start.len();
433                let total = bytes + start_length + 2;
434                buf.copy_within(..bytes, start_length);
435                buf[..start_length].copy_from_slice(start.as_bytes());
436                buf[total - 2..total].copy_from_slice(b"\r\n");
437                Poll::Ready(Ok(total))
438            }
439        }
440    }
441}
442
// Reader handed out by `Body::into_reader` for streaming bodies: it owns the body's boxed
// source, still inside its `SyncWrapper`, and exposes it as a plain `AsyncRead`.
struct SyncAsyncReader(SyncWrapper<Pin<Box<dyn BodySource>>>);
444impl Debug for SyncAsyncReader {
445    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
446        f.debug_struct("SyncAsyncReader").finish()
447    }
448}
449impl AsyncRead for SyncAsyncReader {
450    fn poll_read(
451        self: Pin<&mut Self>,
452        cx: &mut Context<'_>,
453        buf: &mut [u8],
454    ) -> Poll<Result<usize>> {
455        self.get_mut().0.get_mut().as_mut().poll_read(cx, buf)
456    }
457}
458
/// In-memory fixed-length body content. Each variant is cheap to clone: borrowed and
/// shared variants copy a pointer, and the owned `Cow` variant clones its `Vec`.
#[derive(Clone)]
pub(crate) enum StaticContent {
    /// Borrowed `&'static [u8]` or owned `Vec<u8>` bytes.
    Cow(Cow<'static, [u8]>),
    /// Reference-counted bytes shared with other holders of the same `Arc`.
    Bytes(Arc<[u8]>),
    /// Reference-counted string, exposed as its UTF-8 bytes.
    Str(Arc<str>),
}
467
468impl std::ops::Deref for StaticContent {
469    type Target = [u8];
470
471    fn deref(&self) -> &[u8] {
472        match self {
473            StaticContent::Cow(content) => content,
474            StaticContent::Bytes(content) => content,
475            StaticContent::Str(content) => content.as_bytes(),
476        }
477    }
478}
479
480impl AsRef<[u8]> for StaticContent {
481    fn as_ref(&self) -> &[u8] {
482        self
483    }
484}
485
486impl StaticContent {
487    /// Materialize as an owned `Cow`. The `Cow` variant passes through without copying;
488    /// the shared variants copy their bytes into a `Vec`.
489    fn into_cow(self) -> Cow<'static, [u8]> {
490        match self {
491            StaticContent::Cow(content) => content,
492            other => Cow::Owned(other.to_vec()),
493        }
494    }
495}
496
/// Internal representation behind [`Body`]. Defaults to [`BodyType::Empty`].
#[derive(Default)]
pub(crate) enum BodyType {
    /// No content at all.
    #[default]
    Empty,

    /// Fixed-length content held in memory.
    Static {
        /// The bytes of the body.
        content: StaticContent,
        /// Offset of the next unread byte in `content`.
        cursor: usize,
    },

    /// Content pulled on demand from a [`BodySource`].
    Streaming {
        /// The boxed source; the `SyncWrapper` is only ever accessed through `&mut` or
        /// by value.
        async_read: SyncWrapper<Pin<Box<dyn BodySource>>>,
        /// Number of payload bytes read from the source so far (framing excluded).
        progress: u64,
        /// Declared content length, if known.
        len: Option<u64>,
        /// Set once the source has returned `Ok(0)`; later reads return `Ok(0)` without
        /// polling the source again.
        done: bool,
        /// When true (the default), [`Body`]'s [`AsyncRead`] impl emits chunked
        /// framing for the `len: None` case; when false (via
        /// [`Body::without_chunked_framing`]), it passes through raw bytes.
        chunked_framing: bool,
        /// When true (via [`Body::keep_open`]), the chunked `len: None` read does not
        /// emit the `0\r\n` last-chunk marker at EOF — the outbound stream is left open
        /// for a following upgrade to terminate. Default false.
        keep_open: bool,
    },
}
522
523impl Debug for BodyType {
524    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
525        match self {
526            Empty => f.debug_tuple("BodyType::Empty").finish(),
527            Static { content, cursor } => f
528                .debug_struct("BodyType::Static")
529                .field("content", &String::from_utf8_lossy(content))
530                .field("cursor", cursor)
531                .finish(),
532            Streaming {
533                len,
534                done,
535                progress,
536                ..
537            } => f
538                .debug_struct("BodyType::Streaming")
539                .field("async_read", &format_args!(".."))
540                .field("len", &len)
541                .field("done", &done)
542                .field("progress", &progress)
543                .finish(),
544        }
545    }
546}
547
548impl BodyType {
549    fn is_empty(&self) -> bool {
550        match *self {
551            Empty => true,
552            Static { ref content, .. } => content.is_empty(),
553            Streaming { len, .. } => len == Some(0),
554        }
555    }
556
557    fn len(&self) -> Option<u64> {
558        match *self {
559            Empty => Some(0),
560            Static { ref content, .. } => Some(content.len() as u64),
561            Streaming { len, .. } => len,
562        }
563    }
564
565    fn bytes_read(&self) -> u64 {
566        match *self {
567            Empty => 0,
568            Static { cursor, .. } => cursor as u64,
569            Streaming { progress, .. } => progress,
570        }
571    }
572}
573
574impl From<String> for Body {
575    fn from(s: String) -> Self {
576        s.into_bytes().into()
577    }
578}
579
580impl From<&'static str> for Body {
581    fn from(s: &'static str) -> Self {
582        s.as_bytes().into()
583    }
584}
585
586impl From<&'static [u8]> for Body {
587    fn from(content: &'static [u8]) -> Self {
588        Self::new_static(content)
589    }
590}
591
592impl From<Vec<u8>> for Body {
593    fn from(content: Vec<u8>) -> Self {
594        Self::new_static(content)
595    }
596}
597
598impl From<Cow<'static, [u8]>> for Body {
599    fn from(value: Cow<'static, [u8]>) -> Self {
600        Self::new_static(value)
601    }
602}
603
604impl From<Cow<'static, str>> for Body {
605    fn from(value: Cow<'static, str>) -> Self {
606        match value {
607            Cow::Borrowed(b) => b.into(),
608            Cow::Owned(o) => o.into(),
609        }
610    }
611}
612
613impl From<Arc<[u8]>> for Body {
614    fn from(content: Arc<[u8]>) -> Self {
615        Self(Static {
616            content: StaticContent::Bytes(content),
617            cursor: 0,
618        })
619    }
620}
621
622impl From<Arc<str>> for Body {
623    fn from(content: Arc<str>) -> Self {
624        Self(Static {
625            content: StaticContent::Str(content),
626            cursor: 0,
627        })
628    }
629}
630
#[cfg(test)]
mod test_shared_content {
    use super::Body;
    use futures_lite::future::block_on;
    use std::sync::Arc;

    #[test]
    fn arc_bytes_roundtrips() {
        let expected: &[u8] = b"shared bytes";
        let shared: Arc<[u8]> = Arc::from(expected);
        let body = Body::from(Arc::clone(&shared));

        assert_eq!(body.len(), Some(12));
        assert_eq!(body.static_bytes(), Some(expected));

        let collected = block_on(body.into_bytes()).unwrap();
        assert_eq!(&*collected, expected);

        // the source Arc is still usable — the body shared, not consumed, the buffer
        assert_eq!(&*shared, expected);
    }

    #[test]
    fn arc_str_roundtrips() {
        let expected: &[u8] = b"shared str";
        let shared: Arc<str> = Arc::from("shared str");
        let body = Body::from(shared);

        assert_eq!(body.len(), Some(10));
        assert_eq!(body.static_bytes(), Some(expected));

        let collected = block_on(body.into_bytes()).unwrap();
        assert_eq!(&*collected, expected);
    }

    #[cfg(feature = "unstable")]
    #[test]
    fn shared_body_clones_without_copying_the_arc() {
        let expected: &[u8] = b"abc";
        let shared: Arc<[u8]> = Arc::from(expected);
        let body = Body::from(Arc::clone(&shared));
        let duplicate = body.try_clone().expect("static bodies clone");

        assert_eq!(duplicate.static_bytes(), Some(expected));
        // original + body + clone all reference the same allocation
        assert_eq!(Arc::strong_count(&shared), 3);
    }
}
671
#[cfg(test)]
mod test_bytes_to_read {
    /// `(buffer length, expected maximum payload)` pairs; the trailing comment is the
    /// payload size in hex.
    ///
    /// The rows marked `<-` are the most important part of this test, and a nonobvious
    /// but intentional consequence of the implementation. To avoid overflowing, we must
    /// use one fewer than the available buffer bytes, because increasing the read size
    /// would increase the number of framed bytes by two. This occurs when the hex
    /// representation of the content bytes is near an increase in order of magnitude
    /// (F->10, FF->100, FFF->1000, etc).
    const KNOWN_VALUES: [(usize, usize); 15] = [
        (6, 1),       // 1
        (7, 2),       // 2
        (20, 15),     // F
        (21, 15),     // F <-
        (22, 16),     // 10
        (23, 17),     // 11
        (260, 254),   // FE
        (261, 254),   // FE <-
        (262, 255),   // FF <-
        (263, 256),   // 100
        (4100, 4093), // FFD
        (4101, 4093), // FFD <-
        (4102, 4094), // FFE <-
        (4103, 4095), // FFF <-
        (4104, 4096), // 1000
    ];

    #[test]
    fn simple_check_of_known_values() {
        for (input, expected) in KNOWN_VALUES {
            let actual = super::max_bytes_to_read(input);
            assert_eq!(
                actual, expected,
                "\n\nexpected max_bytes_to_read({input}) to be {expected}, but it was {actual}"
            );

            // testing the test: payload + two CRLFs + hex size line must fill the buffer
            // exactly, or leave exactly one byte unused
            let hex_digits = format!("{expected:X}").len();
            let used_bytes = expected + 4 + hex_digits;
            let fills_buffer = used_bytes == input;
            let leaves_one_byte = used_bytes == input - 1;
            assert!(
                fills_buffer || leaves_one_byte,
                "\n\nfor an input of {}, expected used bytes to be {} or {}, but was {}",
                input,
                input,
                input - 1,
                used_bytes
            );
        }
    }
}