trillium_http/body.rs
1use crate::{Headers, h2::H2Body, h3::H3Body};
2use BodyType::{Empty, Static, Streaming};
3use futures_lite::{AsyncRead, AsyncReadExt, io::Cursor, ready};
4use pin_project_lite::pin_project;
5use std::{
6 borrow::Cow,
7 fmt::{self, Debug, Formatter},
8 io::{Error, Result},
9 pin::Pin,
10 task::{Context, Poll},
11};
12use sync_wrapper::SyncWrapper;
13
14/// Trait for streaming body sources that can optionally produce trailers.
15///
16/// Implement this on types that compute trailer headers dynamically as the body
17/// is read — for example, a hashing wrapper that produces a `Digest` trailer
18/// after all bytes have been streamed.
19///
20/// For plain [`AsyncRead`] sources with no trailers, use [`Body::new_streaming`].
21/// `BodySource` is only needed when trailers must be produced.
22pub trait BodySource: AsyncRead + Send + 'static {
23 /// Returns the trailers for this body, called after the body has been fully read.
24 ///
25 /// Implementations may clear internal state on this call; the result is
26 /// only meaningful after [`AsyncRead::poll_read`] has returned `Ok(0)`.
27 fn trailers(self: Pin<&mut Self>) -> Option<Headers>;
28}
29
30pin_project! {
31 struct PlainBody<T> {
32 #[pin]
33 async_read: T,
34 }
35}
36
37impl<T: AsyncRead> AsyncRead for PlainBody<T> {
38 fn poll_read(
39 self: Pin<&mut Self>,
40 cx: &mut Context<'_>,
41 buf: &mut [u8],
42 ) -> Poll<Result<usize>> {
43 self.project().async_read.poll_read(cx, buf)
44 }
45}
46
47impl<T: AsyncRead + Send + 'static> BodySource for PlainBody<T> {
48 fn trailers(self: Pin<&mut Self>) -> Option<Headers> {
49 None
50 }
51}
52
53/// The trillium representation of a http body. This can contain
54/// either `&'static [u8]` content, `Vec<u8>` content, or a boxed
55/// [`AsyncRead`]/[`BodySource`] type.
56#[derive(Debug, Default)]
57pub struct Body(pub(crate) BodyType);
58
59impl Body {
60 /// Construct a new body from a streaming [`AsyncRead`] source. If
61 /// you have the body content in memory already, prefer
62 /// [`Body::new_static`] or one of the From conversions.
63 pub fn new_streaming(async_read: impl AsyncRead + Send + 'static, len: Option<u64>) -> Self {
64 Self::new_with_trailers(PlainBody { async_read }, len)
65 }
66
67 /// Construct a new body from a [`BodySource`] that can produce trailers after
68 /// the body has been fully read.
69 ///
70 /// Use this when trailers must be computed dynamically from the body bytes,
71 /// for example to append a content hash.
72 pub fn new_with_trailers(body: impl BodySource, len: Option<u64>) -> Self {
73 Self(Streaming {
74 async_read: SyncWrapper::new(Box::pin(body)),
75 len,
76 done: false,
77 progress: 0,
78 chunked_framing: true,
79 })
80 }
81
82 /// Disable RFC 9112 chunked-encoding framing emitted by [`AsyncRead`] for streaming
83 /// bodies of unknown length.
84 ///
85 /// By default, when a streaming body has no known length, this type's [`AsyncRead`]
86 /// implementation emits the wire-format chunked framing (chunk-size prefix, terminating
87 /// `0\r\n` marker) so the h1 codec can write its bytes directly. That framing is wrong
88 /// for any consumer that wants raw body bytes — e.g., installing a Body as the override
89 /// source on a client `Conn` for cache replay or middleware tee.
90 ///
91 /// This method is `#[doc(hidden)]` and `unstable`-feature-gated; it exists for internal
92 /// use by trillium-client. External code has no reason to set this flag.
93 #[doc(hidden)]
94 #[cfg(feature = "unstable")]
95 #[must_use]
96 pub fn without_chunked_framing(mut self) -> Self {
97 if let Streaming {
98 ref mut chunked_framing,
99 ..
100 } = self.0
101 {
102 *chunked_framing = false;
103 }
104 self
105 }
106
107 pub(crate) fn ensure_chunked_framing(&mut self) -> &mut Self {
108 if let Streaming {
109 ref mut chunked_framing,
110 ..
111 } = self.0
112 {
113 *chunked_framing = true;
114 }
115
116 self
117 }
118
119 /// Returns trailers from the body source, if any.
120 ///
121 /// Only meaningful after the body has been fully read (i.e., [`AsyncRead::poll_read`]
122 /// has returned `Ok(0)`). Returns `None` for bodies constructed with
123 /// [`Body::new_streaming`] or [`Body::new_static`].
124 #[doc(hidden)] // this isn't really a user-facing interface
125 pub fn trailers(&mut self) -> Option<Headers> {
126 match &mut self.0 {
127 Streaming {
128 async_read, done, ..
129 } if *done => async_read.get_mut().as_mut().trailers(),
130 _ => None,
131 }
132 }
133
134 /// Construct a fixed-length Body from a `Vec<u8>` or `&'static
135 /// [u8]`.
136 pub fn new_static(content: impl Into<Cow<'static, [u8]>>) -> Self {
137 Self(Static {
138 content: content.into(),
139 cursor: 0,
140 })
141 }
142
143 /// Retrieve a borrow of the static content in this body. If this
144 /// body is a streaming body or an empty body, this will return
145 /// None.
146 pub fn static_bytes(&self) -> Option<&[u8]> {
147 match &self.0 {
148 Static { content, .. } => Some(content.as_ref()),
149 _ => None,
150 }
151 }
152
153 /// Transform this Body into a dyn [`AsyncRead`]. This will wrap
154 /// static content in a [`Cursor`]. Note that this is different
155 /// from reading directly from the Body, which includes chunked
156 /// encoding.
157 pub fn into_reader(self) -> Pin<Box<dyn AsyncRead + Send + Sync + 'static>> {
158 match self.0 {
159 Streaming { async_read, .. } => Box::pin(SyncAsyncReader(async_read)),
160 Static { content, .. } => Box::pin(Cursor::new(content)),
161 Empty => Box::pin(Cursor::new("")),
162 }
163 }
164
165 /// Consume this body and return the full content. If the body was
166 /// constructed with [`Body::new_streaming`], this will read the
167 /// entire streaming body into memory, awaiting the streaming
168 /// source's completion. This function will return an error if a
169 /// streaming body has already been partially or fully read.
170 ///
171 /// # Errors
172 ///
173 /// This returns an error variant if either of the following conditions are met:
174 ///
175 /// there is an io error when reading from the underlying transport such as a disconnect
176 /// the body has already been read to completion
177 pub async fn into_bytes(self) -> Result<Cow<'static, [u8]>> {
178 match self.0 {
179 Static { content, .. } => Ok(content),
180
181 Streaming {
182 async_read,
183 len,
184 progress: 0,
185 done: false,
186 ..
187 } => {
188 let mut async_read = async_read.into_inner();
189 let mut buf = len
190 .and_then(|c| c.try_into().ok())
191 .map(Vec::with_capacity)
192 .unwrap_or_default();
193
194 async_read.read_to_end(&mut buf).await?;
195
196 Ok(Cow::Owned(buf))
197 }
198
199 Empty => Ok(Cow::Borrowed(b"")),
200
201 Streaming { .. } => Err(Error::other("body already read to completion")),
202 }
203 }
204
205 /// Retrieve the number of bytes that have been read from this
206 /// body
207 pub fn bytes_read(&self) -> u64 {
208 self.0.bytes_read()
209 }
210
211 /// returns the content length of this body, if known and
212 /// available.
213 pub fn len(&self) -> Option<u64> {
214 self.0.len()
215 }
216
217 /// determine if the this body represents no data
218 pub fn is_empty(&self) -> bool {
219 self.0.is_empty()
220 }
221
222 /// determine if the this body represents static content
223 pub fn is_static(&self) -> bool {
224 matches!(self.0, Static { .. })
225 }
226
227 /// determine if the this body represents streaming content
228 pub fn is_streaming(&self) -> bool {
229 matches!(self.0, Streaming { .. })
230 }
231
232 /// Attempt to clone this body. Returns `None` for streaming bodies, which are one-shot.
233 ///
234 /// Static bodies (constructed via [`Body::new_static`] or any `From` conversion for
235 /// `Vec<u8>`, `&'static [u8]`, `String`, `&'static str`, etc.) clone cheaply — just a
236 /// `Cow` clone, which is a pointer copy for borrowed `&'static` content and a `Vec` clone
237 /// for owned content. The clone resets read progress, so it can be sent again from the
238 /// beginning.
239 ///
240 /// Empty bodies always clone successfully.
241 ///
242 /// This is useful for client middleware that needs to retransmit a body — e.g., redirect
243 /// handlers, retry handlers, or auth-refresh handlers.
244 #[doc(hidden)]
245 #[cfg(feature = "unstable")]
246 pub fn try_clone(&self) -> Option<Self> {
247 match &self.0 {
248 Empty => Some(Self::default()),
249 Static { content, .. } => Some(Self(Static {
250 content: content.clone(),
251 cursor: 0,
252 })),
253 Streaming { .. } => None,
254 }
255 }
256
257 /// Convert this body into an `H3Body` for reading
258 #[cfg(feature = "unstable")]
259 pub fn into_h3(self) -> H3Body {
260 H3Body::new(self)
261 }
262
263 /// Convert this body into an `H3Body` for reading
264 #[cfg(not(feature = "unstable"))]
265 pub(crate) fn into_h3(self) -> H3Body {
266 H3Body::new(self)
267 }
268
269 /// Convert this body into an [`H2Body`] for reading by the h2 send pump.
270 ///
271 /// h2 frames DATA at the connection layer, so the body bytes that reach the send pump
272 /// must be plain payload — not chunk-encoded. [`H2Body`] strips the chunked-transfer
273 /// wrapping that [`Body::poll_read`] applies for the h1 path on streaming bodies of
274 /// unknown length, and forwards trailers so the send pump can emit trailing HEADERS.
275 pub(crate) fn into_h2(self) -> H2Body {
276 H2Body::new(self)
277 }
278}
279
/// Largest number of payload bytes that can be read into a buffer of `buf_len` bytes while
/// still leaving room for the chunked-encoding frame around them.
///
/// A chunk is laid out as `<hex payload length>\r\n<payload>\r\n`, so the frame costs four
/// bytes for the two CRLF pairs plus one byte per hex digit of the payload length.
///
/// The computation is done entirely in integers, so the result is exact for every `usize`
/// and does not depend on floating-point precision.
///
/// # Panics
///
/// Panics if `buf_len` is smaller than 6, the minimum needed to frame a single payload byte
/// (or to write the final `0\r\n` marker).
fn max_bytes_to_read(buf_len: usize) -> usize {
    assert!(
        buf_len >= 6,
        "buffers of length {buf_len} are too small for this implementation.
      if this is a problem for you, please open an issue"
    );

    // what is left for the hex length and the payload once both CRLF pairs are accounted for
    let bytes_remaining_after_two_cr_lns = buf_len - 4;

    // The maximum number of bytes that the hex representation of the payload length might
    // take is ceil(log16(remaining)). For any remaining >= 2 that equals the number of hex
    // digits of `remaining - 1`, which can be counted exactly by shifting off nibbles.
    let mut max_bytes_of_hex_framing = 1;
    let mut higher_nibbles = (bytes_remaining_after_two_cr_lns - 1) >> 4;
    while higher_nibbles != 0 {
        max_bytes_of_hex_framing += 1;
        higher_nibbles >>= 4;
    }

    // The payload is strictly smaller than `remaining`, so its hex length never needs more
    // digits than were just reserved.
    bytes_remaining_after_two_cr_lns - max_bytes_of_hex_framing
}
312
313impl AsyncRead for Body {
314 fn poll_read(
315 mut self: Pin<&mut Self>,
316 cx: &mut Context<'_>,
317 buf: &mut [u8],
318 ) -> Poll<Result<usize>> {
319 match &mut self.0 {
320 Empty => Poll::Ready(Ok(0)),
321 Static { content, cursor } => {
322 let length = content.len();
323 if length == *cursor {
324 return Poll::Ready(Ok(0));
325 }
326 let bytes = (length - *cursor).min(buf.len());
327 buf[0..bytes].copy_from_slice(&content[*cursor..*cursor + bytes]);
328 *cursor += bytes;
329 Poll::Ready(Ok(bytes))
330 }
331
332 Streaming {
333 async_read,
334 len: Some(len),
335 done,
336 progress,
337 ..
338 } => {
339 if *done {
340 return Poll::Ready(Ok(0));
341 }
342
343 let max_bytes_to_read = (*len - *progress)
344 .try_into()
345 .unwrap_or(buf.len())
346 .min(buf.len());
347
348 let bytes = ready!(
349 async_read
350 .get_mut()
351 .as_mut()
352 .poll_read(cx, &mut buf[..max_bytes_to_read])
353 )?;
354
355 if bytes == 0 {
356 *done = true;
357 } else {
358 *progress += bytes as u64;
359 }
360
361 Poll::Ready(Ok(bytes))
362 }
363
364 Streaming {
365 async_read,
366 len: None,
367 done,
368 progress,
369 chunked_framing,
370 } => {
371 if *done {
372 return Poll::Ready(Ok(0));
373 }
374
375 if !*chunked_framing {
376 let bytes = ready!(async_read.get_mut().as_mut().poll_read(cx, buf))?;
377 if bytes == 0 {
378 *done = true;
379 } else {
380 *progress += bytes as u64;
381 }
382 return Poll::Ready(Ok(bytes));
383 }
384
385 let max_bytes_to_read = max_bytes_to_read(buf.len());
386
387 let bytes = ready!(
388 async_read
389 .get_mut()
390 .as_mut()
391 .poll_read(cx, &mut buf[..max_bytes_to_read])
392 )?;
393
394 if bytes == 0 {
395 *done = true;
396 // Write only the last-chunk marker (`0\r\n`). The caller must then
397 // emit the trailer-section (possibly empty) followed by the
398 // terminating `\r\n` to complete RFC 9112 §7.1.2 chunked framing.
399 //
400 // This split is structural, not a missed opportunity to encapsulate:
401 // * Trailers come from `BodySource::trailers() -> Option<Headers>` after EOF,
402 // not from this `AsyncRead` path. They are structured `Headers` data, not
403 // bytes.
404 // * Formatting them needs `HttpContext` config (e.g.
405 // `panic_on_invalid_response_headers`) that `Body` does not carry, and
406 // reuses the same `write_headers_or_trailers` helper used for the
407 // response-header section.
408 // * Trailers can be arbitrarily large; emitting them from inside `poll_read`
409 // would force a multi-poll state machine to span buffers. The caller writes
410 // them in one shot via `BufWriter::buffer_mut()`, which has no such
411 // constraint.
412 //
413 // Caller stitch lives in `conn/h1.rs::Conn::send` after the
414 // `bufwriter.copy_from(&mut body, ...)` drain.
415 buf[..3].copy_from_slice(b"0\r\n");
416 return Poll::Ready(Ok(3));
417 }
418
419 *progress += bytes as u64;
420
421 let start = format!("{bytes:X}\r\n");
422 let start_length = start.len();
423 let total = bytes + start_length + 2;
424 buf.copy_within(..bytes, start_length);
425 buf[..start_length].copy_from_slice(start.as_bytes());
426 buf[total - 2..total].copy_from_slice(b"\r\n");
427 Poll::Ready(Ok(total))
428 }
429 }
430 }
431}
432
433struct SyncAsyncReader(SyncWrapper<Pin<Box<dyn BodySource>>>);
434impl Debug for SyncAsyncReader {
435 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
436 f.debug_struct("SyncAsyncReader").finish()
437 }
438}
439impl AsyncRead for SyncAsyncReader {
440 fn poll_read(
441 self: Pin<&mut Self>,
442 cx: &mut Context<'_>,
443 buf: &mut [u8],
444 ) -> Poll<Result<usize>> {
445 self.get_mut().0.get_mut().as_mut().poll_read(cx, buf)
446 }
447}
448
449#[derive(Default)]
450pub(crate) enum BodyType {
451 #[default]
452 Empty,
453
454 Static {
455 content: Cow<'static, [u8]>,
456 cursor: usize,
457 },
458
459 Streaming {
460 async_read: SyncWrapper<Pin<Box<dyn BodySource>>>,
461 progress: u64,
462 len: Option<u64>,
463 done: bool,
464 /// When true (the default), [`Body`]'s [`AsyncRead`] impl emits RFC 9112
465 /// chunked-encoding framing for the `len: None` case so the h1 codec can
466 /// write the bytes directly. When false (set via
467 /// [`Body::without_chunked_framing`]), the same path passes through raw
468 /// bytes from the inner [`BodySource`].
469 chunked_framing: bool,
470 },
471}
472
473impl Debug for BodyType {
474 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
475 match self {
476 Empty => f.debug_tuple("BodyType::Empty").finish(),
477 Static { content, cursor } => f
478 .debug_struct("BodyType::Static")
479 .field("content", &String::from_utf8_lossy(content))
480 .field("cursor", cursor)
481 .finish(),
482 Streaming {
483 len,
484 done,
485 progress,
486 ..
487 } => f
488 .debug_struct("BodyType::Streaming")
489 .field("async_read", &format_args!(".."))
490 .field("len", &len)
491 .field("done", &done)
492 .field("progress", &progress)
493 .finish(),
494 }
495 }
496}
497
498impl BodyType {
499 fn is_empty(&self) -> bool {
500 match *self {
501 Empty => true,
502 Static { ref content, .. } => content.is_empty(),
503 Streaming { len, .. } => len == Some(0),
504 }
505 }
506
507 fn len(&self) -> Option<u64> {
508 match *self {
509 Empty => Some(0),
510 Static { ref content, .. } => Some(content.len() as u64),
511 Streaming { len, .. } => len,
512 }
513 }
514
515 fn bytes_read(&self) -> u64 {
516 match *self {
517 Empty => 0,
518 Static { cursor, .. } => cursor as u64,
519 Streaming { progress, .. } => progress,
520 }
521 }
522}
523
524impl From<String> for Body {
525 fn from(s: String) -> Self {
526 s.into_bytes().into()
527 }
528}
529
530impl From<&'static str> for Body {
531 fn from(s: &'static str) -> Self {
532 s.as_bytes().into()
533 }
534}
535
536impl From<&'static [u8]> for Body {
537 fn from(content: &'static [u8]) -> Self {
538 Self::new_static(content)
539 }
540}
541
542impl From<Vec<u8>> for Body {
543 fn from(content: Vec<u8>) -> Self {
544 Self::new_static(content)
545 }
546}
547
548impl From<Cow<'static, [u8]>> for Body {
549 fn from(value: Cow<'static, [u8]>) -> Self {
550 Self::new_static(value)
551 }
552}
553
554impl From<Cow<'static, str>> for Body {
555 fn from(value: Cow<'static, str>) -> Self {
556 match value {
557 Cow::Borrowed(b) => b.into(),
558 Cow::Owned(o) => o.into(),
559 }
560 }
561}
562
#[cfg(test)]
mod test_bytes_to_read {
    use super::max_bytes_to_read;

