trillium_http/body.rs
1use crate::{Headers, h2::H2Body, h3::H3Body};
2use BodyType::{Empty, Static, Streaming};
3use futures_lite::{AsyncRead, AsyncReadExt, io::Cursor, ready};
4use pin_project_lite::pin_project;
5use std::{
6 borrow::Cow,
7 fmt::{self, Debug, Formatter},
8 io::{Error, Result},
9 pin::Pin,
10 task::{Context, Poll},
11};
12use sync_wrapper::SyncWrapper;
13
/// Trait for streaming body sources that can optionally produce trailers.
///
/// Implement this on types that compute trailer headers dynamically as the body
/// is read — for example, a hashing wrapper that produces a `Digest` trailer
/// after all bytes have been streamed.
///
/// For plain [`AsyncRead`] sources with no trailers, use [`Body::new_streaming`].
/// `BodySource` is only needed when trailers must be produced; hand such a source
/// to [`Body::new_with_trailers`].
pub trait BodySource: AsyncRead + Send + 'static {
    /// Returns the trailers for this body, called after the body has been fully read.
    ///
    /// Implementations may clear internal state on this call; the result is
    /// only meaningful after [`AsyncRead::poll_read`] has returned `Ok(0)`.
    /// Returning `None` means there are no trailers to send.
    fn trailers(self: Pin<&mut Self>) -> Option<Headers>;
}
29
pin_project! {
    // Adapter that lets any plain `AsyncRead` act as a `BodySource` that never
    // produces trailers; `Body::new_streaming` wraps its reader in this.
    struct PlainBody<T> {
        // Structurally pinned so reads can be forwarded through `project()`
        // without requiring `T: Unpin`.
        #[pin]
        async_read: T,
    }
}
36
37impl<T: AsyncRead> AsyncRead for PlainBody<T> {
38 fn poll_read(
39 self: Pin<&mut Self>,
40 cx: &mut Context<'_>,
41 buf: &mut [u8],
42 ) -> Poll<Result<usize>> {
43 self.project().async_read.poll_read(cx, buf)
44 }
45}
46
47impl<T: AsyncRead + Send + 'static> BodySource for PlainBody<T> {
48 fn trailers(self: Pin<&mut Self>) -> Option<Headers> {
49 None
50 }
51}
52
/// The trillium representation of a http body. This can contain
/// either `&'static [u8]` content, `Vec<u8>` content, or a boxed
/// [`AsyncRead`]/[`BodySource`] type.
///
/// The [`Default`] body is empty: it yields no bytes and has a known
/// length of zero.
#[derive(Debug, Default)]
pub struct Body(pub(crate) BodyType);
58
59impl Body {
60 /// Construct a new body from a streaming [`AsyncRead`] source. If
61 /// you have the body content in memory already, prefer
62 /// [`Body::new_static`] or one of the From conversions.
63 pub fn new_streaming(async_read: impl AsyncRead + Send + 'static, len: Option<u64>) -> Self {
64 Self::new_with_trailers(PlainBody { async_read }, len)
65 }
66
67 /// Construct a new body from a [`BodySource`] that can produce trailers after
68 /// the body has been fully read.
69 ///
70 /// Use this when trailers must be computed dynamically from the body bytes,
71 /// for example to append a content hash.
72 pub fn new_with_trailers(body: impl BodySource, len: Option<u64>) -> Self {
73 Self(Streaming {
74 async_read: SyncWrapper::new(Box::pin(body)),
75 len,
76 done: false,
77 progress: 0,
78 })
79 }
80
81 /// Returns trailers from the body source, if any.
82 ///
83 /// Only meaningful after the body has been fully read (i.e., [`AsyncRead::poll_read`]
84 /// has returned `Ok(0)`). Returns `None` for bodies constructed with
85 /// [`Body::new_streaming`] or [`Body::new_static`].
86 #[doc(hidden)] // this isn't really a user-facing interface
87 pub fn trailers(&mut self) -> Option<Headers> {
88 match &mut self.0 {
89 Streaming {
90 async_read, done, ..
91 } if *done => async_read.get_mut().as_mut().trailers(),
92 _ => None,
93 }
94 }
95
96 /// Construct a fixed-length Body from a `Vec<u8>` or `&'static
97 /// [u8]`.
98 pub fn new_static(content: impl Into<Cow<'static, [u8]>>) -> Self {
99 Self(Static {
100 content: content.into(),
101 cursor: 0,
102 })
103 }
104
105 /// Retrieve a borrow of the static content in this body. If this
106 /// body is a streaming body or an empty body, this will return
107 /// None.
108 pub fn static_bytes(&self) -> Option<&[u8]> {
109 match &self.0 {
110 Static { content, .. } => Some(content.as_ref()),
111 _ => None,
112 }
113 }
114
115 /// Transform this Body into a dyn [`AsyncRead`]. This will wrap
116 /// static content in a [`Cursor`]. Note that this is different
117 /// from reading directly from the Body, which includes chunked
118 /// encoding.
119 pub fn into_reader(self) -> Pin<Box<dyn AsyncRead + Send + Sync + 'static>> {
120 match self.0 {
121 Streaming { async_read, .. } => Box::pin(SyncAsyncReader(async_read)),
122 Static { content, .. } => Box::pin(Cursor::new(content)),
123 Empty => Box::pin(Cursor::new("")),
124 }
125 }
126
127 /// Consume this body and return the full content. If the body was
128 /// constructed with [`Body::new_streaming`], this will read the
129 /// entire streaming body into memory, awaiting the streaming
130 /// source's completion. This function will return an error if a
131 /// streaming body has already been partially or fully read.
132 ///
133 /// # Errors
134 ///
135 /// This returns an error variant if either of the following conditions are met:
136 ///
137 /// there is an io error when reading from the underlying transport such as a disconnect
138 /// the body has already been read to completion
139 pub async fn into_bytes(self) -> Result<Cow<'static, [u8]>> {
140 match self.0 {
141 Static { content, .. } => Ok(content),
142
143 Streaming {
144 async_read,
145 len,
146 progress: 0,
147 done: false,
148 } => {
149 let mut async_read = async_read.into_inner();
150 let mut buf = len
151 .and_then(|c| c.try_into().ok())
152 .map(Vec::with_capacity)
153 .unwrap_or_default();
154
155 async_read.read_to_end(&mut buf).await?;
156
157 Ok(Cow::Owned(buf))
158 }
159
160 Empty => Ok(Cow::Borrowed(b"")),
161
162 Streaming { .. } => Err(Error::other("body already read to completion")),
163 }
164 }
165
166 /// Retrieve the number of bytes that have been read from this
167 /// body
168 pub fn bytes_read(&self) -> u64 {
169 self.0.bytes_read()
170 }
171
172 /// returns the content length of this body, if known and
173 /// available.
174 pub fn len(&self) -> Option<u64> {
175 self.0.len()
176 }
177
178 /// determine if the this body represents no data
179 pub fn is_empty(&self) -> bool {
180 self.0.is_empty()
181 }
182
183 /// determine if the this body represents static content
184 pub fn is_static(&self) -> bool {
185 matches!(self.0, Static { .. })
186 }
187
188 /// determine if the this body represents streaming content
189 pub fn is_streaming(&self) -> bool {
190 matches!(self.0, Streaming { .. })
191 }
192
193 /// Convert this body into an `H3Body` for reading
194 #[cfg(feature = "unstable")]
195 pub fn into_h3(self) -> H3Body {
196 H3Body::new(self)
197 }
198
199 /// Convert this body into an `H3Body` for reading
200 #[cfg(not(feature = "unstable"))]
201 pub(crate) fn into_h3(self) -> H3Body {
202 H3Body::new(self)
203 }
204
205 /// Convert this body into an [`H2Body`] for reading by the h2 send pump.
206 ///
207 /// h2 frames DATA at the connection layer, so the body bytes that reach the send pump
208 /// must be plain payload — not chunk-encoded. [`H2Body`] strips the chunked-transfer
209 /// wrapping that [`Body::poll_read`] applies for the h1 path on streaming bodies of
210 /// unknown length, and forwards trailers so the send pump can emit trailing HEADERS.
211 pub(crate) fn into_h2(self) -> H2Body {
212 H2Body::new(self)
213 }
214}
215
/// Returns how many payload bytes may be read into a buffer of `buf_len` bytes so
/// that the payload, its hex size line, and both CRLFs of HTTP/1.1 chunked framing
/// (`{size:X}\r\n{payload}\r\n`) all fit in the buffer.
///
/// Room is reserved for as many hex digits as `buf_len - 4` itself would need. Near a
/// change in hex width (F→10, FF→100, …) the result can therefore be one byte smaller
/// than the tightest possible fit. This is intentional and pinned by
/// `test_bytes_to_read`.
///
/// Only integer arithmetic is used, so the result is exact for every `usize` and no
/// lossy casts are involved.
///
/// # Panics
///
/// Panics if `buf_len < 6`, the smallest buffer that can hold one framed payload byte
/// (`1\r\nX\r\n`).
fn max_bytes_to_read(buf_len: usize) -> usize {
    assert!(
        buf_len >= 6,
        "buffers of length {buf_len} are too small for this implementation.
        if this is a problem for you, please open an issue"
    );

    // Space left once the two CRLF pairs are reserved.
    let available = buf_len - 4;

    // Find the smallest `hex_digits` with 16^hex_digits >= available. Every payload
    // length below `available` then fits in `hex_digits` hex digits, so a payload of
    // `available - hex_digits` bytes plus its size line never exceeds `available`.
    let mut hex_digits = 0;
    let mut capacity: usize = 1;
    while capacity < available {
        hex_digits += 1;
        match capacity.checked_mul(16) {
            Some(next) => capacity = next,
            // 16^hex_digits exceeds usize::MAX >= available, so this is already enough.
            None => break,
        }
    }

    available - hex_digits
}
248
249impl AsyncRead for Body {
250 fn poll_read(
251 mut self: Pin<&mut Self>,
252 cx: &mut Context<'_>,
253 buf: &mut [u8],
254 ) -> Poll<Result<usize>> {
255 match &mut self.0 {
256 Empty => Poll::Ready(Ok(0)),
257 Static { content, cursor } => {
258 let length = content.len();
259 if length == *cursor {
260 return Poll::Ready(Ok(0));
261 }
262 let bytes = (length - *cursor).min(buf.len());
263 buf[0..bytes].copy_from_slice(&content[*cursor..*cursor + bytes]);
264 *cursor += bytes;
265 Poll::Ready(Ok(bytes))
266 }
267
268 Streaming {
269 async_read,
270 len: Some(len),
271 done,
272 progress,
273 } => {
274 if *done {
275 return Poll::Ready(Ok(0));
276 }
277
278 let max_bytes_to_read = (*len - *progress)
279 .try_into()
280 .unwrap_or(buf.len())
281 .min(buf.len());
282
283 let bytes = ready!(
284 async_read
285 .get_mut()
286 .as_mut()
287 .poll_read(cx, &mut buf[..max_bytes_to_read])
288 )?;
289
290 if bytes == 0 {
291 *done = true;
292 } else {
293 *progress += bytes as u64;
294 }
295
296 Poll::Ready(Ok(bytes))
297 }
298
299 Streaming {
300 async_read,
301 len: None,
302 done,
303 progress,
304 } => {
305 if *done {
306 return Poll::Ready(Ok(0));
307 }
308
309 let max_bytes_to_read = max_bytes_to_read(buf.len());
310
311 let bytes = ready!(
312 async_read
313 .get_mut()
314 .as_mut()
315 .poll_read(cx, &mut buf[..max_bytes_to_read])
316 )?;
317
318 if bytes == 0 {
319 *done = true;
320 // Write only the last-chunk marker (`0\r\n`). The caller must then
321 // emit the trailer-section (possibly empty) followed by the
322 // terminating `\r\n` to complete RFC 9112 §7.1.2 chunked framing.
323 //
324 // This split is structural, not a missed opportunity to encapsulate:
325 // * Trailers come from `BodySource::trailers() -> Option<Headers>` after EOF,
326 // not from this `AsyncRead` path. They are structured `Headers` data, not
327 // bytes.
328 // * Formatting them needs `HttpContext` config (e.g.
329 // `panic_on_invalid_response_headers`) that `Body` does not carry, and
330 // reuses the same `write_headers_or_trailers` helper used for the
331 // response-header section.
332 // * Trailers can be arbitrarily large; emitting them from inside `poll_read`
333 // would force a multi-poll state machine to span buffers. The caller writes
334 // them in one shot via `BufWriter::buffer_mut()`, which has no such
335 // constraint.
336 //
337 // Caller stitch lives in `conn/h1.rs::Conn::send` after the
338 // `bufwriter.copy_from(&mut body, ...)` drain.
339 buf[..3].copy_from_slice(b"0\r\n");
340 return Poll::Ready(Ok(3));
341 }
342
343 *progress += bytes as u64;
344
345 let start = format!("{bytes:X}\r\n");
346 let start_length = start.len();
347 let total = bytes + start_length + 2;
348 buf.copy_within(..bytes, start_length);
349 buf[..start_length].copy_from_slice(start.as_bytes());
350 buf[total - 2..total].copy_from_slice(b"\r\n");
351 Poll::Ready(Ok(total))
352 }
353 }
354 }
355}
356
357struct SyncAsyncReader(SyncWrapper<Pin<Box<dyn BodySource>>>);
358impl Debug for SyncAsyncReader {
359 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
360 f.debug_struct("SyncAsyncReader").finish()
361 }
362}
363impl AsyncRead for SyncAsyncReader {
364 fn poll_read(
365 self: Pin<&mut Self>,
366 cx: &mut Context<'_>,
367 buf: &mut [u8],
368 ) -> Poll<Result<usize>> {
369 self.get_mut().0.get_mut().as_mut().poll_read(cx, buf)
370 }
371}
372
/// Internal storage behind [`Body`].
#[derive(Default)]
pub(crate) enum BodyType {
    /// No content; this is the default body.
    #[default]
    Empty,

