Skip to main content

trillium_http/
body.rs

1use crate::{Headers, h2::H2Body, h3::H3Body};
2use BodyType::{Empty, Static, Streaming};
3use futures_lite::{AsyncRead, AsyncReadExt, io::Cursor, ready};
4use pin_project_lite::pin_project;
5use std::{
6    borrow::Cow,
7    fmt::{self, Debug, Formatter},
8    io::{Error, Result},
9    pin::Pin,
10    task::{Context, Poll},
11};
12use sync_wrapper::SyncWrapper;
13
/// Trait for streaming body sources that can optionally produce trailers.
///
/// Implement this on types that compute trailer headers dynamically as the body
/// is read — for example, a hashing wrapper that produces a `Digest` trailer
/// after all bytes have been streamed. Hand such a source to
/// [`Body::new_with_trailers`].
///
/// For plain [`AsyncRead`] sources with no trailers, use [`Body::new_streaming`],
/// which wraps the reader in an internal adapter whose `trailers` always returns
/// `None`. `BodySource` is only needed when trailers must be produced.
pub trait BodySource: AsyncRead + Send + 'static {
    /// Returns the trailers for this body, called after the body has been fully read.
    ///
    /// [`Body::trailers`] only forwards to this method once the streaming body has
    /// recorded end-of-stream, i.e. after [`AsyncRead::poll_read`] on this source has
    /// returned `Ok(0)`; the result is only meaningful at that point.
    ///
    /// Implementations may clear internal state on this call, so a second call is not
    /// guaranteed to return the same value.
    fn trailers(self: Pin<&mut Self>) -> Option<Headers>;
}
29
30pin_project! {
31    struct PlainBody<T> {
32        #[pin]
33        async_read: T,
34    }
35}
36
37impl<T: AsyncRead> AsyncRead for PlainBody<T> {
38    fn poll_read(
39        self: Pin<&mut Self>,
40        cx: &mut Context<'_>,
41        buf: &mut [u8],
42    ) -> Poll<Result<usize>> {
43        self.project().async_read.poll_read(cx, buf)
44    }
45}
46
47impl<T: AsyncRead + Send + 'static> BodySource for PlainBody<T> {
48    fn trailers(self: Pin<&mut Self>) -> Option<Headers> {
49        None
50    }
51}
52
53/// The trillium representation of a http body. This can contain
54/// either `&'static [u8]` content, `Vec<u8>` content, or a boxed
55/// [`AsyncRead`]/[`BodySource`] type.
56#[derive(Debug, Default)]
57pub struct Body(pub(crate) BodyType);
58
59impl Body {
60    /// Construct a new body from a streaming [`AsyncRead`] source. If
61    /// you have the body content in memory already, prefer
62    /// [`Body::new_static`] or one of the From conversions.
63    pub fn new_streaming(async_read: impl AsyncRead + Send + 'static, len: Option<u64>) -> Self {
64        Self::new_with_trailers(PlainBody { async_read }, len)
65    }
66
67    /// Construct a new body from a [`BodySource`] that can produce trailers after
68    /// the body has been fully read.
69    ///
70    /// Use this when trailers must be computed dynamically from the body bytes,
71    /// for example to append a content hash.
72    pub fn new_with_trailers(body: impl BodySource, len: Option<u64>) -> Self {
73        Self(Streaming {
74            async_read: SyncWrapper::new(Box::pin(body)),
75            len,
76            done: false,
77            progress: 0,
78            chunked_framing: true,
79        })
80    }
81
82    /// Disable RFC 9112 chunked-encoding framing emitted by [`AsyncRead`] for streaming
83    /// bodies of unknown length.
84    ///
85    /// By default, when a streaming body has no known length, this type's [`AsyncRead`]
86    /// implementation emits the wire-format chunked framing (chunk-size prefix, terminating
87    /// `0\r\n` marker) so the h1 codec can write its bytes directly. That framing is wrong
88    /// for any consumer that wants raw body bytes — e.g., installing a Body as the override
89    /// source on a client `Conn` for cache replay or middleware tee.
90    ///
91    /// This method is `#[doc(hidden)]` and `unstable`-feature-gated; it exists for internal
92    /// use by trillium-client. External code has no reason to set this flag.
93    #[doc(hidden)]
94    #[cfg(feature = "unstable")]
95    #[must_use]
96    pub fn without_chunked_framing(mut self) -> Self {
97        if let Streaming {
98            ref mut chunked_framing,
99            ..
100        } = self.0
101        {
102            *chunked_framing = false;
103        }
104        self
105    }
106
107    pub(crate) fn ensure_chunked_framing(&mut self) -> &mut Self {
108        if let Streaming {
109            ref mut chunked_framing,
110            ..
111        } = self.0
112        {
113            *chunked_framing = true;
114        }
115
116        self
117    }
118
119    /// Returns trailers from the body source, if any.
120    ///
121    /// Only meaningful after the body has been fully read (i.e., [`AsyncRead::poll_read`]
122    /// has returned `Ok(0)`). Returns `None` for bodies constructed with
123    /// [`Body::new_streaming`] or [`Body::new_static`].
124    #[doc(hidden)] // this isn't really a user-facing interface
125    pub fn trailers(&mut self) -> Option<Headers> {
126        match &mut self.0 {
127            Streaming {
128                async_read, done, ..
129            } if *done => async_read.get_mut().as_mut().trailers(),
130            _ => None,
131        }
132    }
133
134    /// Construct a fixed-length Body from a `Vec<u8>` or `&'static
135    /// [u8]`.
136    pub fn new_static(content: impl Into<Cow<'static, [u8]>>) -> Self {
137        Self(Static {
138            content: content.into(),
139            cursor: 0,
140        })
141    }
142
143    /// Retrieve a borrow of the static content in this body. If this
144    /// body is a streaming body or an empty body, this will return
145    /// None.
146    pub fn static_bytes(&self) -> Option<&[u8]> {
147        match &self.0 {
148            Static { content, .. } => Some(content.as_ref()),
149            _ => None,
150        }
151    }
152
153    /// Transform this Body into a dyn [`AsyncRead`]. This will wrap
154    /// static content in a [`Cursor`]. Note that this is different
155    /// from reading directly from the Body, which includes chunked
156    /// encoding.
157    pub fn into_reader(self) -> Pin<Box<dyn AsyncRead + Send + Sync + 'static>> {
158        match self.0 {
159            Streaming { async_read, .. } => Box::pin(SyncAsyncReader(async_read)),
160            Static { content, .. } => Box::pin(Cursor::new(content)),
161            Empty => Box::pin(Cursor::new("")),
162        }
163    }
164
165    /// Consume this body and return the full content. If the body was
166    /// constructed with [`Body::new_streaming`], this will read the
167    /// entire streaming body into memory, awaiting the streaming
168    /// source's completion. This function will return an error if a
169    /// streaming body has already been partially or fully read.
170    ///
171    /// # Errors
172    ///
173    /// This returns an error variant if either of the following conditions are met:
174    ///
175    /// there is an io error when reading from the underlying transport such as a disconnect
176    /// the body has already been read to completion
177    pub async fn into_bytes(self) -> Result<Cow<'static, [u8]>> {
178        match self.0 {
179            Static { content, .. } => Ok(content),
180
181            Streaming {
182                async_read,
183                len,
184                progress: 0,
185                done: false,
186                ..
187            } => {
188                let mut async_read = async_read.into_inner();
189                let mut buf = len
190                    .and_then(|c| c.try_into().ok())
191                    .map(Vec::with_capacity)
192                    .unwrap_or_default();
193
194                async_read.read_to_end(&mut buf).await?;
195
196                Ok(Cow::Owned(buf))
197            }
198
199            Empty => Ok(Cow::Borrowed(b"")),
200
201            Streaming { .. } => Err(Error::other("body already read to completion")),
202        }
203    }
204
205    /// Retrieve the number of bytes that have been read from this
206    /// body
207    pub fn bytes_read(&self) -> u64 {
208        self.0.bytes_read()
209    }
210
211    /// returns the content length of this body, if known and
212    /// available.
213    pub fn len(&self) -> Option<u64> {
214        self.0.len()
215    }
216
217    /// determine if the this body represents no data
218    pub fn is_empty(&self) -> bool {
219        self.0.is_empty()
220    }
221
222    /// determine if the this body represents static content
223    pub fn is_static(&self) -> bool {
224        matches!(self.0, Static { .. })
225    }
226
227    /// determine if the this body represents streaming content
228    pub fn is_streaming(&self) -> bool {
229        matches!(self.0, Streaming { .. })
230    }
231
232    /// Attempt to clone this body. Returns `None` for streaming bodies, which are one-shot.
233    ///
234    /// Static bodies (constructed via [`Body::new_static`] or any `From` conversion for
235    /// `Vec<u8>`, `&'static [u8]`, `String`, `&'static str`, etc.) clone cheaply — just a
236    /// `Cow` clone, which is a pointer copy for borrowed `&'static` content and a `Vec` clone
237    /// for owned content. The clone resets read progress, so it can be sent again from the
238    /// beginning.
239    ///
240    /// Empty bodies always clone successfully.
241    ///
242    /// This is useful for client middleware that needs to retransmit a body — e.g., redirect
243    /// handlers, retry handlers, or auth-refresh handlers.
244    #[doc(hidden)]
245    #[cfg(feature = "unstable")]
246    pub fn try_clone(&self) -> Option<Self> {
247        match &self.0 {
248            Empty => Some(Self::default()),
249            Static { content, .. } => Some(Self(Static {
250                content: content.clone(),
251                cursor: 0,
252            })),
253            Streaming { .. } => None,
254        }
255    }
256
257    /// Convert this body into an `H3Body` for reading
258    #[cfg(feature = "unstable")]
259    pub fn into_h3(self) -> H3Body {
260        H3Body::new(self)
261    }
262
263    /// Convert this body into an `H3Body` for reading
264    #[cfg(not(feature = "unstable"))]
265    pub(crate) fn into_h3(self) -> H3Body {
266        H3Body::new(self)
267    }
268
269    /// Convert this body into an [`H2Body`] for reading by the h2 send pump.
270    ///
271    /// h2 frames DATA at the connection layer, so the body bytes that reach the send pump
272    /// must be plain payload — not chunk-encoded. [`H2Body`] strips the chunked-transfer
273    /// wrapping that [`Body::poll_read`] applies for the h1 path on streaming bodies of
274    /// unknown length, and forwards trailers so the send pump can emit trailing HEADERS.
275    pub(crate) fn into_h2(self) -> H2Body {
276        H2Body::new(self)
277    }
278}
279
/// Returns how many payload bytes may be read into a buffer of `buf_len` bytes while
/// leaving room for the RFC 9112 chunk framing that [`Body`]'s [`AsyncRead`]
/// implementation wraps around them: the hex chunk-size, `\r\n`, the data, `\r\n`.
///
/// The framed chunk never exceeds `buf_len`. Near a hex order-of-magnitude boundary
/// (F→10, FF→100, …) the result may leave one byte of the buffer unused, because one
/// more payload byte would add a digit to the chunk-size prefix.
///
/// The computation uses exact integer arithmetic, so it holds for every `usize`, not
/// just for sizes where a floating-point `log2` happens to round correctly.
///
/// # Panics
///
/// Panics if `buf_len < 6`, the smallest buffer that can hold a one-byte chunk
/// (`1\r\nX\r\n`).
fn max_bytes_to_read(buf_len: usize) -> usize {
    assert!(
        buf_len >= 6,
        "buffers of length {buf_len} are too small for this implementation.
            if this is a problem for you, please open an issue"
    );

    // Space left once both `\r\n` delimiters are reserved; at least 2 thanks to the assert.
    let remaining = buf_len - 4;

    // Reserve ceil(log16(remaining)) bytes for the hex size prefix. For n >= 2,
    // ceil(log16(n)) equals the number of hex digits of n - 1 (e.g. n = 16 -> "F" -> 1,
    // n = 17 -> "10" -> 2), so count those digits exactly instead of using f64::log2.
    let mut hex_digits = 0;
    let mut rest = remaining - 1;
    while rest > 0 {
        hex_digits += 1;
        rest >>= 4;
    }

    remaining - hex_digits
}
312
313impl AsyncRead for Body {
314    fn poll_read(
315        mut self: Pin<&mut Self>,
316        cx: &mut Context<'_>,
317        buf: &mut [u8],
318    ) -> Poll<Result<usize>> {
319        match &mut self.0 {
320            Empty => Poll::Ready(Ok(0)),
321            Static { content, cursor } => {
322                let length = content.len();
323                if length == *cursor {
324                    return Poll::Ready(Ok(0));
325                }
326                let bytes = (length - *cursor).min(buf.len());
327                buf[0..bytes].copy_from_slice(&content[*cursor..*cursor + bytes]);
328                *cursor += bytes;
329                Poll::Ready(Ok(bytes))
330            }
331
332            Streaming {
333                async_read,
334                len: Some(len),
335                done,
336                progress,
337                ..
338            } => {
339                if *done {
340                    return Poll::Ready(Ok(0));
341                }
342
343                let max_bytes_to_read = (*len - *progress)
344                    .try_into()
345                    .unwrap_or(buf.len())
346                    .min(buf.len());
347
348                let bytes = ready!(
349                    async_read
350                        .get_mut()
351                        .as_mut()
352                        .poll_read(cx, &mut buf[..max_bytes_to_read])
353                )?;
354
355                if bytes == 0 {
356                    *done = true;
357                } else {
358                    *progress += bytes as u64;
359                }
360
361                Poll::Ready(Ok(bytes))
362            }
363
364            Streaming {
365                async_read,
366                len: None,
367                done,
368                progress,
369                chunked_framing,
370            } => {
371                if *done {
372                    return Poll::Ready(Ok(0));
373                }
374
375                if !*chunked_framing {
376                    let bytes = ready!(async_read.get_mut().as_mut().poll_read(cx, buf))?;
377                    if bytes == 0 {
378                        *done = true;
379                    } else {
380                        *progress += bytes as u64;
381                    }
382                    return Poll::Ready(Ok(bytes));
383                }
384
385                let max_bytes_to_read = max_bytes_to_read(buf.len());
386
387                let bytes = ready!(
388                    async_read
389                        .get_mut()
390                        .as_mut()
391                        .poll_read(cx, &mut buf[..max_bytes_to_read])
392                )?;
393
394                if bytes == 0 {
395                    *done = true;
396                    // Write only the last-chunk marker (`0\r\n`). The caller must then
397                    // emit the trailer-section (possibly empty) followed by the
398                    // terminating `\r\n` to complete RFC 9112 §7.1.2 chunked framing.
399                    //
400                    // This split is structural, not a missed opportunity to encapsulate:
401                    //   * Trailers come from `BodySource::trailers() -> Option<Headers>` after EOF,
402                    //     not from this `AsyncRead` path. They are structured `Headers` data, not
403                    //     bytes.
404                    //   * Formatting them needs `HttpContext` config (e.g.
405                    //     `panic_on_invalid_response_headers`) that `Body` does not carry, and
406                    //     reuses the same `write_headers_or_trailers` helper used for the
407                    //     response-header section.
408                    //   * Trailers can be arbitrarily large; emitting them from inside `poll_read`
409                    //     would force a multi-poll state machine to span buffers. The caller writes
410                    //     them in one shot via `BufWriter::buffer_mut()`, which has no such
411                    //     constraint.
412                    //
413                    // Caller stitch lives in `conn/h1.rs::Conn::send` after the
414                    // `bufwriter.copy_from(&mut body, ...)` drain.
415                    buf[..3].copy_from_slice(b"0\r\n");
416                    return Poll::Ready(Ok(3));
417                }
418
419                *progress += bytes as u64;
420
421                let start = format!("{bytes:X}\r\n");
422                let start_length = start.len();
423                let total = bytes + start_length + 2;
424                buf.copy_within(..bytes, start_length);
425                buf[..start_length].copy_from_slice(start.as_bytes());
426                buf[total - 2..total].copy_from_slice(b"\r\n");
427                Poll::Ready(Ok(total))
428            }
429        }
430    }
431}
432
433struct SyncAsyncReader(SyncWrapper<Pin<Box<dyn BodySource>>>);
434impl Debug for SyncAsyncReader {
435    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
436        f.debug_struct("SyncAsyncReader").finish()
437    }
438}
439impl AsyncRead for SyncAsyncReader {
440    fn poll_read(
441        self: Pin<&mut Self>,
442        cx: &mut Context<'_>,
443        buf: &mut [u8],
444    ) -> Poll<Result<usize>> {
445        self.get_mut().0.get_mut().as_mut().poll_read(cx, buf)
446    }
447}
448
449#[derive(Default)]
450pub(crate) enum BodyType {
451    #[default]
452    Empty,
453
454    Static {
455        content: Cow<'static, [u8]>,
456        cursor: usize,
457    },
458
459    Streaming {
460        async_read: SyncWrapper<Pin<Box<dyn BodySource>>>,
461        progress: u64,
462        len: Option<u64>,
463        done: bool,
464        /// When true (the default), [`Body`]'s [`AsyncRead`] impl emits RFC 9112
465        /// chunked-encoding framing for the `len: None` case so the h1 codec can
466        /// write the bytes directly. When false (set via
467        /// [`Body::without_chunked_framing`]), the same path passes through raw
468        /// bytes from the inner [`BodySource`].
469        chunked_framing: bool,
470    },
471}
472
473impl Debug for BodyType {
474    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
475        match self {
476            Empty => f.debug_tuple("BodyType::Empty").finish(),
477            Static { content, cursor } => f
478                .debug_struct("BodyType::Static")
479                .field("content", &String::from_utf8_lossy(content))
480                .field("cursor", cursor)
481                .finish(),
482            Streaming {
483                len,
484                done,
485                progress,
486                ..
487            } => f
488                .debug_struct("BodyType::Streaming")
489                .field("async_read", &format_args!(".."))
490                .field("len", &len)
491                .field("done", &done)
492                .field("progress", &progress)
493                .finish(),
494        }
495    }
496}
497
498impl BodyType {
499    fn is_empty(&self) -> bool {
500        match *self {
501            Empty => true,
502            Static { ref content, .. } => content.is_empty(),
503            Streaming { len, .. } => len == Some(0),
504        }
505    }
506
507    fn len(&self) -> Option<u64> {
508        match *self {
509            Empty => Some(0),
510            Static { ref content, .. } => Some(content.len() as u64),
511            Streaming { len, .. } => len,
512        }
513    }
514
515    fn bytes_read(&self) -> u64 {
516        match *self {
517            Empty => 0,
518            Static { cursor, .. } => cursor as u64,
519            Streaming { progress, .. } => progress,
520        }
521    }
522}
523
524impl From<String> for Body {
525    fn from(s: String) -> Self {
526        s.into_bytes().into()
527    }
528}
529
530impl From<&'static str> for Body {
531    fn from(s: &'static str) -> Self {
532        s.as_bytes().into()
533    }
534}
535
536impl From<&'static [u8]> for Body {
537    fn from(content: &'static [u8]) -> Self {
538        Self::new_static(content)
539    }
540}
541
542impl From<Vec<u8>> for Body {
543    fn from(content: Vec<u8>) -> Self {
544        Self::new_static(content)
545    }
546}
547
548impl From<Cow<'static, [u8]>> for Body {
549    fn from(value: Cow<'static, [u8]>) -> Self {
550        Self::new_static(value)
551    }
552}
553
554impl From<Cow<'static, str>> for Body {
555    fn from(value: Cow<'static, str>) -> Self {
556        match value {
557            Cow::Borrowed(b) => b.into(),
558            Cow::Owned(o) => o.into(),
559        }
560    }
561}
562
#[cfg(test)]
mod test_bytes_to_read {
    use super::max_bytes_to_read;

