Skip to main content

trillium_http/
body.rs

1use crate::{Headers, h2::H2Body, h3::H3Body};
2use BodyType::{Empty, Static, Streaming};
3use futures_lite::{AsyncRead, AsyncReadExt, io::Cursor, ready};
4use pin_project_lite::pin_project;
5use std::{
6    borrow::Cow,
7    fmt::{self, Debug, Formatter},
8    io::{Error, Result},
9    pin::Pin,
10    task::{Context, Poll},
11};
12use sync_wrapper::SyncWrapper;
13
/// Streaming body source that can optionally produce trailers.
///
/// Implement this on types that compute trailer headers dynamically as the body
/// is read — for example, a hashing wrapper that produces a `Digest` trailer
/// after all bytes have been streamed. For plain [`AsyncRead`] sources with no
/// trailers, [`Body::new_streaming`] is simpler; sources implementing this trait
/// are turned into a [`Body`] with [`Body::new_with_trailers`].
///
/// The bytes produced by [`AsyncRead::poll_read`] are the raw body payload; any
/// transfer framing (such as chunked encoding) is added by [`Body`], not by the source.
pub trait BodySource: AsyncRead + Send + 'static {
    /// Returns the trailers for this body, called after the body has been fully read.
    ///
    /// Implementations may clear internal state on this call; the result is
    /// only meaningful after [`AsyncRead::poll_read`] has returned `Ok(0)`.
    ///
    /// [`Body::trailers`] only forwards to this method once the body has recorded
    /// end-of-stream, but it does so on every call, so this may be invoked more than
    /// once — returning `None` on later calls is acceptable.
    fn trailers(self: Pin<&mut Self>) -> Option<Headers>;
}
27
28pin_project! {
29    struct PlainBody<T> {
30        #[pin]
31        async_read: T,
32    }
33}
34
35impl<T: AsyncRead> AsyncRead for PlainBody<T> {
36    fn poll_read(
37        self: Pin<&mut Self>,
38        cx: &mut Context<'_>,
39        buf: &mut [u8],
40    ) -> Poll<Result<usize>> {
41        self.project().async_read.poll_read(cx, buf)
42    }
43}
44
45impl<T: AsyncRead + Send + 'static> BodySource for PlainBody<T> {
46    fn trailers(self: Pin<&mut Self>) -> Option<Headers> {
47        None
48    }
49}
50
/// The trillium representation of an HTTP body.
///
/// A body is one of:
/// * empty — the [`Default`] value, with a known length of zero;
/// * fixed in-memory content — `&'static [u8]` or `Vec<u8>` (stored as a
///   `Cow<'static, [u8]>`), see [`Body::new_static`];
/// * a boxed streaming [`AsyncRead`]/[`BodySource`] with an optional declared
///   length, see [`Body::new_streaming`] and [`Body::new_with_trailers`].
#[derive(Debug, Default)]
pub struct Body(pub(crate) BodyType);
56
57impl Body {
58    /// Construct a new body from a streaming [`AsyncRead`] source. If
59    /// you have the body content in memory already, prefer
60    /// [`Body::new_static`] or one of the From conversions.
61    pub fn new_streaming(async_read: impl AsyncRead + Send + 'static, len: Option<u64>) -> Self {
62        Self::new_with_trailers(PlainBody { async_read }, len)
63    }
64
65    /// Construct a new body from a [`BodySource`] that can produce trailers after
66    /// the body has been fully read.
67    ///
68    /// Use this when trailers must be computed dynamically from the body bytes,
69    /// for example to append a content hash.
70    pub fn new_with_trailers(body: impl BodySource, len: Option<u64>) -> Self {
71        Self(Streaming {
72            async_read: SyncWrapper::new(Box::pin(body)),
73            len,
74            done: false,
75            progress: 0,
76            chunked_framing: true,
77            keep_open: false,
78        })
79    }
80
81    /// Disable chunked-encoding framing emitted by [`AsyncRead`] for streaming bodies
82    /// of unknown length.
83    ///
84    /// By default, when a streaming body has no known length, this type's [`AsyncRead`]
85    /// implementation emits chunked framing so the h1 codec can write its bytes directly.
86    /// That framing is wrong for any consumer that wants raw body bytes.
87    #[doc(hidden)]
88    #[cfg(feature = "unstable")]
89    #[must_use]
90    pub fn without_chunked_framing(mut self) -> Self {
91        if let Streaming {
92            ref mut chunked_framing,
93            ..
94        } = self.0
95        {
96            *chunked_framing = false;
97        }
98        self
99    }
100
101    /// Normalize this body to an open, chunk-framed stream: its content goes out as
102    /// chunked-transfer chunks with **no** terminating `0\r\n`, leaving the outbound
103    /// stream open for a following upgrade to continue and eventually close.
104    ///
105    /// Fixed-length content (`Static`, or a streaming body with a known length) is
106    /// re-sourced through the chunked path so it too flows as ordinary chunks rather than
107    /// raw bytes — the caller doesn't have to hand-wrap it in a length-less streaming body.
108    /// An empty body stays empty (it contributes no bytes; the upgrade owns the whole
109    /// stream).
110    ///
111    /// The send site that consumes the body is responsible for *not* writing the
112    /// trailer-section terminator either; trailers (if any) ride onto the upgrade.
113    #[doc(hidden)]
114    #[cfg(feature = "unstable")]
115    #[must_use]
116    pub fn keep_open(mut self) -> Self {
117        self.set_keep_open();
118        self
119    }
120
121    /// In-crate counterpart to [`keep_open`](Self::keep_open) — the server send path sets
122    /// this from `should_upgrade()` and can't reach the `unstable`-gated public builder.
123    pub(crate) fn set_keep_open(&mut self) {
124        // Re-source fixed content through the chunked streaming path so it goes out as
125        // chunks instead of raw bytes. Streaming bodies are left in place (preserving their
126        // `BodySource`, hence any trailers) and just have their framing flags flipped below.
127        if matches!(self.0, Static { .. }) {
128            let reader = std::mem::take(self).into_reader();
129            *self = Self::new_streaming(reader, None);
130        }
131
132        if let Streaming {
133            ref mut len,
134            ref mut chunked_framing,
135            ref mut keep_open,
136            ..
137        } = self.0
138        {
139            *len = None;
140            *chunked_framing = true;
141            *keep_open = true;
142        }
143    }
144
145    pub(crate) fn ensure_chunked_framing(&mut self) -> &mut Self {
146        if let Streaming {
147            ref mut chunked_framing,
148            ..
149        } = self.0
150        {
151            *chunked_framing = true;
152        }
153
154        self
155    }
156
157    /// Returns trailers from the body source, if any.
158    ///
159    /// Only meaningful after the body has been fully read (i.e., [`AsyncRead::poll_read`]
160    /// has returned `Ok(0)`). Returns `None` for bodies constructed with
161    /// [`Body::new_streaming`] or [`Body::new_static`].
162    #[doc(hidden)]
163    pub fn trailers(&mut self) -> Option<Headers> {
164        match &mut self.0 {
165            Streaming {
166                async_read, done, ..
167            } if *done => async_read.get_mut().as_mut().trailers(),
168            _ => None,
169        }
170    }
171
172    /// Construct a fixed-length Body from a `Vec<u8>` or `&'static
173    /// [u8]`.
174    pub fn new_static(content: impl Into<Cow<'static, [u8]>>) -> Self {
175        Self(Static {
176            content: content.into(),
177            cursor: 0,
178        })
179    }
180
181    /// Retrieve a borrow of the static content in this body. If this
182    /// body is a streaming body or an empty body, this will return
183    /// None.
184    pub fn static_bytes(&self) -> Option<&[u8]> {
185        match &self.0 {
186            Static { content, .. } => Some(content.as_ref()),
187            _ => None,
188        }
189    }
190
191    /// Transform this Body into a dyn [`AsyncRead`], wrapping static content in
192    /// a [`Cursor`]. Unlike reading from the Body directly, this does not apply
193    /// chunked encoding.
194    pub fn into_reader(self) -> Pin<Box<dyn AsyncRead + Send + Sync + 'static>> {
195        match self.0 {
196            Streaming { async_read, .. } => Box::pin(SyncAsyncReader(async_read)),
197            Static { content, .. } => Box::pin(Cursor::new(content)),
198            Empty => Box::pin(Cursor::new("")),
199        }
200    }
201
202    /// Consume this body and return the full content. If the body was constructed
203    /// with [`Body::new_streaming`], this will read the entire streaming body into
204    /// memory, awaiting the streaming source's completion.
205    ///
206    /// # Errors
207    ///
208    /// Returns an error if the underlying transport errors, or if a streaming body
209    /// has already been partially or fully read.
210    pub async fn into_bytes(self) -> Result<Cow<'static, [u8]>> {
211        match self.0 {
212            Static { content, .. } => Ok(content),
213
214            Streaming {
215                async_read,
216                len,
217                progress: 0,
218                done: false,
219                ..
220            } => {
221                let mut async_read = async_read.into_inner();
222                let mut buf = len
223                    .and_then(|c| c.try_into().ok())
224                    .map(Vec::with_capacity)
225                    .unwrap_or_default();
226
227                async_read.read_to_end(&mut buf).await?;
228
229                Ok(Cow::Owned(buf))
230            }
231
232            Empty => Ok(Cow::Borrowed(b"")),
233
234            Streaming { .. } => Err(Error::other("body already read to completion")),
235        }
236    }
237
238    /// Retrieve the number of bytes that have been read from this
239    /// body
240    pub fn bytes_read(&self) -> u64 {
241        self.0.bytes_read()
242    }
243
244    /// returns the content length of this body, if known and
245    /// available.
246    pub fn len(&self) -> Option<u64> {
247        self.0.len()
248    }
249
250    /// determine if the this body represents no data
251    pub fn is_empty(&self) -> bool {
252        self.0.is_empty()
253    }
254
255    /// determine if the this body represents static content
256    pub fn is_static(&self) -> bool {
257        matches!(self.0, Static { .. })
258    }
259
260    /// determine if the this body represents streaming content
261    pub fn is_streaming(&self) -> bool {
262        matches!(self.0, Streaming { .. })
263    }
264
265    /// Attempt to clone this body. Returns `None` for streaming bodies, which are one-shot.
266    ///
267    /// Static bodies clone cheaply — a `Cow` clone, which is a pointer copy for borrowed
268    /// `&'static` content and a `Vec` clone for owned content. The clone resets read
269    /// progress, so it can be sent again from the beginning. Empty bodies always clone
270    /// successfully.
271    #[doc(hidden)]
272    #[cfg(feature = "unstable")]
273    pub fn try_clone(&self) -> Option<Self> {
274        match &self.0 {
275            Empty => Some(Self::default()),
276            Static { content, .. } => Some(Self(Static {
277                content: content.clone(),
278                cursor: 0,
279            })),
280            Streaming { .. } => None,
281        }
282    }
283
284    /// Convert this body into an `H3Body` for reading
285    #[cfg(feature = "unstable")]
286    pub fn into_h3(self) -> H3Body {
287        H3Body::new(self)
288    }
289
290    /// Convert this body into an `H3Body` for reading
291    #[cfg(not(feature = "unstable"))]
292    pub(crate) fn into_h3(self) -> H3Body {
293        H3Body::new(self)
294    }
295
296    /// Convert this body into an [`H2Body`] for reading by the h2 send pump.
297    ///
298    /// h2 frames DATA at the connection layer, so the body bytes that reach the send pump
299    /// must be plain payload — not chunk-encoded. [`H2Body`] strips the chunked-transfer
300    /// wrapping that [`Body::poll_read`] applies for the h1 path on streaming bodies of
301    /// unknown length, and forwards trailers so the send pump can emit trailing HEADERS.
302    pub(crate) fn into_h2(self) -> H2Body {
303        H2Body::new(self)
304    }
305}
306
/// Returns the largest number of payload bytes that can be read into a buffer of
/// `buf_len` bytes while still leaving room for the chunked-transfer framing that
/// `Body`'s `poll_read` wraps around them: `{hex length}\r\n{payload}\r\n`.
///
/// The two CRLFs take 4 bytes. The hex length needs at most as many digits as
/// `buf_len - 5`, the largest payload that could possibly fit. Subtracting that digit
/// count guarantees `payload + hex_digits(payload) + 4 <= buf_len`. Near a hex
/// order-of-magnitude boundary (F → 10, FF → 100, …) this can leave one byte unused.
///
/// The computation is exact integer arithmetic, so the guarantee holds for every
/// `usize`, not only for sizes that an `f64` can represent exactly.
///
/// # Panics
///
/// Panics if `buf_len < 6`: the smallest frame (`1\r\nX\r\n`) needs six bytes.
fn max_bytes_to_read(buf_len: usize) -> usize {
    assert!(
        buf_len >= 6,
        "buffers of length {buf_len} are too small for this implementation.
            if this is a problem for you, please open an issue"
    );

    // Room left once both CRLFs are accounted for.
    let available = buf_len - 4;

    // Count the hex digits of the largest payload we could conceivably write. That is
    // `available - 1`, because the length prefix needs at least one digit. Any smaller
    // payload needs no more digits, so reserving this many is always enough.
    let mut hex_digits = 0;
    let mut remaining = available - 1;
    while remaining > 0 {
        hex_digits += 1;
        remaining >>= 4;
    }

    available - hex_digits
}
326
327impl AsyncRead for Body {
328    fn poll_read(
329        mut self: Pin<&mut Self>,
330        cx: &mut Context<'_>,
331        buf: &mut [u8],
332    ) -> Poll<Result<usize>> {
333        match &mut self.0 {
334            Empty => Poll::Ready(Ok(0)),
335            Static { content, cursor } => {
336                let length = content.len();
337                if length == *cursor {
338                    return Poll::Ready(Ok(0));
339                }
340                let bytes = (length - *cursor).min(buf.len());
341                buf[0..bytes].copy_from_slice(&content[*cursor..*cursor + bytes]);
342                *cursor += bytes;
343                Poll::Ready(Ok(bytes))
344            }
345
346            Streaming {
347                async_read,
348                len: Some(len),
349                done,
350                progress,
351                ..
352            } => {
353                if *done {
354                    return Poll::Ready(Ok(0));
355                }
356
357                let max_bytes_to_read = (*len - *progress)
358                    .try_into()
359                    .unwrap_or(buf.len())
360                    .min(buf.len());
361
362                let bytes = ready!(
363                    async_read
364                        .get_mut()
365                        .as_mut()
366                        .poll_read(cx, &mut buf[..max_bytes_to_read])
367                )?;
368
369                if bytes == 0 {
370                    *done = true;
371                } else {
372                    *progress += bytes as u64;
373                }
374
375                Poll::Ready(Ok(bytes))
376            }
377
378            Streaming {
379                async_read,
380                len: None,
381                done,
382                progress,
383                chunked_framing,
384                keep_open,
385            } => {
386                if *done {
387                    return Poll::Ready(Ok(0));
388                }
389
390                if !*chunked_framing {
391                    let bytes = ready!(async_read.get_mut().as_mut().poll_read(cx, buf))?;
392                    if bytes == 0 {
393                        *done = true;
394                    } else {
395                        *progress += bytes as u64;
396                    }
397                    return Poll::Ready(Ok(bytes));
398                }
399
400                let max_bytes_to_read = max_bytes_to_read(buf.len());
401
402                let bytes = ready!(
403                    async_read
404                        .get_mut()
405                        .as_mut()
406                        .poll_read(cx, &mut buf[..max_bytes_to_read])
407                )?;
408
409                if bytes == 0 {
410                    *done = true;
411                    if *keep_open {
412                        // The outbound stream continues into an upgrade; the upgrade owns
413                        // the terminator. Emit no last-chunk marker.
414                        return Poll::Ready(Ok(0));
415                    }
416                    // Last-chunk marker only; the caller emits the trailer-section
417                    // (possibly empty) followed by the terminating `\r\n`. Trailers come
418                    // from `BodySource::trailers()` as structured `Headers`, not bytes,
419                    // and the caller writes them in one shot so this path doesn't need
420                    // a multi-poll state machine spanning buffers.
421                    buf[..3].copy_from_slice(b"0\r\n");
422                    return Poll::Ready(Ok(3));
423                }
424
425                *progress += bytes as u64;
426
427                let start = format!("{bytes:X}\r\n");
428                let start_length = start.len();
429                let total = bytes + start_length + 2;
430                buf.copy_within(..bytes, start_length);
431                buf[..start_length].copy_from_slice(start.as_bytes());
432                buf[total - 2..total].copy_from_slice(b"\r\n");
433                Poll::Ready(Ok(total))
434            }
435        }
436    }
437}
438
439struct SyncAsyncReader(SyncWrapper<Pin<Box<dyn BodySource>>>);
440impl Debug for SyncAsyncReader {
441    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
442        f.debug_struct("SyncAsyncReader").finish()
443    }
444}
445impl AsyncRead for SyncAsyncReader {
446    fn poll_read(
447        self: Pin<&mut Self>,
448        cx: &mut Context<'_>,
449        buf: &mut [u8],
450    ) -> Poll<Result<usize>> {
451        self.get_mut().0.get_mut().as_mut().poll_read(cx, buf)
452    }
453}
454
/// Internal storage behind [`Body`]: one variant per kind of body.
#[derive(Default)]
pub(crate) enum BodyType {
    /// No content. This is the [`Default`] and reports a known length of zero.
    #[default]
    Empty,

