trillium_http/
body.rs

1use futures_lite::{io::Cursor, ready, AsyncRead, AsyncReadExt};
2use std::{
3    borrow::Cow,
4    fmt::Debug,
5    io::{Error, ErrorKind, Result},
6    pin::Pin,
7    task::{Context, Poll},
8};
9use BodyType::{Empty, Static, Streaming};
10
11/// The trillium representation of a http body. This can contain
12/// either `&'static [u8]` content, `Vec<u8>` content, or a boxed
13/// `AsyncRead` type.
14#[derive(Debug, Default)]
15pub struct Body(BodyType);
16
17impl Body {
18    /// Construct a new body from a streaming [`AsyncRead`] source. If
19    /// you have the body content in memory already, prefer
20    /// [`Body::new_static`] or one of the From conversions.
21    pub fn new_streaming(
22        async_read: impl AsyncRead + Send + Sync + 'static,
23        len: Option<u64>,
24    ) -> Self {
25        Self(Streaming {
26            async_read: Box::pin(async_read),
27            len,
28            done: false,
29            progress: 0,
30        })
31    }
32
33    /// Construct a fixed-length Body from a `Vec<u8>` or `&'static
34    /// [u8]`.
35    pub fn new_static(content: impl Into<Cow<'static, [u8]>>) -> Self {
36        Self(Static {
37            content: content.into(),
38            cursor: 0,
39        })
40    }
41
42    /// Retrieve a borrow of the static content in this body. If this
43    /// body is a streaming body or an empty body, this will return
44    /// None.
45    pub fn static_bytes(&self) -> Option<&[u8]> {
46        match &self.0 {
47            Static { content, .. } => Some(content.as_ref()),
48            _ => None,
49        }
50    }
51
52    /// Transform this Body into a dyn `AsyncRead`. This will wrap
53    /// static content in a [`Cursor`]. Note that this is different
54    /// from reading directly from the Body, which includes chunked
55    /// encoding.
56    pub fn into_reader(self) -> Pin<Box<dyn AsyncRead + Send + Sync>> {
57        match self.0 {
58            Streaming { async_read, .. } => async_read,
59            Static { content, .. } => Box::pin(Cursor::new(content)),
60            Empty => Box::pin(Cursor::new("")),
61        }
62    }
63
64    /**
65    Consume this body and return the full content. If the body was
66    constructed with `[Body::new_streaming`], this will read the
67    entire streaming body into memory, awaiting the streaming
68    source's completion. This function will return an error if a
69    streaming body has already been read to completion.
70
71    # Errors
72
73    This returns an error variant if either of the following conditions are met:
74
75    * there is an io error when reading from the underlying transport such as a disconnect
76    * the body has already been read to completion
77
78    */
79    #[allow(clippy::missing_errors_doc)] // false positive
80    pub async fn into_bytes(self) -> Result<Cow<'static, [u8]>> {
81        match self.0 {
82            Static { content, .. } => Ok(content),
83
84            Streaming {
85                mut async_read,
86                len,
87                progress: 0,
88                done: false,
89            } => {
90                let mut buf = len
91                    .and_then(|c| c.try_into().ok())
92                    .map(Vec::with_capacity)
93                    .unwrap_or_default();
94
95                async_read.read_to_end(&mut buf).await?;
96
97                Ok(Cow::Owned(buf))
98            }
99
100            Empty => Ok(Cow::Borrowed(b"")),
101
102            Streaming { .. } => Err(Error::new(
103                ErrorKind::Other,
104                "body already read to completion",
105            )),
106        }
107    }
108
109    /// Retrieve the number of bytes that have been read from this
110    /// body
111    pub fn bytes_read(&self) -> u64 {
112        self.0.bytes_read()
113    }
114
115    /// returns the content length of this body, if known and
116    /// available.
117    pub fn len(&self) -> Option<u64> {
118        self.0.len()
119    }
120
121    /// determine if the this body represents no data
122    pub fn is_empty(&self) -> bool {
123        self.0.is_empty()
124    }
125
126    /// determine if the this body represents static content
127    pub fn is_static(&self) -> bool {
128        matches!(self.0, Static { .. })
129    }
130
131    /// determine if the this body represents streaming content
132    pub fn is_streaming(&self) -> bool {
133        matches!(self.0, Streaming { .. })
134    }
135}
136
/// Returns the largest number of content bytes that can be read into a
/// buffer of `buf_len` bytes while leaving room for the HTTP/1.1 chunk
/// framing around them: the hexadecimal chunk size, a CRLF, the content,
/// and a trailing CRLF.
///
/// The computation uses exact integer arithmetic. For buffer lengths just
/// above a power of sixteen the result can leave one byte of the buffer
/// unused: one more content byte would need one more hex digit, so it
/// would grow the frame by two bytes.
///
/// # Panics
///
/// Panics if `buf_len` is less than 6, the smallest buffer that fits a
/// one-byte chunk (`1\r\nX\r\n`).
fn max_bytes_to_read(buf_len: usize) -> usize {
    assert!(
        buf_len >= 6,
        "buffers of length {buf_len} are too small for this implementation.
            if this is a problem for you, please open an issue"
    );

    // space left for the hex size and the content once both CRLFs are reserved
    let bytes_remaining_after_two_cr_lns = buf_len - 4;

    // Reserve enough hex digits to write any content length up to
    // `remaining - 1`, i.e. the hex digit count of `remaining - 1` (the
    // smallest k with 16^k >= remaining). Calling that count k, the content
    // is `remaining - k <= remaining - 1`, so its hex size needs at most k
    // digits. The frame therefore never exceeds
    // `remaining - k + k + 4 == buf_len`.
    let largest_reservable_len = bytes_remaining_after_two_cr_lns - 1;
    let significant_bits = usize::BITS - largest_reservable_len.leading_zeros();
    // at most usize::BITS / 4, so the conversion is lossless
    let max_bytes_of_hex_framing = ((significant_bits + 3) / 4) as usize;

    bytes_remaining_after_two_cr_lns - max_bytes_of_hex_framing
}
169
170impl AsyncRead for Body {
171    fn poll_read(
172        mut self: Pin<&mut Self>,
173        cx: &mut Context<'_>,
174        buf: &mut [u8],
175    ) -> Poll<Result<usize>> {
176        match &mut self.0 {
177            Empty => Poll::Ready(Ok(0)),
178            Static { content, cursor } => {
179                let length = content.len();
180                if length == *cursor {
181                    return Poll::Ready(Ok(0));
182                }
183                let bytes = (length - *cursor).min(buf.len());
184                buf[0..bytes].copy_from_slice(&content[*cursor..*cursor + bytes]);
185                *cursor += bytes;
186                Poll::Ready(Ok(bytes))
187            }
188
189            Streaming {
190                async_read,
191                len: Some(len),
192                done,
193                progress,
194            } => {
195                if *done {
196                    return Poll::Ready(Ok(0));
197                }
198
199                let max_bytes_to_read = (*len - *progress)
200                    .try_into()
201                    .unwrap_or(buf.len())
202                    .min(buf.len());
203
204                let bytes = ready!(async_read
205                    .as_mut()
206                    .poll_read(cx, &mut buf[..max_bytes_to_read]))?;
207
208                if bytes == 0 {
209                    *done = true;
210                } else {
211                    *progress += bytes as u64;
212                }
213
214                Poll::Ready(Ok(bytes))
215            }
216
217            Streaming {
218                async_read,
219                len: None,
220                done,
221                progress,
222            } => {
223                if *done {
224                    return Poll::Ready(Ok(0));
225                }
226
227                let max_bytes_to_read = max_bytes_to_read(buf.len());
228
229                let bytes = ready!(async_read
230                    .as_mut()
231                    .poll_read(cx, &mut buf[..max_bytes_to_read]))?;
232
233                if bytes == 0 {
234                    *done = true;
235                } else {
236                    *progress += bytes as u64;
237                }
238
239                let start = format!("{bytes:X}\r\n");
240                let start_length = start.as_bytes().len();
241                let total = bytes + start_length + 2;
242                buf.copy_within(..bytes, start_length);
243                buf[..start_length].copy_from_slice(start.as_bytes());
244                buf[total - 2..total].copy_from_slice(b"\r\n");
245                Poll::Ready(Ok(total))
246            }
247        }
248    }
249}
250
251#[derive(Default)]
252enum BodyType {
253    #[default]
254    Empty,
255
256    Static {
257        content: Cow<'static, [u8]>,
258        cursor: usize,
259    },
260
261    Streaming {
262        async_read: Pin<Box<dyn AsyncRead + Send + Sync + 'static>>,
263        progress: u64,
264        len: Option<u64>,
265        done: bool,
266    },
267}
268
269impl Debug for BodyType {
270    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
271        match self {
272            Empty => f.debug_tuple("BodyType::Empty").finish(),
273            Static { content, cursor } => f
274                .debug_struct("BodyType::Static")
275                .field("content", &String::from_utf8_lossy(content))
276                .field("cursor", cursor)
277                .finish(),
278            Streaming {
279                len,
280                done,
281                progress,
282                ..
283            } => f
284                .debug_struct("BodyType::Streaming")
285                .field("async_read", &"..")
286                .field("len", &len)
287                .field("done", &done)
288                .field("progress", &progress)
289                .finish(),
290        }
291    }
292}
293
294impl BodyType {
295    fn is_empty(&self) -> bool {
296        match *self {
297            Empty => true,
298            Static { ref content, .. } => content.is_empty(),
299            Streaming { len, .. } => len == Some(0),
300        }
301    }
302
303    fn len(&self) -> Option<u64> {
304        match *self {
305            Empty => Some(0),
306            Static { ref content, .. } => Some(content.len() as u64),
307            Streaming { len, .. } => len,
308        }
309    }
310
311    fn bytes_read(&self) -> u64 {
312        match *self {
313            Empty => 0,
314            Static { cursor, .. } => cursor as u64,
315            Streaming { progress, .. } => progress,
316        }
317    }
318}
319
320impl From<String> for Body {
321    fn from(s: String) -> Self {
322        s.into_bytes().into()
323    }
324}
325
326impl From<&'static str> for Body {
327    fn from(s: &'static str) -> Self {
328        s.as_bytes().into()
329    }
330}
331
332impl From<&'static [u8]> for Body {
333    fn from(content: &'static [u8]) -> Self {
334        Self::new_static(content)
335    }
336}
337
338impl From<Vec<u8>> for Body {
339    fn from(content: Vec<u8>) -> Self {
340        Self::new_static(content)
341    }
342}
343
344impl From<Cow<'static, [u8]>> for Body {
345    fn from(value: Cow<'static, [u8]>) -> Self {
346        Self::new_static(value)
347    }
348}
349
350impl From<Cow<'static, str>> for Body {
351    fn from(value: Cow<'static, str>) -> Self {
352        match value {
353            Cow::Borrowed(b) => b.into(),
354            Cow::Owned(o) => o.into(),
355        }
356    }
357}
358
#[cfg(test)]
mod test_bytes_to_read {
    use super::max_bytes_to_read;