    #[test]
    fn simple_check_of_known_values() {
        // Each pair is (buffer length, expected payload limit); the trailing comment is the
        // hex form of the expected limit.
        //
        // The rows marked with an arrow are the point of this test. They are a nonobvious
        // but intentional consequence of the implementation: close to a point where the hex
        // length gains a digit (F->10, FF->100, FFF->1000, etc), reading one more byte
        // would grow the framed output by two bytes, so to avoid overflowing the buffer the
        // limit must leave one available byte unused.
        let known_values: &[(usize, usize)] = &[
            (6, 1),       // 1
            (7, 2),       // 2
            (20, 15),     // F
            (21, 15),     // F <-
            (22, 16),     // 10
            (23, 17),     // 11
            (260, 254),   // FE
            (261, 254),   // FE <-
            (262, 255),   // FF <-
            (263, 256),   // 100
            (4100, 4093), // FFD
            (4101, 4093), // FFD <-
            (4102, 4094), // FFE <-
            (4103, 4095), // FFF <-
            (4104, 4096), // 1000
        ];

        for &(input, expected) in known_values {
            let actual = max_bytes_to_read(input);
            assert_eq!(
                actual, expected,
                "\n\nexpected max_bytes_to_read({input}) to be {expected}, but it was {actual}"
            );

            // testing the test: the framed chunk must fill the buffer exactly, or fall
            // short of it by a single byte
            let hex_digits = format!("{expected:X}").len();
            let used_bytes = expected + 4 + hex_digits;
            let fills_buffer = used_bytes == input;
            let one_byte_short = used_bytes == input - 1;
            assert!(
                fills_buffer || one_byte_short,
                "\n\nfor an input of {}, expected used bytes to be {} or {}, but was {}",
                input,
                input,
                input - 1,
                used_bytes
            );
        }
    }
}