Skip to main content

trillium_http/
body.rs

1use crate::{Headers, h2::H2Body, h3::H3Body};
2use BodyType::{Empty, Static, Streaming};
3use futures_lite::{AsyncRead, AsyncReadExt, io::Cursor, ready};
4use pin_project_lite::pin_project;
5use std::{
6    borrow::Cow,
7    fmt::{self, Debug, Formatter},
8    io::{Error, Result},
9    pin::Pin,
10    task::{Context, Poll},
11};
12use sync_wrapper::SyncWrapper;
13
14/// Trait for streaming body sources that can optionally produce trailers.
15///
16/// Implement this on types that compute trailer headers dynamically as the body
17/// is read — for example, a hashing wrapper that produces a `Digest` trailer
18/// after all bytes have been streamed.
19///
20/// For plain [`AsyncRead`] sources with no trailers, use [`Body::new_streaming`].
21/// `BodySource` is only needed when trailers must be produced.
22pub trait BodySource: AsyncRead + Send + 'static {
23    /// Returns the trailers for this body, called after the body has been fully read.
24    ///
25    /// Implementations may clear internal state on this call; the result is
26    /// only meaningful after [`AsyncRead::poll_read`] has returned `Ok(0)`.
27    fn trailers(self: Pin<&mut Self>) -> Option<Headers>;
28}
29
30pin_project! {
31    struct PlainBody<T> {
32        #[pin]
33        async_read: T,
34    }
35}
36
37impl<T: AsyncRead> AsyncRead for PlainBody<T> {
38    fn poll_read(
39        self: Pin<&mut Self>,
40        cx: &mut Context<'_>,
41        buf: &mut [u8],
42    ) -> Poll<Result<usize>> {
43        self.project().async_read.poll_read(cx, buf)
44    }
45}
46
47impl<T: AsyncRead + Send + 'static> BodySource for PlainBody<T> {
48    fn trailers(self: Pin<&mut Self>) -> Option<Headers> {
49        None
50    }
51}
52
53/// The trillium representation of a http body. This can contain
54/// either `&'static [u8]` content, `Vec<u8>` content, or a boxed
55/// [`AsyncRead`]/[`BodySource`] type.
56#[derive(Debug, Default)]
57pub struct Body(pub(crate) BodyType);
58
59impl Body {
60    /// Construct a new body from a streaming [`AsyncRead`] source. If
61    /// you have the body content in memory already, prefer
62    /// [`Body::new_static`] or one of the From conversions.
63    pub fn new_streaming(async_read: impl AsyncRead + Send + 'static, len: Option<u64>) -> Self {
64        Self::new_with_trailers(PlainBody { async_read }, len)
65    }
66
67    /// Construct a new body from a [`BodySource`] that can produce trailers after
68    /// the body has been fully read.
69    ///
70    /// Use this when trailers must be computed dynamically from the body bytes,
71    /// for example to append a content hash.
72    pub fn new_with_trailers(body: impl BodySource, len: Option<u64>) -> Self {
73        Self(Streaming {
74            async_read: SyncWrapper::new(Box::pin(body)),
75            len,
76            done: false,
77            progress: 0,
78        })
79    }
80
81    /// Returns trailers from the body source, if any.
82    ///
83    /// Only meaningful after the body has been fully read (i.e., [`AsyncRead::poll_read`]
84    /// has returned `Ok(0)`). Returns `None` for bodies constructed with
85    /// [`Body::new_streaming`] or [`Body::new_static`].
86    #[doc(hidden)] // this isn't really a user-facing interface
87    pub fn trailers(&mut self) -> Option<Headers> {
88        match &mut self.0 {
89            Streaming {
90                async_read, done, ..
91            } if *done => async_read.get_mut().as_mut().trailers(),
92            _ => None,
93        }
94    }
95
96    /// Construct a fixed-length Body from a `Vec<u8>` or `&'static
97    /// [u8]`.
98    pub fn new_static(content: impl Into<Cow<'static, [u8]>>) -> Self {
99        Self(Static {
100            content: content.into(),
101            cursor: 0,
102        })
103    }
104
105    /// Retrieve a borrow of the static content in this body. If this
106    /// body is a streaming body or an empty body, this will return
107    /// None.
108    pub fn static_bytes(&self) -> Option<&[u8]> {
109        match &self.0 {
110            Static { content, .. } => Some(content.as_ref()),
111            _ => None,
112        }
113    }
114
115    /// Transform this Body into a dyn [`AsyncRead`]. This will wrap
116    /// static content in a [`Cursor`]. Note that this is different
117    /// from reading directly from the Body, which includes chunked
118    /// encoding.
119    pub fn into_reader(self) -> Pin<Box<dyn AsyncRead + Send + Sync + 'static>> {
120        match self.0 {
121            Streaming { async_read, .. } => Box::pin(SyncAsyncReader(async_read)),
122            Static { content, .. } => Box::pin(Cursor::new(content)),
123            Empty => Box::pin(Cursor::new("")),
124        }
125    }
126
127    /// Consume this body and return the full content. If the body was
128    /// constructed with [`Body::new_streaming`], this will read the
129    /// entire streaming body into memory, awaiting the streaming
130    /// source's completion. This function will return an error if a
131    /// streaming body has already been partially or fully read.
132    ///
133    /// # Errors
134    ///
135    /// This returns an error variant if either of the following conditions are met:
136    ///
137    /// there is an io error when reading from the underlying transport such as a disconnect
138    /// the body has already been read to completion
139    pub async fn into_bytes(self) -> Result<Cow<'static, [u8]>> {
140        match self.0 {
141            Static { content, .. } => Ok(content),
142
143            Streaming {
144                async_read,
145                len,
146                progress: 0,
147                done: false,
148            } => {
149                let mut async_read = async_read.into_inner();
150                let mut buf = len
151                    .and_then(|c| c.try_into().ok())
152                    .map(Vec::with_capacity)
153                    .unwrap_or_default();
154
155                async_read.read_to_end(&mut buf).await?;
156
157                Ok(Cow::Owned(buf))
158            }
159
160            Empty => Ok(Cow::Borrowed(b"")),
161
162            Streaming { .. } => Err(Error::other("body already read to completion")),
163        }
164    }
165
166    /// Retrieve the number of bytes that have been read from this
167    /// body
168    pub fn bytes_read(&self) -> u64 {
169        self.0.bytes_read()
170    }
171
172    /// returns the content length of this body, if known and
173    /// available.
174    pub fn len(&self) -> Option<u64> {
175        self.0.len()
176    }
177
178    /// determine if the this body represents no data
179    pub fn is_empty(&self) -> bool {
180        self.0.is_empty()
181    }
182
183    /// determine if the this body represents static content
184    pub fn is_static(&self) -> bool {
185        matches!(self.0, Static { .. })
186    }
187
188    /// determine if the this body represents streaming content
189    pub fn is_streaming(&self) -> bool {
190        matches!(self.0, Streaming { .. })
191    }
192
193    /// Convert this body into an `H3Body` for reading
194    #[cfg(feature = "unstable")]
195    pub fn into_h3(self) -> H3Body {
196        H3Body::new(self)
197    }
198
199    /// Convert this body into an `H3Body` for reading
200    #[cfg(not(feature = "unstable"))]
201    pub(crate) fn into_h3(self) -> H3Body {
202        H3Body::new(self)
203    }
204
205    /// Convert this body into an [`H2Body`] for reading by the h2 send pump.
206    ///
207    /// h2 frames DATA at the connection layer, so the body bytes that reach the send pump
208    /// must be plain payload — not chunk-encoded. [`H2Body`] strips the chunked-transfer
209    /// wrapping that [`Body::poll_read`] applies for the h1 path on streaming bodies of
210    /// unknown length, and forwards trailers so the send pump can emit trailing HEADERS.
211    pub(crate) fn into_h2(self) -> H2Body {
212        H2Body::new(self)
213    }
214}
215
/// Returns the largest payload size that can be chunk-framed inside a buffer
/// of `buf_len` bytes.
///
/// A framed chunk is laid out as `{size:X}\r\n{payload}\r\n`, so besides the
/// payload the buffer has to hold two `\r\n` pairs (four bytes) and the
/// hexadecimal size digits. The digit count is sized for the `buf_len - 4`
/// bytes shared by the size digits and the payload. Near a power of sixteen
/// that is one digit more than the chosen payload ends up needing, which
/// leaves a single buffer byte unused but ensures the framed chunk fits.
///
/// The computation uses integer arithmetic only, so it involves no
/// floating-point rounding and no lossy numeric casts.
///
/// # Panics
///
/// Panics if `buf_len` is smaller than 6, the minimum needed to frame a
/// single payload byte.
fn max_bytes_to_read(buf_len: usize) -> usize {
    assert!(
        buf_len >= 6,
        "buffers of length {buf_len} are too small for this implementation.
            if this is a problem for you, please open an issue"
    );

    // What is left once both `\r\n` pairs are accounted for. The assert above
    // guarantees this is at least 2.
    let bytes_remaining_after_two_cr_lns = buf_len - 4;

    // The maximum number of bytes that the hex representation of the
    // remaining bytes might take: the smallest `d` with
    // `16^d >= bytes_remaining_after_two_cr_lns`, i.e. `ceil(log16(..))`.
    // The operand of `ilog` is at least 1, so it cannot panic.
    let max_bytes_of_hex_framing = (bytes_remaining_after_two_cr_lns - 1).ilog(16) + 1;

    // `ilog` yields a small `u32` (bounded by the bit width of `usize`), so
    // widening it to `usize` is lossless and the subtraction cannot underflow.
    bytes_remaining_after_two_cr_lns - max_bytes_of_hex_framing as usize
}
248
249impl AsyncRead for Body {
250    fn poll_read(
251        mut self: Pin<&mut Self>,
252        cx: &mut Context<'_>,
253        buf: &mut [u8],
254    ) -> Poll<Result<usize>> {
255        match &mut self.0 {
256            Empty => Poll::Ready(Ok(0)),
257            Static { content, cursor } => {
258                let length = content.len();
259                if length == *cursor {
260                    return Poll::Ready(Ok(0));
261                }
262                let bytes = (length - *cursor).min(buf.len());
263                buf[0..bytes].copy_from_slice(&content[*cursor..*cursor + bytes]);
264                *cursor += bytes;
265                Poll::Ready(Ok(bytes))
266            }
267
268            Streaming {
269                async_read,
270                len: Some(len),
271                done,
272                progress,
273            } => {
274                if *done {
275                    return Poll::Ready(Ok(0));
276                }
277
278                let max_bytes_to_read = (*len - *progress)
279                    .try_into()
280                    .unwrap_or(buf.len())
281                    .min(buf.len());
282
283                let bytes = ready!(
284                    async_read
285                        .get_mut()
286                        .as_mut()
287                        .poll_read(cx, &mut buf[..max_bytes_to_read])
288                )?;
289
290                if bytes == 0 {
291                    *done = true;
292                } else {
293                    *progress += bytes as u64;
294                }
295
296                Poll::Ready(Ok(bytes))
297            }
298
299            Streaming {
300                async_read,
301                len: None,
302                done,
303                progress,
304            } => {
305                if *done {
306                    return Poll::Ready(Ok(0));
307                }
308
309                let max_bytes_to_read = max_bytes_to_read(buf.len());
310
311                let bytes = ready!(
312                    async_read
313                        .get_mut()
314                        .as_mut()
315                        .poll_read(cx, &mut buf[..max_bytes_to_read])
316                )?;
317
318                if bytes == 0 {
319                    *done = true;
320                    // Write only the last-chunk marker (`0\r\n`). The caller must then
321                    // emit the trailer-section (possibly empty) followed by the
322                    // terminating `\r\n` to complete RFC 9112 §7.1.2 chunked framing.
323                    //
324                    // This split is structural, not a missed opportunity to encapsulate:
325                    //   * Trailers come from `BodySource::trailers() -> Option<Headers>` after EOF,
326                    //     not from this `AsyncRead` path. They are structured `Headers` data, not
327                    //     bytes.
328                    //   * Formatting them needs `HttpContext` config (e.g.
329                    //     `panic_on_invalid_response_headers`) that `Body` does not carry, and
330                    //     reuses the same `write_headers_or_trailers` helper used for the
331                    //     response-header section.
332                    //   * Trailers can be arbitrarily large; emitting them from inside `poll_read`
333                    //     would force a multi-poll state machine to span buffers. The caller writes
334                    //     them in one shot via `BufWriter::buffer_mut()`, which has no such
335                    //     constraint.
336                    //
337                    // Caller stitch lives in `conn/h1.rs::Conn::send` after the
338                    // `bufwriter.copy_from(&mut body, ...)` drain.
339                    buf[..3].copy_from_slice(b"0\r\n");
340                    return Poll::Ready(Ok(3));
341                }
342
343                *progress += bytes as u64;
344
345                let start = format!("{bytes:X}\r\n");
346                let start_length = start.len();
347                let total = bytes + start_length + 2;
348                buf.copy_within(..bytes, start_length);
349                buf[..start_length].copy_from_slice(start.as_bytes());
350                buf[total - 2..total].copy_from_slice(b"\r\n");
351                Poll::Ready(Ok(total))
352            }
353        }
354    }
355}
356
357struct SyncAsyncReader(SyncWrapper<Pin<Box<dyn BodySource>>>);
358impl Debug for SyncAsyncReader {
359    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
360        f.debug_struct("SyncAsyncReader").finish()
361    }
362}
363impl AsyncRead for SyncAsyncReader {
364    fn poll_read(
365        self: Pin<&mut Self>,
366        cx: &mut Context<'_>,
367        buf: &mut [u8],
368    ) -> Poll<Result<usize>> {
369        self.get_mut().0.get_mut().as_mut().poll_read(cx, buf)
370    }
371}
372
373#[derive(Default)]
374pub(crate) enum BodyType {
375    #[default]
376    Empty,
377
378    Static {
379        content: Cow<'static, [u8]>,
380        cursor: usize,
381    },
382
383    Streaming {
384        async_read: SyncWrapper<Pin<Box<dyn BodySource>>>,
385        progress: u64,
386        len: Option<u64>,
387        done: bool,
388    },
389}
390
391impl Debug for BodyType {
392    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
393        match self {
394            Empty => f.debug_tuple("BodyType::Empty").finish(),
395            Static { content, cursor } => f
396                .debug_struct("BodyType::Static")
397                .field("content", &String::from_utf8_lossy(content))
398                .field("cursor", cursor)
399                .finish(),
400            Streaming {
401                len,
402                done,
403                progress,
404                ..
405            } => f
406                .debug_struct("BodyType::Streaming")
407                .field("async_read", &format_args!(".."))
408                .field("len", &len)
409                .field("done", &done)
410                .field("progress", &progress)
411                .finish(),
412        }
413    }
414}
415
416impl BodyType {
417    fn is_empty(&self) -> bool {
418        match *self {
419            Empty => true,
420            Static { ref content, .. } => content.is_empty(),
421            Streaming { len, .. } => len == Some(0),
422        }
423    }
424
425    fn len(&self) -> Option<u64> {
426        match *self {
427            Empty => Some(0),
428            Static { ref content, .. } => Some(content.len() as u64),
429            Streaming { len, .. } => len,
430        }
431    }
432
433    fn bytes_read(&self) -> u64 {
434        match *self {
435            Empty => 0,
436            Static { cursor, .. } => cursor as u64,
437            Streaming { progress, .. } => progress,
438        }
439    }
440}
441
442impl From<String> for Body {
443    fn from(s: String) -> Self {
444        s.into_bytes().into()
445    }
446}
447
448impl From<&'static str> for Body {
449    fn from(s: &'static str) -> Self {
450        s.as_bytes().into()
451    }
452}
453
454impl From<&'static [u8]> for Body {
455    fn from(content: &'static [u8]) -> Self {
456        Self::new_static(content)
457    }
458}
459
460impl From<Vec<u8>> for Body {
461    fn from(content: Vec<u8>) -> Self {
462        Self::new_static(content)
463    }
464}
465
466impl From<Cow<'static, [u8]>> for Body {
467    fn from(value: Cow<'static, [u8]>) -> Self {
468        Self::new_static(value)
469    }
470}
471
472impl From<Cow<'static, str>> for Body {
473    fn from(value: Cow<'static, str>) -> Self {
474        match value {
475            Cow::Borrowed(b) => b.into(),
476            Cow::Owned(o) => o.into(),
477        }
478    }
479}
480
#[cfg(test)]
mod test_bytes_to_read {
    use super::max_bytes_to_read;