    /// Fully in-memory content, handed out from `cursor` onward.
    Static {
        /// The complete body bytes, either borrowed `&'static` data or owned.
        content: Cow<'static, [u8]>,
        /// How many bytes of `content` [`Body`]'s `poll_read` has already copied out.
        cursor: usize,
    },

    /// Content produced incrementally by a boxed [`BodySource`].
    Streaming {
        /// The source. The `SyncWrapper` lets `Body::into_reader` return a `Sync`
        /// reader around this `Send`-only box.
        async_read: SyncWrapper<Pin<Box<dyn BodySource>>>,
        /// Payload bytes read from the source so far (chunk framing is not counted).
        progress: u64,
        /// Declared content length, if known. When `Some`, reads never pull more than
        /// this many bytes from the source and no chunked framing is applied.
        len: Option<u64>,
        /// Set once the source reported end-of-stream. After that, reads return `Ok(0)`
        /// and `Body::trailers` starts forwarding to the source.
        done: bool,
        /// When true (the default), [`Body`]'s [`AsyncRead`] impl emits chunked
        /// framing for the `len: None` case; when false (via
        /// [`Body::without_chunked_framing`]), it passes through raw bytes.
        chunked_framing: bool,
        /// When true (via [`Body::keep_open`]), the chunked `len: None` read does not
        /// emit the `0\r\n` last-chunk marker at EOF — the outbound stream is left open
        /// for a following upgrade to terminate. Default false.
        keep_open: bool,
    },
}
480
481impl Debug for BodyType {
482    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
483        match self {
484            Empty => f.debug_tuple("BodyType::Empty").finish(),
485            Static { content, cursor } => f
486                .debug_struct("BodyType::Static")
487                .field("content", &String::from_utf8_lossy(content))
488                .field("cursor", cursor)
489                .finish(),
490            Streaming {
491                len,
492                done,
493                progress,
494                ..
495            } => f
496                .debug_struct("BodyType::Streaming")
497                .field("async_read", &format_args!(".."))
498                .field("len", &len)
499                .field("done", &done)
500                .field("progress", &progress)
501                .finish(),
502        }
503    }
504}
505
506impl BodyType {
507    fn is_empty(&self) -> bool {
508        match *self {
509            Empty => true,
510            Static { ref content, .. } => content.is_empty(),
511            Streaming { len, .. } => len == Some(0),
512        }
513    }
514
515    fn len(&self) -> Option<u64> {
516        match *self {
517            Empty => Some(0),
518            Static { ref content, .. } => Some(content.len() as u64),
519            Streaming { len, .. } => len,
520        }
521    }
522
523    fn bytes_read(&self) -> u64 {
524        match *self {
525            Empty => 0,
526            Static { cursor, .. } => cursor as u64,
527            Streaming { progress, .. } => progress,
528        }
529    }
530}
531
532impl From<String> for Body {
533    fn from(s: String) -> Self {
534        s.into_bytes().into()
535    }
536}
537
538impl From<&'static str> for Body {
539    fn from(s: &'static str) -> Self {
540        s.as_bytes().into()
541    }
542}
543
544impl From<&'static [u8]> for Body {
545    fn from(content: &'static [u8]) -> Self {
546        Self::new_static(content)
547    }
548}
549
550impl From<Vec<u8>> for Body {
551    fn from(content: Vec<u8>) -> Self {
552        Self::new_static(content)
553    }
554}
555
556impl From<Cow<'static, [u8]>> for Body {
557    fn from(value: Cow<'static, [u8]>) -> Self {
558        Self::new_static(value)
559    }
560}
561
562impl From<Cow<'static, str>> for Body {
563    fn from(value: Cow<'static, str>) -> Self {
564        match value {
565            Cow::Borrowed(b) => b.into(),
566            Cow::Owned(o) => o.into(),
567        }
568    }
569}
570
#[cfg(test)]
mod test_bytes_to_read {
    use super::max_bytes_to_read;

