Skip to main content

trillium_http/
received_body.rs

1use crate::{Body, Buffer, Error, Headers, HttpConfig, MutCow, ProtocolSession, copy};
2use Poll::{Pending, Ready};
3use ReceivedBodyState::{Chunked, End, FixedLength, PartialChunkSize, Start};
4use encoding_rs::Encoding;
5use futures_lite::{AsyncRead, AsyncReadExt, AsyncWrite, ready};
6use std::{
7    fmt::{self, Debug, Formatter},
8    io::{self, ErrorKind},
9    pin::Pin,
10    task::{Context, Poll},
11};
12
13mod chunked;
14mod fixed_length;
15mod h2_data;
16mod h3_data;
17
/// A received http body
///
/// This type represents a body that will be read from the underlying transport, which it may either
/// borrow from a [`Conn`](crate::Conn) or own.
///
/// ```rust
/// # use trillium_testing::HttpTest;
/// let app = HttpTest::new(|mut conn| async move {
///     let body = conn.request_body();
///     let body_string = body.read_string().await.unwrap();
///     conn.with_response_body(format!("received: {body_string}"))
/// });
///
/// app.get("/").block().assert_body("received: ");
/// app.post("/")
///     .with_body("hello")
///     .block()
///     .assert_body("received: hello");
/// ```
///
/// ## Bounds checking
///
/// Every `ReceivedBody` has a maximum length beyond which it will return an error, expressed as a
/// u64. To override this on the specific `ReceivedBody`, use [`ReceivedBody::with_max_len`] or
/// [`ReceivedBody::set_max_len`].
///
/// The default maximum length is 10mb; see [`HttpConfig::received_body_max_len`] to configure
/// this server-wide.
///
/// ## Large chunks, small read buffers
///
/// Attempting to read a chunked body with a buffer that is shorter than the chunk size in hex will
/// result in an error.
#[derive(fieldwork::Fieldwork)]
pub struct ReceivedBody<'conn, Transport> {
    /// The content-length of this body, if available. This
    /// usually is derived from the content-length header. If the http
    /// request or response that this body is attached to uses
    /// transfer-encoding chunked, this will be None.
    ///
    /// ```rust
    /// # use trillium_testing::HttpTest;
    /// HttpTest::new(|mut conn| async move {
    ///     let body = conn.request_body();
    ///     assert_eq!(body.content_length(), Some(5));
    ///     let body_string = body.read_string().await.unwrap();
    ///     conn.with_status(200)
    ///         .with_response_body(format!("received: {body_string}"))
    /// })
    /// .post("/")
    /// .with_body("hello")
    /// .block()
    /// .assert_ok()
    /// .assert_body("received: hello");
    /// ```
    #[field(get)]
    content_length: Option<u64>,

    // Bytes held in memory ahead of the transport. `read_buffered` serves reads from this
    // buffer first and only polls the transport once it has been emptied (or to top up a
    // partially-filled read).
    buffer: MutCow<'conn, Buffer>,

    // The transport the body is read from. This becomes `None` once it has been taken with
    // `take_transport` or handed to `on_completion`; reads and the 100-continue write then
    // fail with `ErrorKind::NotConnected`.
    transport: Option<MutCow<'conn, Transport>>,

    // Current position of the body state machine. `poll_read` dispatches on this value and
    // stores the state returned by each handler.
    state: MutCow<'conn, ReceivedBodyState>,

    // Callback that receives the transport when the body reaches `End`. `poll_read` only
    // invokes it when the transport is owned rather than borrowed.
    on_completion: Option<Box<dyn FnOnce(Transport) + Send + Sync + 'static>>,

    /// the character encoding of this body, usually determined from the content type
    /// (mime-type) of the associated Conn.
    #[field(get)]
    encoding: &'static Encoding,

    /// The maximum length that can be read from this body before error
    ///
    /// See also [`HttpConfig::received_body_max_len`]
    #[field(with, get, set)]
    max_len: u64,

    /// The initial buffer capacity allocated when reading the body to bytes or a string
    ///
    /// See [`HttpConfig::received_body_initial_len`]
    #[field(with, get, set)]
    initial_len: usize,

    /// The maximum number of read loops that reading this received body will perform before
    /// yielding back to the runtime
    ///
    /// See [`HttpConfig::copy_loops_per_yield`]
    #[field(with, get, set)]
    copy_loops_per_yield: usize,

    /// Maximum size to pre-allocate based on content-length for buffering this received body
    ///
    /// See [`HttpConfig::received_body_max_preallocate`]
    #[field(with, get, set)]
    max_preallocate: usize,

    // Copied from `HttpConfig::max_header_list_size` by `new_with_config`. The consumer is
    // not visible in this part of the file; presumably it bounds the size of decoded
    // trailers in the per-protocol submodules -- TODO confirm.
    max_header_list_size: u64,

    // Storage that `poll_read` writes trailers into once the body has ended. Defaults to an
    // owned `None`; `with_trailers` can point it at caller-provided storage instead.
    trailers: MutCow<'conn, Option<Headers>>,

    /// Byte offset into `b"HTTP/1.1 100 Continue\r\n\r\n"` that remains to be written before the
    /// first read. `None` means no pending write.
    send_100_continue_offset: Option<usize>,

    /// The protocol session this body belongs to. Used by the body state machine's `End`
    /// transition to pull driver-decoded trailers into
    /// [`Conn::request_trailers`][crate::Conn]: h2 trailers come synchronously off
    /// [`H2Connection::take_trailers`][H2Connection::take_trailers], h3 trailers come back
    /// asynchronously via `h3_trailer_future`.
    protocol_session: ProtocolSession,

    /// a boxed future that handles decoding h3 trailers
    h3_trailer_future:
        Option<Pin<Box<dyn Future<Output = io::Result<Headers>> + Send + Sync + 'static>>>,
}
133
/// Returns the non-empty tail of `buf` starting at byte offset `min`.
///
/// Yields `None` when `min` is at or past the end of `buf` (including when `min` does not fit
/// in a `usize`), so callers never receive an empty slice.
fn slice_from(min: u64, buf: &[u8]) -> Option<&[u8]> {
    let start = usize::try_from(min).ok()?;
    match buf.get(start..) {
        Some(tail) if !tail.is_empty() => Some(tail),
        _ => None,
    }
}
138
139impl<'conn, Transport> ReceivedBody<'conn, Transport>
140where
141    Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
142{
143    #[allow(missing_docs)]
144    #[doc(hidden)]
145    pub fn new(
146        content_length: Option<u64>,
147        buffer: impl Into<MutCow<'conn, Buffer>>,
148        transport: impl Into<MutCow<'conn, Transport>>,
149        state: impl Into<MutCow<'conn, ReceivedBodyState>>,
150        on_completion: Option<Box<dyn FnOnce(Transport) + Send + Sync + 'static>>,
151        encoding: &'static Encoding,
152    ) -> Self {
153        Self::new_with_config(
154            content_length,
155            buffer,
156            transport,
157            state,
158            on_completion,
159            encoding,
160            &HttpConfig::DEFAULT,
161        )
162    }
163
164    #[allow(missing_docs)]
165    #[doc(hidden)]
166    pub(crate) fn new_with_config(
167        content_length: Option<u64>,
168        buffer: impl Into<MutCow<'conn, Buffer>>,
169        transport: impl Into<MutCow<'conn, Transport>>,
170        state: impl Into<MutCow<'conn, ReceivedBodyState>>,
171        on_completion: Option<Box<dyn FnOnce(Transport) + Send + Sync + 'static>>,
172        encoding: &'static Encoding,
173        config: &HttpConfig,
174    ) -> Self {
175        Self {
176            content_length,
177            buffer: buffer.into(),
178            transport: Some(transport.into()),
179            state: state.into(),
180            on_completion,
181            encoding,
182            max_len: config.received_body_max_len,
183            initial_len: config.received_body_initial_len,
184            copy_loops_per_yield: config.copy_loops_per_yield,
185            max_preallocate: config.received_body_max_preallocate,
186            max_header_list_size: config.max_header_list_size,
187            trailers: None.into(),
188            send_100_continue_offset: None,
189            protocol_session: ProtocolSession::Http1,
190            h3_trailer_future: None,
191        }
192    }
193
194    /// Sets the destination for trailers decoded from the request body.
195    ///
196    /// When the body is fully read, any trailers will be written to the provided storage.
197    #[doc(hidden)]
198    #[must_use]
199    pub fn with_trailers(mut self, trailers: impl Into<MutCow<'conn, Option<Headers>>>) -> Self {
200        self.trailers = trailers.into();
201        self
202    }
203
204    /// Associate this body with the [`ProtocolSession`] that produced it. The End
205    /// transition of the body state machine consults this to pull driver-decoded
206    /// trailers into [`Conn::request_trailers`][crate::Conn] (h2 synchronously,
207    /// h3 via a boxed future). For h1 bodies the session is
208    /// [`ProtocolSession::Http1`] and no trailer-driver hook fires.
209    #[doc(hidden)]
210    #[must_use]
211    #[cfg(feature = "unstable")]
212    pub fn with_protocol_session(mut self, protocol_session: ProtocolSession) -> Self {
213        self.protocol_session = protocol_session;
214        self
215    }
216
217    #[doc(hidden)]
218    #[must_use]
219    #[cfg(not(feature = "unstable"))]
220    pub(crate) fn with_protocol_session(mut self, protocol_session: ProtocolSession) -> Self {
221        self.protocol_session = protocol_session;
222        self
223    }
224
225    /// Arranges for `HTTP/1.1 100 Continue\r\n\r\n` to be written to the transport before the
226    /// first body read. Used to implement lazy 100-continue for HTTP/1.1 request bodies.
227    #[must_use]
228    pub(crate) fn with_send_100_continue(mut self) -> Self {
229        self.send_100_continue_offset = Some(0);
230        self
231    }
232
233    // pub fn content_length(&self) -> Option<u64> {
234    //     self.content_length
235    // }
236
237    /// # Reads entire body to String.
238    ///
239    /// This uses the encoding determined by the content-type (mime)
240    /// charset. If an encoding problem is encountered, the String
241    /// returned by [`ReceivedBody::read_string`] will contain utf8
242    /// replacement characters.
243    ///
244    /// Note that this can only be performed once per Conn, as the
245    /// underlying data is not cached anywhere. This is the only copy of
246    /// the body contents.
247    ///
248    /// # Errors
249    ///
250    /// This will return an error if there is an IO error on the
251    /// underlying transport such as a disconnect
252    ///
253    /// This will also return an error if the length exceeds the maximum length. To override this
254    /// value on this specific body, use [`ReceivedBody::with_max_len`] or
255    /// [`ReceivedBody::set_max_len`]
256    pub async fn read_string(self) -> crate::Result<String> {
257        let encoding = self.encoding();
258        let bytes = self.read_bytes().await?;
259        let (s, _, _) = encoding.decode(&bytes);
260        Ok(s.to_string())
261    }
262
263    fn owns_transport(&self) -> bool {
264        self.transport.as_ref().is_some_and(MutCow::is_owned)
265    }
266
267    /// Similar to [`ReceivedBody::read_string`], but returns the raw bytes. This is useful for
268    /// bodies that are not text.
269    ///
270    /// You can use this in conjunction with `encoding` if you need different handling of malformed
271    /// character encoding than the lossy conversion provided by [`ReceivedBody::read_string`].
272    ///
273    /// # Errors
274    ///
275    /// This will return an error if there is an IO error on the underlying transport such as a
276    /// disconnect
277    ///
278    /// This will also return an error if the length exceeds
279    /// [`received_body_max_len`][HttpConfig::with_received_body_max_len]. To override this value on
280    /// this specific body, use [`ReceivedBody::with_max_len`] or [`ReceivedBody::set_max_len`]
281    pub async fn read_bytes(mut self) -> crate::Result<Vec<u8>> {
282        let mut vec = if let Some(len) = self.content_length {
283            if len > self.max_len {
284                return Err(Error::ReceivedBodyTooLong(self.max_len));
285            }
286
287            let len = usize::try_from(len).map_err(|_| Error::ReceivedBodyTooLong(self.max_len))?;
288
289            Vec::with_capacity(len.min(self.max_preallocate))
290        } else {
291            Vec::with_capacity(self.initial_len)
292        };
293
294        self.read_to_end(&mut vec).await?;
295        Ok(vec)
296    }
297
298    // pub fn encoding(&self) -> &'static Encoding {
299    //     self.encoding
300    // }
301
302    fn read_raw(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
303        if let Some(transport) = self.transport.as_deref_mut() {
304            read_buffered(&mut self.buffer, transport, cx, buf)
305        } else {
306            Ready(Err(ErrorKind::NotConnected.into()))
307        }
308    }
309
310    /// Consumes the remainder of this body from the underlying transport by reading it to the end
311    /// and discarding the contents. This is important for http1.1 keepalive, but most of the
312    /// time you do not need to directly call this. It returns the number of bytes consumed.
313    ///
314    /// # Errors
315    ///
316    /// This will return an [`std::io::Result::Err`] if there is an io error on the underlying
317    /// transport, such as a disconnect
318    #[allow(clippy::missing_errors_doc)] // false positive
319    pub async fn drain(self) -> io::Result<u64> {
320        let copy_loops_per_yield = self.copy_loops_per_yield;
321        copy(self, futures_lite::io::sink(), copy_loops_per_yield).await
322    }
323}
324
325impl<T> ReceivedBody<'static, T> {
326    /// takes the static transport from this received body
327    pub fn take_transport(&mut self) -> Option<T> {
328        self.transport.take().map(MutCow::unwrap_owned)
329    }
330
331    #[doc(hidden)]
332    #[cfg(feature = "unstable")]
333    pub fn state(&self) -> ReceivedBodyState {
334        *self.state
335    }
336}
337
338impl<T> ReceivedBody<'_, T> {
339    /// Retype as `ReceivedBody<'static, T>` if every internal `MutCow` field is `Owned`.
340    ///
341    /// Returns `None` if any field is `Borrowed`, in which case `self` is dropped (the
342    /// borrows can't be extended, and there is no useful way to hand the half-destructured
343    /// body back to the caller). Used by trillium-client's `Drop for ResponseBody` to
344    /// promote a body whose runtime-invariant guarantees full ownership but whose
345    /// type-level `'a` parameter the compiler can't see is `'static`.
346    #[doc(hidden)]
347    #[cfg(feature = "unstable")]
348    pub fn try_into_owned(self) -> Option<ReceivedBody<'static, T>> {
349        let Self {
350            content_length,
351            buffer,
352            transport,
353            state,
354            on_completion,
355            encoding,
356            max_len,
357            initial_len,
358            copy_loops_per_yield,
359            max_preallocate,
360            max_header_list_size,
361            trailers,
362            send_100_continue_offset,
363            protocol_session,
364            h3_trailer_future,
365        } = self;
366
367        let transport = match transport {
368            None => None,
369            Some(t) => Some(t.try_into_owned()?),
370        };
371
372        Some(ReceivedBody {
373            content_length,
374            buffer: buffer.try_into_owned()?,
375            transport,
376            state: state.try_into_owned()?,
377            on_completion,
378            encoding,
379            max_len,
380            initial_len,
381            copy_loops_per_yield,
382            max_preallocate,
383            max_header_list_size,
384            trailers: trailers.try_into_owned()?,
385            send_100_continue_offset,
386            protocol_session,
387            h3_trailer_future,
388        })
389    }
390}
391
392pub(crate) fn read_buffered<Transport>(
393    buffer: &mut Buffer,
394    transport: &mut Transport,
395    cx: &mut Context<'_>,
396    buf: &mut [u8],
397) -> Poll<io::Result<usize>>
398where
399    Transport: AsyncRead + Unpin,
400{
401    if buffer.is_empty() {
402        Pin::new(transport).poll_read(cx, buf)
403    } else if buffer.len() >= buf.len() {
404        let len = buf.len();
405        buf.copy_from_slice(&buffer[..len]);
406        buffer.ignore_front(len);
407        Ready(Ok(len))
408    } else {
409        let self_buffer_len = buffer.len();
410        buf[..self_buffer_len].copy_from_slice(buffer);
411        buffer.truncate(0);
412        match Pin::new(transport).poll_read(cx, &mut buf[self_buffer_len..]) {
413            Ready(Ok(additional)) => Ready(Ok(additional + self_buffer_len)),
414            Pending => Ready(Ok(self_buffer_len)),
415            other @ Ready(_) => other,
416        }
417    }
418}
419
/// Outcome of one state-machine step: the next [`ReceivedBodyState`] paired with the byte count
/// that `poll_read` reports to its caller for that step.
type StateOutput = Poll<io::Result<(ReceivedBodyState, usize)>>;
421
422impl<Transport> ReceivedBody<'_, Transport>
423where
424    Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
425{
426    #[inline]
427    fn handle_start(&mut self) -> StateOutput {
428        Ready(Ok((
429            match self.content_length {
430                Some(0) => End,
431
432                Some(total_length) if total_length <= self.max_len => FixedLength {
433                    current_index: 0,
434                    total: total_length,
435                },
436
437                Some(_) => {
438                    return Ready(Err(io::Error::new(
439                        ErrorKind::Unsupported,
440                        "content too long",
441                    )));
442                }
443
444                None => Chunked {
445                    remaining: 0,
446                    total: 0,
447                },
448            },
449            0,
450        )))
451    }
452}
453
454impl<Transport> AsyncRead for ReceivedBody<'_, Transport>
455where
456    Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
457{
458    fn poll_read(
459        mut self: Pin<&mut Self>,
460        cx: &mut Context<'_>,
461        buf: &mut [u8],
462    ) -> Poll<io::Result<usize>> {
463        const CONTINUE: &[u8] = b"HTTP/1.1 100 Continue\r\n\r\n";
464        while let Some(offset) = self.send_100_continue_offset {
465            let n = {
466                let Some(transport) = self.transport.as_deref_mut() else {
467                    return Ready(Err(ErrorKind::NotConnected.into()));
468                };
469                if offset == 0 {
470                    log::trace!("sending 100-continue");
471                }
472                ready!(Pin::new(transport).poll_write(cx, &CONTINUE[offset..]))?
473            };
474            if n == 0 {
475                return Ready(Err(ErrorKind::WriteZero.into()));
476            }
477            let new_offset = offset + n;
478            self.send_100_continue_offset = if new_offset >= CONTINUE.len() {
479                None
480            } else {
481                Some(new_offset)
482            };
483        }
484
485        for _ in 0..self.copy_loops_per_yield {
486            let (new_body_state, bytes) = ready!(match *self.state {
487                Start => self.handle_start(),
488                Chunked { remaining, total } => self.handle_chunked(cx, buf, remaining, total),
489                PartialChunkSize { total } => self.handle_partial(cx, buf, total),
490                FixedLength {
491                    current_index,
492                    total,
493                } => self.handle_fixed_length(cx, buf, current_index, total),
494                ReceivedBodyState::H2Data { total } => self.handle_h2_data(cx, buf, total),
495                ReceivedBodyState::H3Data {
496                    remaining_in_frame,
497                    total,
498                    frame_type,
499                    partial_frame_header,
500                } => self.handle_h3_data(
501                    cx,
502                    buf,
503                    remaining_in_frame,
504                    total,
505                    frame_type,
506                    partial_frame_header,
507                ),
508                ReceivedBodyState::ReadingH1Trailers { total } => {
509                    self.handle_reading_h1_trailers(cx, buf, total)
510                }
511                End => Ready(Ok((End, 0))),
512            })?;
513
514            *self.state = new_body_state;
515
516            if *self.state == End {
517                if bytes == 0
518                    && let Some(h3_trailer_future) = &mut self.h3_trailer_future
519                {
520                    let trailers = ready!(h3_trailer_future.as_mut().poll(cx))?;
521                    *self.trailers = Some(trailers);
522                    self.h3_trailer_future = None;
523                }
524
525                // h2 trailer handoff. The driver decodes trailers on a separate task and
526                // stashes them on the per-stream `StreamState` *before* signalling EOF, so
527                // by the time we reach `End` the trailers (if any) are present — no boxed
528                // future required. Replacing the session with `Http1` after the drain is the
529                // idempotency mechanism: subsequent `End` re-entries see no h2 session.
530                if bytes == 0
531                    && let Some((h2_connection, stream_id)) =
532                        std::mem::replace(&mut self.protocol_session, ProtocolSession::Http1)
533                            .as_h2()
534                    && let Some(trailers) = h2_connection.take_trailers(stream_id)
535                {
536                    *self.trailers = Some(trailers);
537                }
538
539                if self.on_completion.is_some() && self.owns_transport() {
540                    let transport = self.transport.take().unwrap().unwrap_owned();
541                    let on_completion = self.on_completion.take().unwrap();
542                    on_completion(transport);
543                }
544                return Ready(Ok(bytes));
545            } else if bytes != 0 {
546                return Ready(Ok(bytes));
547            }
548        }
549
550        cx.waker().wake_by_ref();
551        Pending
552    }
553}
554
555impl<Transport> crate::BodySource for ReceivedBody<'static, Transport>
556where
557    Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
558{
559    fn trailers(self: Pin<&mut Self>) -> Option<Headers> {
560        self.get_mut().trailers.take()
561    }
562}
563
564impl<Transport> Debug for ReceivedBody<'_, Transport> {
565    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
566        f.debug_struct("ReceivedBody")
567            .field("state", &*self.state)
568            .field("content_length", &self.content_length)
569            .field("buffer", &format_args!(".."))
570            .field("on_completion", &self.on_completion.is_some())
571            .finish()
572    }
573}
574
/// The type of H3 frame currently being processed in [`ReceivedBodyState::H3Data`].
///
/// `Start` is the `Default` variant and is the value `ReceivedBodyState::new_h3` begins with.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)]
#[allow(missing_docs)]
#[doc(hidden)]
pub enum H3BodyFrameType {
    /// Initial state — no frame decoded yet.
    #[default]
    Start,
    /// Inside a DATA frame — body bytes to keep.
    Data,
    /// Inside an unknown frame — payload bytes to discard.
    Unknown,
    /// Inside a trailing HEADERS frame — accumulate into buffer for parsing.
    Trailers,
}
590
/// The position of a `ReceivedBody` read within its state machine.
///
/// `ReceivedBody::poll_read` dispatches on this value and replaces it with the state returned
/// by the handler for the current variant.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)]
#[allow(missing_docs)]
#[doc(hidden)]
pub enum ReceivedBodyState {
    /// initial state. `handle_start` leaves it for `End`, `FixedLength` or `Chunked`
    /// depending on the content-length.
    #[default]
    Start,

