trillium_http/received_body.rs
1use crate::{Body, Buffer, Error, Headers, HttpConfig, MutCow, ProtocolSession, copy};
2use Poll::{Pending, Ready};
3use ReceivedBodyState::{Chunked, End, FixedLength, PartialChunkSize, Start};
4use encoding_rs::Encoding;
5use futures_lite::{AsyncRead, AsyncReadExt, AsyncWrite, ready};
6use std::{
7 fmt::{self, Debug, Formatter},
8 io::{self, ErrorKind},
9 pin::Pin,
10 task::{Context, Poll},
11};
12
13mod chunked;
14mod fixed_length;
15mod h2_data;
16mod h3_data;
17
18/// A received http body
19///
20/// This type represents a body that will be read from the underlying transport, which it may either
21/// borrow from a [`Conn`](crate::Conn) or own.
22///
23/// ```rust
24/// # use trillium_testing::HttpTest;
25/// let app = HttpTest::new(|mut conn| async move {
26/// let body = conn.request_body();
27/// let body_string = body.read_string().await.unwrap();
28/// conn.with_response_body(format!("received: {body_string}"))
29/// });
30///
31/// app.get("/").block().assert_body("received: ");
32/// app.post("/")
33/// .with_body("hello")
34/// .block()
35/// .assert_body("received: hello");
36/// ```
37///
38/// ## Bounds checking
39///
40/// Every `ReceivedBody` has a maximum length beyond which it will return an error, expressed as a
41/// u64. To override this on the specific `ReceivedBody`, use [`ReceivedBody::with_max_len`] or
42/// [`ReceivedBody::set_max_len`]
43///
44/// The default maximum length is 10mb; see [`HttpConfig::received_body_max_len`] to configure
45/// this server-wide.
46///
47/// ## Large chunks, small read buffers
48///
49/// Attempting to read a chunked body with a buffer that is shorter than the chunk size in hex will
50/// result in an error.
51#[derive(fieldwork::Fieldwork)]
52pub struct ReceivedBody<'conn, Transport> {
53 /// The content-length of this body, if available. This
54 /// usually is derived from the content-length header. If the http
55 /// request or response that this body is attached to uses
56 /// transfer-encoding chunked, this will be None.
57 ///
58 /// ```rust
59 /// # use trillium_testing::HttpTest;
60 /// HttpTest::new(|mut conn| async move {
61 /// let body = conn.request_body();
62 /// assert_eq!(body.content_length(), Some(5));
63 /// let body_string = body.read_string().await.unwrap();
64 /// conn.with_status(200)
65 /// .with_response_body(format!("received: {body_string}"))
66 /// })
67 /// .post("/")
68 /// .with_body("hello")
69 /// .block()
70 /// .assert_ok()
71 /// .assert_body("received: hello");
72 /// ```
73 #[field(get)]
74 content_length: Option<u64>,
75
76 buffer: MutCow<'conn, Buffer>,
77
78 transport: Option<MutCow<'conn, Transport>>,
79
80 state: MutCow<'conn, ReceivedBodyState>,
81
82 on_completion: Option<Box<dyn FnOnce(Transport) + Send + Sync + 'static>>,
83
84 /// the character encoding of this body, usually determined from the content type
85 /// (mime-type) of the associated Conn.
86 #[field(get)]
87 encoding: &'static Encoding,
88
89 /// The maximum length that can be read from this body before error
90 ///
91 /// See also [`HttpConfig::received_body_max_len`]
92 #[field(with, get, set)]
93 max_len: u64,
94
95 /// The initial buffer capacity allocated when reading the body to bytes or a string
96 ///
97 /// See [`HttpConfig::received_body_initial_len`]
98 #[field(with, get, set)]
99 initial_len: usize,
100
101 /// The maximum number of read loops that reading this received body will perform before
102 /// yielding back to the runtime
103 ///
104 /// See [`HttpConfig::copy_loops_per_yield`]
105 #[field(with, get, set)]
106 copy_loops_per_yield: usize,
107
108 /// Maximum size to pre-allocate based on content-length for buffering this received body
109 ///
110 /// See [`HttpConfig::received_body_max_preallocate`]
111 #[field(with, get, set)]
112 max_preallocate: usize,
113
114 max_header_list_size: u64,
115
116 trailers: MutCow<'conn, Option<Headers>>,
117
118 /// Byte offset into `b"HTTP/1.1 100 Continue\r\n\r\n"` that remains to be written before the
119 /// first read. `None` means no pending write.
120 send_100_continue_offset: Option<usize>,
121
122 /// The protocol session this body belongs to. Used by the body state machine's `End`
123 /// transition to pull driver-decoded trailers into
124 /// [`Conn::request_trailers`][crate::Conn]: h2 trailers come synchronously off
125 /// [`H2Connection::take_trailers`][H2Connection::take_trailers], h3 trailers come back
126 /// asynchronously via `h3_trailer_future`.
127 protocol_session: ProtocolSession,
128
129 /// a boxed future that handles decoding h3 trailers
130 h3_trailer_future:
131 Option<Pin<Box<dyn Future<Output = io::Result<Headers>> + Send + Sync + 'static>>>,
132}
133
/// Returns the tail of `buf` that starts at byte offset `min`.
///
/// Yields `None` when that tail would be empty. This covers `min` equal to or past the end
/// of `buf`, and an offset that does not fit in `usize`.
fn slice_from(min: u64, buf: &[u8]) -> Option<&[u8]> {
    // An offset too large for usize can never be in bounds; saturate so `get` rejects it.
    let start = usize::try_from(min).unwrap_or(usize::MAX);
    match buf.get(start..) {
        Some(tail) if !tail.is_empty() => Some(tail),
        _ => None,
    }
}
138
139impl<'conn, Transport> ReceivedBody<'conn, Transport>
140where
141 Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
142{
143 #[allow(missing_docs)]
144 #[doc(hidden)]
145 pub fn new(
146 content_length: Option<u64>,
147 buffer: impl Into<MutCow<'conn, Buffer>>,
148 transport: impl Into<MutCow<'conn, Transport>>,
149 state: impl Into<MutCow<'conn, ReceivedBodyState>>,
150 on_completion: Option<Box<dyn FnOnce(Transport) + Send + Sync + 'static>>,
151 encoding: &'static Encoding,
152 ) -> Self {
153 Self::new_with_config(
154 content_length,
155 buffer,
156 transport,
157 state,
158 on_completion,
159 encoding,
160 &HttpConfig::DEFAULT,
161 )
162 }
163
164 #[allow(missing_docs)]
165 #[doc(hidden)]
166 pub(crate) fn new_with_config(
167 content_length: Option<u64>,
168 buffer: impl Into<MutCow<'conn, Buffer>>,
169 transport: impl Into<MutCow<'conn, Transport>>,
170 state: impl Into<MutCow<'conn, ReceivedBodyState>>,
171 on_completion: Option<Box<dyn FnOnce(Transport) + Send + Sync + 'static>>,
172 encoding: &'static Encoding,
173 config: &HttpConfig,
174 ) -> Self {
175 Self {
176 content_length,
177 buffer: buffer.into(),
178 transport: Some(transport.into()),
179 state: state.into(),
180 on_completion,
181 encoding,
182 max_len: config.received_body_max_len,
183 initial_len: config.received_body_initial_len,
184 copy_loops_per_yield: config.copy_loops_per_yield,
185 max_preallocate: config.received_body_max_preallocate,
186 max_header_list_size: config.max_header_list_size,
187 trailers: None.into(),
188 send_100_continue_offset: None,
189 protocol_session: ProtocolSession::Http1,
190 h3_trailer_future: None,
191 }
192 }
193
194 /// Sets the destination for trailers decoded from the request body.
195 ///
196 /// When the body is fully read, any trailers will be written to the provided storage.
197 #[doc(hidden)]
198 #[must_use]
199 pub fn with_trailers(mut self, trailers: impl Into<MutCow<'conn, Option<Headers>>>) -> Self {
200 self.trailers = trailers.into();
201 self
202 }
203
204 /// Associate this body with the [`ProtocolSession`] that produced it. The End
205 /// transition of the body state machine consults this to pull driver-decoded
206 /// trailers into [`Conn::request_trailers`][crate::Conn] (h2 synchronously,
207 /// h3 via a boxed future). For h1 bodies the session is
208 /// [`ProtocolSession::Http1`] and no trailer-driver hook fires.
209 #[doc(hidden)]
210 #[must_use]
211 #[cfg(feature = "unstable")]
212 pub fn with_protocol_session(mut self, protocol_session: ProtocolSession) -> Self {
213 self.protocol_session = protocol_session;
214 self
215 }
216
217 #[doc(hidden)]
218 #[must_use]
219 #[cfg(not(feature = "unstable"))]
220 pub(crate) fn with_protocol_session(mut self, protocol_session: ProtocolSession) -> Self {
221 self.protocol_session = protocol_session;
222 self
223 }
224
225 /// Arranges for `HTTP/1.1 100 Continue\r\n\r\n` to be written to the transport before the
226 /// first body read. Used to implement lazy 100-continue for HTTP/1.1 request bodies.
227 #[must_use]
228 pub(crate) fn with_send_100_continue(mut self) -> Self {
229 self.send_100_continue_offset = Some(0);
230 self
231 }
232
233 // pub fn content_length(&self) -> Option<u64> {
234 // self.content_length
235 // }
236
237 /// # Reads entire body to String.
238 ///
239 /// This uses the encoding determined by the content-type (mime)
240 /// charset. If an encoding problem is encountered, the String
241 /// returned by [`ReceivedBody::read_string`] will contain utf8
242 /// replacement characters.
243 ///
244 /// Note that this can only be performed once per Conn, as the
245 /// underlying data is not cached anywhere. This is the only copy of
246 /// the body contents.
247 ///
248 /// # Errors
249 ///
250 /// This will return an error if there is an IO error on the
251 /// underlying transport such as a disconnect
252 ///
253 /// This will also return an error if the length exceeds the maximum length. To override this
254 /// value on this specific body, use [`ReceivedBody::with_max_len`] or
255 /// [`ReceivedBody::set_max_len`]
256 pub async fn read_string(self) -> crate::Result<String> {
257 let encoding = self.encoding();
258 let bytes = self.read_bytes().await?;
259 let (s, _, _) = encoding.decode(&bytes);
260 Ok(s.to_string())
261 }
262
263 fn owns_transport(&self) -> bool {
264 self.transport.as_ref().is_some_and(MutCow::is_owned)
265 }
266
267 /// Similar to [`ReceivedBody::read_string`], but returns the raw bytes. This is useful for
268 /// bodies that are not text.
269 ///
270 /// You can use this in conjunction with `encoding` if you need different handling of malformed
271 /// character encoding than the lossy conversion provided by [`ReceivedBody::read_string`].
272 ///
273 /// # Errors
274 ///
275 /// This will return an error if there is an IO error on the underlying transport such as a
276 /// disconnect
277 ///
278 /// This will also return an error if the length exceeds
279 /// [`received_body_max_len`][HttpConfig::with_received_body_max_len]. To override this value on
280 /// this specific body, use [`ReceivedBody::with_max_len`] or [`ReceivedBody::set_max_len`]
281 pub async fn read_bytes(mut self) -> crate::Result<Vec<u8>> {
282 let mut vec = if let Some(len) = self.content_length {
283 if len > self.max_len {
284 return Err(Error::ReceivedBodyTooLong(self.max_len));
285 }
286
287 let len = usize::try_from(len).map_err(|_| Error::ReceivedBodyTooLong(self.max_len))?;
288
289 Vec::with_capacity(len.min(self.max_preallocate))
290 } else {
291 Vec::with_capacity(self.initial_len)
292 };
293
294 self.read_to_end(&mut vec).await?;
295 Ok(vec)
296 }
297
298 // pub fn encoding(&self) -> &'static Encoding {
299 // self.encoding
300 // }
301
302 fn read_raw(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
303 if let Some(transport) = self.transport.as_deref_mut() {
304 read_buffered(&mut self.buffer, transport, cx, buf)
305 } else {
306 Ready(Err(ErrorKind::NotConnected.into()))
307 }
308 }
309
310 /// Consumes the remainder of this body from the underlying transport by reading it to the end
311 /// and discarding the contents. This is important for http1.1 keepalive, but most of the
312 /// time you do not need to directly call this. It returns the number of bytes consumed.
313 ///
314 /// # Errors
315 ///
316 /// This will return an [`std::io::Result::Err`] if there is an io error on the underlying
317 /// transport, such as a disconnect
318 #[allow(clippy::missing_errors_doc)] // false positive
319 pub async fn drain(self) -> io::Result<u64> {
320 let copy_loops_per_yield = self.copy_loops_per_yield;
321 copy(self, futures_lite::io::sink(), copy_loops_per_yield).await
322 }
323}
324
325impl<T> ReceivedBody<'static, T> {
326 /// takes the static transport from this received body
327 pub fn take_transport(&mut self) -> Option<T> {
328 self.transport.take().map(MutCow::unwrap_owned)
329 }
330}
331
332pub(crate) fn read_buffered<Transport>(
333 buffer: &mut Buffer,
334 transport: &mut Transport,
335 cx: &mut Context<'_>,
336 buf: &mut [u8],
337) -> Poll<io::Result<usize>>
338where
339 Transport: AsyncRead + Unpin,
340{
341 if buffer.is_empty() {
342 Pin::new(transport).poll_read(cx, buf)
343 } else if buffer.len() >= buf.len() {
344 let len = buf.len();
345 buf.copy_from_slice(&buffer[..len]);
346 buffer.ignore_front(len);
347 Ready(Ok(len))
348 } else {
349 let self_buffer_len = buffer.len();
350 buf[..self_buffer_len].copy_from_slice(buffer);
351 buffer.truncate(0);
352 match Pin::new(transport).poll_read(cx, &mut buf[self_buffer_len..]) {
353 Ready(Ok(additional)) => Ready(Ok(additional + self_buffer_len)),
354 Pending => Ready(Ok(self_buffer_len)),
355 other @ Ready(_) => other,
356 }
357 }
358}
359
360type StateOutput = Poll<io::Result<(ReceivedBodyState, usize)>>;
361
362impl<Transport> ReceivedBody<'_, Transport>
363where
364 Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
365{
366 #[inline]
367 fn handle_start(&mut self) -> StateOutput {
368 Ready(Ok((
369 match self.content_length {
370 Some(0) => End,
371
372 Some(total_length) if total_length <= self.max_len => FixedLength {
373 current_index: 0,
374 total: total_length,
375 },
376
377 Some(_) => {
378 return Ready(Err(io::Error::new(
379 ErrorKind::Unsupported,
380 "content too long",
381 )));
382 }
383
384 None => Chunked {
385 remaining: 0,
386 total: 0,
387 },
388 },
389 0,
390 )))
391 }
392}
393
394impl<Transport> AsyncRead for ReceivedBody<'_, Transport>
395where
396 Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
397{
398 fn poll_read(
399 mut self: Pin<&mut Self>,
400 cx: &mut Context<'_>,
401 buf: &mut [u8],
402 ) -> Poll<io::Result<usize>> {
403 const CONTINUE: &[u8] = b"HTTP/1.1 100 Continue\r\n\r\n";
404 while let Some(offset) = self.send_100_continue_offset {
405 let n = {
406 let Some(transport) = self.transport.as_deref_mut() else {
407 return Ready(Err(ErrorKind::NotConnected.into()));
408 };
409 if offset == 0 {
410 log::trace!("sending 100-continue");
411 }
412 ready!(Pin::new(transport).poll_write(cx, &CONTINUE[offset..]))?
413 };
414 if n == 0 {
415 return Ready(Err(ErrorKind::WriteZero.into()));
416 }
417 let new_offset = offset + n;
418 self.send_100_continue_offset = if new_offset >= CONTINUE.len() {
419 None
420 } else {
421 Some(new_offset)
422 };
423 }
424
425 for _ in 0..self.copy_loops_per_yield {
426 let (new_body_state, bytes) = ready!(match *self.state {
427 Start => self.handle_start(),
428 Chunked { remaining, total } => self.handle_chunked(cx, buf, remaining, total),
429 PartialChunkSize { total } => self.handle_partial(cx, buf, total),
430 FixedLength {
431 current_index,
432 total,
433 } => self.handle_fixed_length(cx, buf, current_index, total),
434 ReceivedBodyState::H2Data { total } => self.handle_h2_data(cx, buf, total),
435 ReceivedBodyState::H3Data {
436 remaining_in_frame,
437 total,
438 frame_type,
439 partial_frame_header,
440 } => self.handle_h3_data(
441 cx,
442 buf,
443 remaining_in_frame,
444 total,
445 frame_type,
446 partial_frame_header,
447 ),
448 ReceivedBodyState::ReadingH1Trailers { total } => {
449 self.handle_reading_h1_trailers(cx, buf, total)
450 }
451 End => Ready(Ok((End, 0))),
452 })?;
453
454 *self.state = new_body_state;
455
456 if *self.state == End {
457 if bytes == 0
458 && let Some(h3_trailer_future) = &mut self.h3_trailer_future
459 {
460 let trailers = ready!(h3_trailer_future.as_mut().poll(cx))?;
461 *self.trailers = Some(trailers);
462 self.h3_trailer_future = None;
463 }
464
465 // h2 trailer handoff. The driver decodes trailers on a separate task and
466 // stashes them on the per-stream `StreamState` *before* signalling EOF, so
467 // by the time we reach `End` the trailers (if any) are present — no boxed
468 // future required. Replacing the session with `Http1` after the drain is the
469 // idempotency mechanism: subsequent `End` re-entries see no h2 session.
470 if bytes == 0
471 && let Some((h2_connection, stream_id)) =
472 std::mem::replace(&mut self.protocol_session, ProtocolSession::Http1)
473 .as_h2()
474 && let Some(trailers) = h2_connection.take_trailers(stream_id)
475 {
476 *self.trailers = Some(trailers);
477 }
478
479 if self.on_completion.is_some() && self.owns_transport() {
480 let transport = self.transport.take().unwrap().unwrap_owned();
481 let on_completion = self.on_completion.take().unwrap();
482 on_completion(transport);
483 }
484 return Ready(Ok(bytes));
485 } else if bytes != 0 {
486 return Ready(Ok(bytes));
487 }
488 }
489
490 cx.waker().wake_by_ref();
491 Pending
492 }
493}
494
495impl<Transport> Debug for ReceivedBody<'_, Transport> {
496 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
497 f.debug_struct("ReceivedBody")
498 .field("state", &*self.state)
499 .field("content_length", &self.content_length)
500 .field("buffer", &format_args!(".."))
501 .field("on_completion", &self.on_completion.is_some())
502 .finish()
503 }
504}
505
/// Which kind of HTTP/3 frame the body reader is positioned inside while in
/// [`ReceivedBodyState::H3Data`].
#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)]
#[allow(missing_docs)]
#[doc(hidden)]
pub enum H3BodyFrameType {
    /// No frame header has been decoded yet (the default).
    #[default]
    Start,
    /// Payload of a DATA frame; these bytes are body content and are kept.
    Data,
    /// Payload of an unrecognized frame type; these bytes are skipped.
    Unknown,
    /// Payload of a trailing HEADERS frame; these bytes are collected into the buffer to be
    /// parsed as trailers.
    Trailers,
}
521
522#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)]
523#[allow(missing_docs)]
524#[doc(hidden)]
525pub enum ReceivedBodyState {
526 /// initial state
527 #[default]
528 Start,
529
530 /// read state for a chunked-encoded body. the number of bytes that have been read from the
531 /// current chunk is the difference between remaining and total.
532 Chunked {
533 /// remaining indicates the bytes left _in the current
534 /// chunk_. initial state is zero.
535 remaining: u64,
536
537 /// total indicates the absolute number of bytes read from all chunks
538 total: u64,
539 },
540
541 /// read state when we have buffered content between subsequent polls because chunk framing
542 /// overlapped a buffer boundary
543 PartialChunkSize { total: u64 },
544
545 /// read state for a fixed-length body.
546 FixedLength {
547 /// current index represents the bytes that have already been
548 /// read. initial state is zero
549 current_index: u64,
550
551 /// total length indicates the claimed length, usually
552 /// determined by the content-length header
553 total: u64,
554 },
555
556 /// read state for an H2 body. The h2 driver demuxes DATA frames into a per-stream recv
557 /// ring on a separate task before we see them, so there's no frame-boundary state here —
558 /// just a running byte total for `max_len` / content-length enforcement. Transitions to
559 /// [`End`] when the transport yields `Ready(0)` (the driver's signal that `END_STREAM`
560 /// was observed). Initial state for any body on an h2 request.
561 H2Data {
562 /// total body bytes read across all DATA frames.
563 total: u64,
564 },
565
566 /// read state for an H3 body framed as DATA frames.
567 H3Data {
568 /// bytes remaining in the current frame (DATA, Unknown, or Trailers). zero means we need
569 /// to read the next frame header.
570 remaining_in_frame: u64,
571
572 /// total body bytes read across all DATA frames.
573 total: u64,
574
575 /// what kind of frame we're currently inside.
576 frame_type: H3BodyFrameType,
577
578 /// when true, a partial frame header is sitting in `self.buffer` and needs more bytes
579 /// before we can decode it.
580 partial_frame_header: bool,
581 },
582
583 /// accumulating the HTTP/1.1 chunked trailer-section after the last-chunk (`0\r\n`).
584 ///
585 /// The trailer bytes (including any partially-received trailer headers) live in
586 /// `ReceivedBody::buffer` until a final empty line (`\r\n\r\n` or bare `\r\n`) is found.
587 ReadingH1Trailers {
588 /// total body bytes read across all chunks (for bounds-checking)
589 total: u64,
590 },
591
592 /// the terminal read state
593 End,
594}
595
596impl ReceivedBodyState {
597 pub fn new_h2() -> Self {
598 Self::H2Data { total: 0 }
599 }
600
601 pub fn new_h3() -> Self {
602 Self::H3Data {
603 remaining_in_frame: 0,
604 total: 0,
605 frame_type: H3BodyFrameType::Start,
606 partial_frame_header: false,
607 }
608 }
609}
610
611impl<Transport> From<ReceivedBody<'static, Transport>> for Body
612where
613 Transport: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
614{
615 fn from(rb: ReceivedBody<'static, Transport>) -> Self {
616 let len = rb.content_length;
617 Body::new_streaming(rb, len)
618 }
619}