    #[test]
    fn simple_check_of_known_values() {
        // The rows marked `<-` are the most important part of this test, and
        // a nonobvious but intentional consequence of the implementation. To
        // avoid overflowing the buffer we sometimes have to use one byte
        // fewer than is available, because growing the read by one byte can
        // grow the framed output by two. This happens when the hex
        // representation of the content length is about to gain a digit
        // (F -> 10, FF -> 100, FFF -> 1000, etc).
        let known_values: &[(usize, usize)] = &[
            (6, 1),       // 1
            (7, 2),       // 2
            (20, 15),     // F
            (21, 15),     // F <-
            (22, 16),     // 10
            (23, 17),     // 11
            (260, 254),   // FE
            (261, 254),   // FE <-
            (262, 255),   // FF <-
            (263, 256),   // 100
            (4100, 4093), // FFD
            (4101, 4093), // FFD <-
            (4102, 4094), // FFE <-
            (4103, 4095), // FFF <-
            (4104, 4096), // 1000
        ];

        for &(input, expected) in known_values {
            let actual = max_bytes_to_read(input);
            assert_eq!(
                actual, expected,
                "\n\nexpected max_bytes_to_read({input}) to be {expected}, but it was {actual}"
            );

            // Testing the test: the framed chunk must fill the buffer
            // exactly or leave a single byte unused.
            let hex_digits = format!("{expected:X}").len();
            let used_bytes = expected + 4 + hex_digits;
            let fills_buffer = used_bytes == input;
            let leaves_one_byte = used_bytes == input - 1;
            assert!(
                fills_buffer || leaves_one_byte,
                "\n\nfor an input of {}, expected used bytes to be {} or {}, but was {}",
                input,
                input,
                input - 1,
                used_bytes
            );
        }
    }
}