    /// read state for a chunked-encoded body. `remaining` tracks the current chunk while
    /// `total` accumulates across all chunks; see the field docs.
    Chunked {
        /// remaining indicates the bytes left _in the current
        /// chunk_. initial state is zero.
        remaining: u64,

        /// total indicates the absolute number of bytes read from all chunks
        total: u64,
    },

    /// read state when we have buffered content between subsequent polls because chunk framing
    /// overlapped a buffer boundary
    // NOTE(review): `total` is presumably the same running byte count as `Chunked::total` --
    // confirm against the chunked handler.
    PartialChunkSize { total: u64 },

    /// read state for a fixed-length body.
    FixedLength {
        /// current index represents the bytes that have already been
        /// read. initial state is zero
        current_index: u64,

        /// total length indicates the claimed length, usually
        /// determined by the content-length header
        total: u64,
    },

    /// read state for an H2 body. The h2 driver demuxes DATA frames into a per-stream recv
    /// ring on a separate task before we see them, so there's no frame-boundary state here —
    /// just a running byte total for `max_len` / content-length enforcement. Transitions to
    /// [`End`] when the transport yields `Ready(0)` (the driver's signal that `END_STREAM`
    /// was observed). Initial state for any body on an h2 request.
    H2Data {
        /// total body bytes read across all DATA frames.
        total: u64,
    },