    /// In-memory content.
    Static {
        content: Cow<'static, [u8]>,
        // number of bytes of `content` already handed out by `Body::poll_read`
        cursor: usize,
    },

    /// A boxed source that is read incrementally.
    Streaming {
        // NOTE(review): `SyncWrapper` presumably lets this `Send`-only source live in
        // `Sync` contexts (see the `Sync` bound on `Body::into_reader`) — confirm.
        async_read: SyncWrapper<Pin<Box<dyn BodySource>>>,
        // payload bytes read so far through `Body::poll_read` (excludes chunk framing)
        progress: u64,
        // declared length if known; `None` makes `Body::poll_read` emit chunked framing
        len: Option<u64>,
        // set once the source returned `Ok(0)` through `Body::poll_read`
        done: bool,
    },
}
390
391impl Debug for BodyType {
392 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
393 match self {
394 Empty => f.debug_tuple("BodyType::Empty").finish(),
395 Static { content, cursor } => f
396 .debug_struct("BodyType::Static")
397 .field("content", &String::from_utf8_lossy(content))
398 .field("cursor", cursor)
399 .finish(),
400 Streaming {
401 len,
402 done,
403 progress,
404 ..
405 } => f
406 .debug_struct("BodyType::Streaming")
407 .field("async_read", &format_args!(".."))
408 .field("len", &len)
409 .field("done", &done)
410 .field("progress", &progress)
411 .finish(),
412 }
413 }
414}
415
416impl BodyType {
417 fn is_empty(&self) -> bool {
418 match *self {
419 Empty => true,
420 Static { ref content, .. } => content.is_empty(),
421 Streaming { len, .. } => len == Some(0),
422 }
423 }
424
425 fn len(&self) -> Option<u64> {
426 match *self {
427 Empty => Some(0),
428 Static { ref content, .. } => Some(content.len() as u64),
429 Streaming { len, .. } => len,
430 }
431 }
432
433 fn bytes_read(&self) -> u64 {
434 match *self {
435 Empty => 0,
436 Static { cursor, .. } => cursor as u64,
437 Streaming { progress, .. } => progress,
438 }
439 }
440}
441
442impl From<String> for Body {
443 fn from(s: String) -> Self {
444 s.into_bytes().into()
445 }
446}
447
448impl From<&'static str> for Body {
449 fn from(s: &'static str) -> Self {
450 s.as_bytes().into()
451 }
452}
453
454impl From<&'static [u8]> for Body {
455 fn from(content: &'static [u8]) -> Self {
456 Self::new_static(content)
457 }
458}
459
460impl From<Vec<u8>> for Body {
461 fn from(content: Vec<u8>) -> Self {
462 Self::new_static(content)
463 }
464}
465
466impl From<Cow<'static, [u8]>> for Body {
467 fn from(value: Cow<'static, [u8]>) -> Self {
468 Self::new_static(value)
469 }
470}
471
472impl From<Cow<'static, str>> for Body {
473 fn from(value: Cow<'static, str>) -> Self {
474 match value {
475 Cow::Borrowed(b) => b.into(),
476 Cow::Owned(o) => o.into(),
477 }
478 }
479}
480
#[cfg(test)]
mod test_bytes_to_read {
    use super::max_bytes_to_read;