    /// Bytes a chunk carrying `payload` data bytes occupies on the wire:
    /// hex size, `\r\n`, data, `\r\n`.
    fn framed_len(payload: usize) -> usize {
        payload + 4 + format!("{payload:X}").len()
    }

    #[test]
    fn simple_check_of_known_values() {
        // the marked rows are the most important part of this test,
        // and a nonobvious but intentional consequence of the
        // implementation. in order to avoid overflowing, we must use
        // one fewer than the available buffer bytes because
        // increasing the read size increases the number of framed
        // bytes by two. This occurs when the hex representation of
        // the content bytes is near an increase in order of magnitude
        // (F->10, FF->100, FFF-> 1000, etc)
        let values = [
            (6, 1),       // 1
            (7, 2),       // 2
            (20, 15),     // F
            (21, 15),     // F <-
            (22, 16),     // 10
            (23, 17),     // 11
            (260, 254),   // FE
            (261, 254),   // FE <-
            (262, 255),   // FF <-
            (263, 256),   // 100
            (4100, 4093), // FFD
            (4101, 4093), // FFD <-
            (4102, 4094), // FFE <-
            (4103, 4095), // FFF <-
            (4104, 4096), // 1000
        ];

        for (input, expected) in values {
            let actual = max_bytes_to_read(input);
            assert_eq!(
                actual, expected,
                "\n\nexpected max_bytes_to_read({input}) to be {expected}, but it was {actual}"
            );

            // testing the test:
            let used_bytes = framed_len(expected);
            assert!(
                used_bytes == input || used_bytes == input - 1,
                "\n\nfor an input of {input}, expected used bytes to be {input} or {}, but was {used_bytes}",
                input - 1,
            );
        }
    }

    #[test]
    fn framing_fits_for_every_small_buffer() {
        // Exhaustively covers every hex boundary up to 16^4.
        for input in 6..=70_000 {
            let used_bytes = framed_len(max_bytes_to_read(input));
            assert!(
                used_bytes == input || used_bytes + 1 == input,
                "for an input of {input}, the framed chunk used {used_bytes} bytes"
            );
        }
    }

    #[test]
    fn framing_fits_at_large_hex_boundaries() {
        for exp in 5..(usize::BITS / 4) {
            let power = 1usize << (4 * exp);
            for input in (power + 2)..=(power + 7) {
                let used_bytes = framed_len(max_bytes_to_read(input));
                assert!(
                    used_bytes == input || used_bytes + 1 == input,
                    "for an input of {input}, the framed chunk used {used_bytes} bytes"
                );
            }
        }
    }
}