    /// read state for an H3 body framed as DATA frames.
    H3Data {
        /// bytes remaining in the current frame (DATA, Unknown, or Trailers). zero means we need
        /// to read the next frame header.
        remaining_in_frame: u64,

        /// total body bytes read across all DATA frames.
        total: u64,

        /// what kind of frame we're currently inside.
        frame_type: H3BodyFrameType,

        /// when true, a partial frame header is sitting in `self.buffer` and needs more bytes
        /// before we can decode it.
        partial_frame_header: bool,
    },

    /// accumulating the HTTP/1.1 chunked trailer-section after the last-chunk (`0\r\n`).
    ///
    /// The trailer bytes (including any partially-received trailer headers) live in
    /// `ReceivedBody::buffer` until a final empty line (`\r\n\r\n` or bare `\r\n`) is found.
    ReadingH1Trailers {
        /// total body bytes read across all chunks (for bounds-checking)
        total: u64,
    },

    /// the terminal read state. `poll_read` keeps reporting zero bytes once this is reached.
    End,
}
664
665impl ReceivedBodyState {
666    pub fn new_h2() -> Self {
667        Self::H2Data { total: 0 }
668    }
669
670    pub fn new_h3() -> Self {
671        Self::H3Data {
672            remaining_in_frame: 0,
673            total: 0,
674            frame_type: H3BodyFrameType::Start,
675            partial_frame_header: false,
676        }
677    }
678}
679
680impl<Transport> From<ReceivedBody<'static, Transport>> for Body
681where
682    Transport: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
683{
684    fn from(rb: ReceivedBody<'static, Transport>) -> Self {
685        let len = rb.content_length;
686        Body::new_with_trailers(rb, len)
687    }
688}