    /// The number of bytes one chunk frame occupies for `content_len`
    /// bytes of content: the hex length, two CRLFs, and the content itself.
    fn framed_len(content_len: usize) -> usize {
        content_len + 4 + format!("{content_len:X}").len()
    }

    #[test]
    fn simple_check_of_known_values() {
        // The marked rows are the most important part of this test, and a
        // nonobvious but intentional consequence of the implementation. To
        // avoid overflowing the buffer, we must sometimes use one fewer than
        // the available buffer bytes. Increasing the read size by one byte
        // can increase the framed size by two: one more content byte and one
        // more hex digit. This occurs when the hex representation of the
        // content length is near an increase in order of magnitude
        // (F->10, FF->100, FFF->1000, etc).
        let values = [
            (6, 1),       // 1
            (7, 2),       // 2
            (20, 15),     // F
            (21, 15),     // F <-
            (22, 16),     // 10
            (23, 17),     // 11
            (260, 254),   // FE
            (261, 254),   // FE <-
            (262, 255),   // FF <-
            (263, 256),   // 100
            (4100, 4093), // FFD
            (4101, 4093), // FFD <-
            (4102, 4094), // FFE <-
            (4103, 4095), // FFF <-
            (4104, 4096), // 1000
        ];

        for (input, expected) in values {
            let actual = max_bytes_to_read(input);
            assert_eq!(
                actual, expected,
                "\n\nexpected max_bytes_to_read({input}) to be {expected}, but it was {actual}"
            );

            // testing the test:
            let used_bytes = framed_len(expected);
            assert!(
                used_bytes == input || used_bytes == input - 1,
                "\n\nfor an input of {input}, expected used bytes to be {input} or {}, but was {used_bytes}",
                input - 1
            );
        }
    }

    #[test]
    fn every_buffer_size_fits_with_at_most_one_spare_byte() {
        // sweeps across the F, FF, FFF and FFFF order-of-magnitude boundaries
        for buf_len in 6..=70_000 {
            let used_bytes = framed_len(max_bytes_to_read(buf_len));
            assert!(
                used_bytes <= buf_len,
                "a buffer of {buf_len} bytes would overflow ({used_bytes} bytes used)"
            );
            assert!(
                buf_len - used_bytes <= 1,
                "a buffer of {buf_len} bytes leaves {} bytes unused",
                buf_len - used_bytes
            );
        }
    }
}