    /// `(buffer length, expected payload bytes)` pairs.
    ///
    /// The rows marked `<-` matter most. They are a nonobvious but intentional consequence
    /// of the implementation: to avoid overflowing, one fewer than the available buffer
    /// bytes must be used. Near an increase in the order of magnitude of the hex length
    /// (F->10, FF->100, FFF->1000, etc.), growing the read size by one increases the
    /// number of framing bytes by two.
    const KNOWN_VALUES: [(usize, usize); 15] = [
        (6, 1),       // 1
        (7, 2),       // 2
        (20, 15),     // F
        (21, 15),     // F <-
        (22, 16),     // 10
        (23, 17),     // 11
        (260, 254),   // FE
        (261, 254),   // FE <-
        (262, 255),   // FF <-
        (263, 256),   // 100
        (4100, 4093), // FFD
        (4101, 4093), // FFD <-
        (4102, 4094), // FFE <-
        (4103, 4095), // FFF <-
        (4104, 4096), // 1000
    ];

    #[test]
    fn simple_check_of_known_values() {
        for &(input, expected) in &KNOWN_VALUES {
            let actual = max_bytes_to_read(input);
            assert_eq!(
                actual, expected,
                "\n\nexpected max_bytes_to_read({input}) to be {expected}, but it was {actual}"
            );

            // Sanity-check the table itself: the framed chunk (hex length, two CRLFs and
            // the payload) must fill the buffer exactly or leave a single byte spare.
            let hex_len = format!("{expected:X}").len();
            let used_bytes = expected + 4 + hex_len;
            let fits = used_bytes == input || used_bytes == input - 1;
            assert!(
                fits,
                "\n\nfor an input of {}, expected used bytes to be {} or {}, but was {}",
                input,
                input,
                input - 1,
                used_bytes
            );
        }
    }
}