    #[test]
    fn simple_check_of_known_values() {
        // The rows marked `<-` are the most important part of this test and a
        // nonobvious but intentional consequence of the implementation. To avoid
        // overflowing, we must use one fewer than the available buffer bytes,
        // because increasing the read size by one increases the number of framed
        // bytes by two. This occurs when the hex representation of the content
        // length is near an increase in order of magnitude
        // (F->10, FF->100, FFF->1000, etc).
        let values = [
            (6, 1),       // 1
            (7, 2),       // 2
            (20, 15),     // F
            (21, 15),     // F <-
            (22, 16),     // 10
            (23, 17),     // 11
            (260, 254),   // FE
            (261, 254),   // FE <-
            (262, 255),   // FF <-
            (263, 256),   // 100
            (4100, 4093), // FFD
            (4101, 4093), // FFD <-
            (4102, 4094), // FFE <-
            (4103, 4095), // FFF <-
            (4104, 4096), // 1000
        ];

        for (input, expected) in values {
            let actual = max_bytes_to_read(input);
            assert_eq!(
                actual, expected,
                "\n\nexpected max_bytes_to_read({input}) to be {expected}, but it was {actual}"
            );

            // Sanity-check the table itself: the framed size must use the whole
            // buffer or leave exactly one byte spare.
            let hex_width = format!("{expected:X}").len();
            let used_bytes = expected + 4 + hex_width;
            assert!(
                used_bytes == input || used_bytes == input - 1,
                "\n\nfor an input of {}, expected used bytes to be {} or {}, but was {}",
                input,
                input,
                input - 1,
                used_bytes
            );
        }
    }
}