Skip to main content

fastly/http/
body.rs

1//! HTTP bodies.
2
3pub(crate) mod handle;
4pub(crate) mod streaming;
5
6use self::handle::BodyHandle;
7use crate::{
8    convert::{Borrowable, ToHeaderName, ToHeaderValue},
9    experimental::BodyExt,
10    Error,
11};
12use http::header::HeaderMap;
13use std::fmt::Debug;
14use std::io::{BufRead, BufReader, BufWriter, Read, Write};
15use std::mem::ManuallyDrop;
16
17pub use streaming::StreamingBody;
18
19use super::{INITIAL_HEADER_NAME_BUF_SIZE, INITIAL_HEADER_VALUE_BUF_SIZE};
20
/// An HTTP body that can be read from, written to, or appended to another body.
///
/// The most efficient ways to read from and write to the body are through the [`Read`],
/// [`BufRead`], and [`Write`] implementations.
///
/// Read and write operations to a [`Body`] are automatically buffered, though you can take direct
/// control over aspects of the buffering using the [`BufRead`] methods and [`Write::flush()`].
pub struct Body {
    // NOTE: The order of these fields with these different handles is load
    // bearing. `BufWriter` needs `BodyHandle` so that it flushes out the buffer
    // and then drops the `BodyHandle` out properly when `Body` is dropped.
    // `BodyHandleWrapper` makes sure we don't double free the memory that
    // `BodyHandle` points to.

    // Buffered read side. Its inner `BodyHandleWrapper` holds the handle in a
    // `ManuallyDrop`, so dropping the reader does not run the handle's destructor.
    reader: BufReader<BodyHandleWrapper>,
    // Buffered write side. This is the copy of the handle whose destructor runs
    // when `Body` is dropped.
    writer: BufWriter<BodyHandle>,
}
37
38impl Debug for Body {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        write!(f, "<opaque Body>")
41    }
42}
43
44impl Body {
45    /// Get a new, empty HTTP body.
46    #[allow(clippy::new_without_default)]
47    pub fn new() -> Self {
48        BodyHandle::new().into()
49    }
50
51    pub(super) fn get_handle(&self) -> &BodyHandle {
52        self.writer.get_ref()
53    }
54
55    // this is not exported, since misuse can lead to data getting dropped or appearing out of order
56    fn handle(&mut self) -> &mut BodyHandle {
57        self.writer.get_mut()
58    }
59
60    /// Convert a [`Body`] into the low-level [`BodyHandle`] interface.
61    pub fn into_handle(mut self) -> BodyHandle {
62        self.put_back_read_buf();
63        // Flushes the buffer and returns the underlying `BodyHandle`
64        self.writer
65            .into_inner()
66            .expect("fastly_http_body::write failed")
67    }
68
    /// Put any currently buffered read data back at the front of the body.
    ///
    /// Bytes that the internal `BufReader` has buffered but not yet yielded are written to the
    /// front of the underlying handle and then marked as consumed in the reader's buffer. When
    /// the read buffer is empty this does nothing.
    fn put_back_read_buf(&mut self) {
        let read_buf = self.reader.buffer();
        if !read_buf.is_empty() {
            // We have to cheat a little here to get mutable access to the handle while the reader
            // buffer is borrowed. Since we're not going to read or write through the `self`
            // interface while `body_handle` is live, no other aliases of the handle will be used.
            //
            // The temporary handle is rebuilt from the writer handle's `u32` and wrapped in
            // `ManuallyDrop`, so its destructor does not run when it goes out of scope here.
            // NOTE(review): soundness relies on the aliasing argument above — confirm against the
            // contract of `BodyHandle::from_u32`.
            let mut body_handle =
                ManuallyDrop::new(unsafe { BodyHandle::from_u32(self.writer.get_ref().as_u32()) });
            let nwritten = body_handle.write_front(read_buf);
            // Let the `BufReader` know that we've consumed these bytes from its internal buffer so
            // it won't yield them for a subsequent read. Only the count reported by
            // `write_front` is consumed.
            self.reader.consume(nwritten)
        };
    }
84
85    /// Read the entirety of the body into a byte vector.
86    ///
87    #[doc = include_str!("../../docs/snippets/buffers-body.md")]
88    pub fn into_bytes(self) -> Vec<u8> {
89        self.into_handle().into_bytes()
90    }
91
92    /// Read the entirety of the body into a `String`, interpreting the bytes as UTF-8.
93    ///
94    #[doc = include_str!("../../docs/snippets/buffers-body.md")]
95    ///
96    /// # Panics
97    ///
98    #[doc = include_str!("../../docs/snippets/panics-body-utf8.md")]
99    pub fn into_string(self) -> String {
100        self.into_handle().into_string()
101    }
102
103    /// Append another body onto the end of this body.
104    ///
105    #[doc = include_str!("../../docs/snippets/body-append-constant-time.md")]
106    pub fn append(&mut self, other: Body) {
107        // flush the write buffer of the destination body, so that we can use the append method on
108        // the underlying handles. Unwrap, as `BodyHandle::flush` won't return actionable errors.
109        self.writer.flush().expect("fastly_http_body::write failed");
110        self.handle().append(other.into_handle())
111    }
112
113    /// Return an iterator that reads the body in chunks of at most the given number of bytes.
114    ///
115    /// If `chunk_size` does not evenly divide the length of the body, then the last chunk will not
116    /// have length `chunk_size`.
117    ///
118    /// # Examples
119    ///
120    /// ```no_run
121    /// # use fastly::Body;
122    /// use std::io::Write;
123    /// fn remove_0s(body: &mut Body) -> Result<(), Box<dyn std::error::Error>> {
124    ///     let mut no_0s = Body::new();
125    ///     for chunk in body.read_chunks(4096) {
126    ///         let mut chunk = chunk?;
127    ///         chunk.retain(|b| *b != 0);
128    ///         no_0s.write_all(&chunk)?;
129    ///     }
130    ///     *body = no_0s;
131    ///     Ok(())
132    /// }
133    /// ```
134    pub fn read_chunks(
135        &mut self,
136        chunk_size: usize,
137    ) -> impl Iterator<Item = Result<Vec<u8>, std::io::Error>> + '_ {
138        std::iter::from_fn(move || {
139            let mut chunk = vec![0; chunk_size];
140            match self.read(&mut chunk) {
141                Ok(0) => None,
142                Ok(nread) => {
143                    chunk.truncate(nread);
144                    Some(Ok(chunk))
145                }
146                Err(e) => Some(Err(e)),
147            }
148        })
149    }
150
    /// Get a prefix of the body containing up to the given number of bytes.
    ///
    /// This is particularly useful when you only need to inspect the first few bytes of a body, or
    /// want to read an entire body up to a certain size limit to control memory consumption.
    ///
    /// Note that the length of the returned prefix may be shorter than the requested length if the
    /// length of the entire body is shorter.
    ///
    /// The returned [`Prefix`] value is a smart pointer wrapping a `&mut Vec<u8>`. You can use it
    /// as you would a [`&mut Vec<u8>`][`Vec`] or a [`&mut [u8]`][`std::slice`] to view or modify
    /// the contents of the prefix.
    ///
    /// When the [`Prefix`] is dropped, the prefix bytes are returned to the body, including any
    /// modifications that have been made. Because the prefix holds a mutable reference to the body,
    /// you may need to explicitly [`drop()`] the prefix to perform other operations on the
    /// body.
    ///
    /// If you do not need to return the prefix bytes to the body, use [`Prefix::take()`] to consume
    /// the prefix as an owned byte vector without writing it back.
    ///
    /// # Panics
    ///
    /// Panics if reading from the body fails. To handle read errors explicitly, use
    /// [`try_get_prefix_mut()`][`Self::try_get_prefix_mut()`], which returns the I/O error
    /// instead of panicking.
    ///
    /// # Examples
    ///
    /// Checking whether the body starts with the [WebAssembly magic
    /// number](https://webassembly.github.io/spec/core/binary/modules.html#binary-module):
    ///
    /// ```no_run
    /// const MAGIC: &[u8] = b"\0asm";
    /// # let mut body = fastly::Body::from(MAGIC);
    /// let prefix = body.get_prefix_mut(MAGIC.len());
    /// if prefix.as_slice() == MAGIC {
    ///     println!("might be Wasm!");
    /// }
    /// ```
    ///
    /// Zero out the timestamp bytes in a [gzip header](https://en.wikipedia.org/wiki/Gzip#File_format):
    ///
    /// ```no_run
    /// # let mut body = fastly::Body::from(&[0x1f, 0x8b, 0x01, 0x00, 0xba, 0xc8, 0x4d, 0x20][..]);
    /// let mut prefix = body.get_prefix_mut(8);
    /// for i in 4..8 {
    ///     prefix[i] = 0;
    /// }
    /// ```
    ///
    /// Try to consume the body as a [JSON value][`serde_json::Value`], but only up to the first
    /// 4KiB. Note the use of `take()` to avoid writing the bytes back to the body unnecessarily:
    ///
    /// ```no_run
    /// # use serde_json::{json, to_writer};
    /// # let mut body = fastly::Body::new();
    /// # to_writer(&mut body, &json!({"hello": "world!" })).unwrap();
    /// let prefix = body.get_prefix_mut(4096).take();
    /// let json: serde_json::Value = serde_json::from_slice(&prefix).unwrap();
    /// ```
    pub fn get_prefix_mut(&mut self, length: usize) -> Prefix<'_> {
        self.try_get_prefix_mut(length).expect("body read failed")
    }
208
209    /// Try to get a prefix of the body up to the given number of bytes.
210    ///
211    /// Unlike [`get_prefix_mut()`][`Self::get_prefix_mut()`], this method does not panic if an I/O
212    /// error occurs.
213    pub fn try_get_prefix_mut(&mut self, length: usize) -> std::io::Result<Prefix<'_>> {
214        let mut buf = vec![];
215        let nread = self
216            .take(length.try_into().unwrap())
217            .read_to_end(&mut buf)?;
218        buf.truncate(nread);
219        Ok(Prefix::new(buf, self))
220    }
221
    /// Get a prefix of the body as a string containing up to the given number of bytes.
    ///
    /// This is particularly useful when you only need to inspect the first few characters of a body or
    /// want to read an entire body up to a certain size limit to control memory consumption.
    ///
    /// Note that the length of the returned prefix may be shorter than the requested length if the
    /// length of the entire body is shorter or if the requested length fell in the middle of a
    /// multi-byte UTF-8 codepoint.
    ///
    /// The returned [`PrefixString`] value is a smart pointer wrapping a `&mut String`. You can use
    /// it as you would a [`&mut String`][`String`] or a [`&mut str`][`std::str`] to view or modify
    /// the contents of the prefix.
    ///
    /// When the [`PrefixString`] is dropped, the prefix characters are returned to the body,
    /// including any modifications that have been made. Because the prefix holds a mutable
    /// reference to the body, you may need to explicitly [`drop()`] the prefix before performing
    /// other operations on the body.
    ///
    /// If you do not need to return the prefix characters to the body, use [`PrefixString::take()`] to
    /// consume the prefix as an owned string without writing it back.
    ///
    /// # Panics
    ///
    /// If the prefix contains invalid UTF-8 bytes, this function will panic. The exception to this
    /// is if the bytes are invalid because a multi-byte codepoint is cut off by the requested
    /// prefix length. In this case, the invalid bytes are left off the end of the prefix.
    ///
    /// To explicitly handle the possibility of invalid UTF-8 bytes, use
    /// [`try_get_prefix_str_mut()`][`Self::try_get_prefix_str_mut()`], which returns an error on
    /// failure rather than panicking.
    ///
    /// This function also panics if reading from the body fails; that panic is raised inside
    /// [`try_get_prefix_str_mut()`][`Self::try_get_prefix_str_mut()`] as well.
    ///
    /// # Examples
    ///
    /// Check whether the body starts with the [M3U8 file header][m3u8]:
    ///
    /// ```no_run
    /// const HEADER: &str = "#EXTM3U";
    /// # let mut body = fastly::Body::from(HEADER);
    /// let prefix = body.get_prefix_str_mut(7);
    /// if prefix.as_str() == HEADER {
    ///     println!("might be an M3U8 file!");
    /// }
    /// ```
    ///
    /// Insert a new playlist entry before the first occurrence of `#EXTINF` in an [M3U8
    /// file][m3u8]:
    ///
    /// ```no_run
    /// # let mut body = fastly::Body::from("#EXTM3U\n#EXT-X-TARGETDURATION:10\n#EXT-X-VERSION:4\n#EXT-X-MEDIA-SEQUENCE:1\n#EXTINF:10.0,\nfileSequence1.ts\n");
    /// let mut prefix = body.get_prefix_str_mut(1024);
    /// let first_entry = prefix.find("#EXTINF").unwrap();
    /// prefix.insert_str(first_entry, "#EXTINF:10.0,\nnew_first_file.ts\n");
    /// ```
    ///
    /// Try to consume the body as a [JSON value][`serde_json::Value`], but only up to the first
    /// 4KiB. Note the use of `take()` to avoid writing the characters back to the body unnecessarily:
    ///
    /// ```no_run
    /// # use serde_json::{json, to_writer};
    /// # let mut body = fastly::Body::new();
    /// # to_writer(&mut body, &json!({"hello": "world!" })).unwrap();
    /// let prefix = body.get_prefix_str_mut(4096).take();
    /// let json: serde_json::Value = serde_json::from_str(&prefix).unwrap();
    /// ```
    ///
    /// [m3u8]: https://en.wikipedia.org/wiki/M3U#Extended_M3U
    pub fn get_prefix_str_mut(&mut self, length: usize) -> PrefixString<'_> {
        self.try_get_prefix_str_mut(length)
            .expect("UTF-8 error in body prefix")
    }
292
    /// Try to get a prefix of the body as a string containing up to the given number of bytes.
    ///
    /// Unlike [`get_prefix_str_mut()`][`Self::get_prefix_str_mut()`], this function does not panic
    /// when the prefix contains invalid UTF-8 bytes.
    ///
    /// If the only problem is a multi-byte codepoint cut off at the end of the prefix, the valid
    /// part is returned and the partial codepoint bytes are put back onto the body. If there are
    /// invalid bytes within the prefix, all bytes that were read are put back onto the body and a
    /// [`Utf8Error`][`std::str::Utf8Error`] is returned.
    ///
    /// # Panics
    ///
    /// Panics if reading from the body fails, since the error type of this function can only
    /// describe UTF-8 failures. Also panics if `length` cannot be converted to the integer type
    /// taken by the read limit.
    pub fn try_get_prefix_str_mut(
        &mut self,
        length: usize,
    ) -> Result<PrefixString<'_>, std::str::Utf8Error> {
        let mut buf = vec![];
        // Read at most `length` bytes; I/O failures panic here rather than being returned.
        let nread = self
            .take(length.try_into().unwrap())
            .read_to_end(&mut buf)
            .expect("body read failed");
        buf.truncate(nread);
        match String::from_utf8(buf) {
            Ok(string) => Ok(PrefixString::new(string, self)),
            Err(e) => {
                // Determine whether the error is due to the cutoff at the end or due to bad UTF-8
                // bytes. In either case, there may be bytes we want to put back onto the body.
                let err = e.utf8_error();
                let mut bytes = e.into_bytes();
                // `error_len()` is `None` when the input ended in the middle of a codepoint, and
                // `Some(_)` when an invalid byte sequence was found.
                let (excess_bytes, result) = match err.error_len() {
                    None => {
                        // The error was due to a codepoint cut off at the end, so convert the valid
                        // part of the prefix and put the partial codepoint bytes back.
                        let end_bytes = bytes.split_off(err.valid_up_to());
                        let string = String::from_utf8(bytes)
                            .expect("expected only valid UTF-8 after splitting off bad codepoint");
                        (end_bytes, Ok(string))
                    }
                    Some(_) => {
                        // There were bad UTF-8 bytes within the prefix, so return a UTF-8 error and
                        // put all the bytes back.
                        (bytes, Err(err))
                    }
                };
                // Put invalid bytes back onto the body, if there are any. The reader's buffered
                // bytes go back first so that the excess bytes end up in front of them.
                if !excess_bytes.is_empty() {
                    self.put_back_read_buf();
                    self.writer.get_mut().write_front(&excess_bytes);
                }
                result.map(move |string| PrefixString::new(string, self))
            }
        }
    }
338
339    /// The body's length in bytes, if it is known.
340    ///
341    /// If the length is not known, it is likely due to the body arising from an HTTP/1.1 message
342    /// with chunked encoding, an HTTP/2 or later message with no `content-length`, or other
343    /// response-streaming technique.
344    ///
345    /// Note that receiving a length from this function does not guarantee that the full number of
346    /// bytes can actually be read from the body. For example, when proxying a response from a
347    /// backend, this length may reflect the `content-length` promised in the response, but if the
348    /// backend connection is closed prematurely, fewer bytes may be delivered before the body
349    /// can no longer be read.
350    pub fn known_length(&mut self) -> Option<u64> {
351        self.handle().known_length()
352    }
353}
354
355// For these trait implementations we only implement the methods that the underlying buffered
356// adaptors implement; the default implementations for the others will behave the same.
357//
358// The main bit of caution we must use here is that any read should be preceded by flushing the
359// write buffer. `BufWriter` doesn't make any calls if its buffer is empty, so this isn't very
360// expensive and could prevent unexpected results if a program is trying to read and write from the
361// same body.
362impl Read for Body {
363    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
364        self.writer.flush()?;
365        self.reader.read(buf)
366    }
367
368    fn read_vectored(&mut self, bufs: &mut [std::io::IoSliceMut]) -> std::io::Result<usize> {
369        self.writer.flush()?;
370        self.reader.read_vectored(bufs)
371    }
372}
373
374impl BufRead for Body {
375    fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
376        self.writer.flush()?;
377        self.reader.fill_buf()
378    }
379
380    fn consume(&mut self, amt: usize) {
381        self.reader.consume(amt)
382    }
383}
384
385impl Write for Body {
386    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
387        self.writer.write(buf)
388    }
389
390    fn write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize> {
391        self.writer.write_vectored(bufs)
392    }
393
394    fn flush(&mut self) -> std::io::Result<()> {
395        self.writer.flush()
396    }
397}
398
impl From<BodyHandle> for Body {
    /// Wrap a low-level [`BodyHandle`] in a buffered [`Body`].
    ///
    /// The reader and the writer each get their own `BodyHandle` value built from the same
    /// `u32`. The reader's copy is stored in a `BodyHandleWrapper` (which keeps it in a
    /// `ManuallyDrop`); the writer's copy is stored directly.
    fn from(handle: BodyHandle) -> Self {
        // we clone the handle here in order to have an owned type for the reader and writer, but
        // this means we have to be careful that we don't make the aliasing observable from the
        // public interface
        //
        // NOTE(review): the `unsafe` call relies on the reader-side copy never running its
        // destructor — confirm against the contract of `BodyHandle::from_u32`.
        let handle2 = unsafe { BodyHandle::from_u32(handle.as_u32()) };
        Self {
            reader: BufReader::new(BodyHandleWrapper::new(handle)),
            writer: BufWriter::new(handle2),
        }
    }
}
411
412impl From<&str> for Body {
413    fn from(s: &str) -> Self {
414        BodyHandle::from(s).into()
415    }
416}
417
418impl From<String> for Body {
419    fn from(s: String) -> Self {
420        BodyHandle::from(s).into()
421    }
422}
423
424impl From<&[u8]> for Body {
425    fn from(s: &[u8]) -> Self {
426        BodyHandle::from(s).into()
427    }
428}
429
430impl From<Vec<u8>> for Body {
431    fn from(s: Vec<u8>) -> Self {
432        BodyHandle::from(s).into()
433    }
434}
435
/// Smart pointer returned by [`Body::get_prefix_mut()`].
///
/// Dereferences to the prefix bytes as a `Vec<u8>`. When dropped, the (possibly modified) bytes
/// are written back to the front of the body, unless [`Prefix::take()`] was used to take
/// ownership of them.
pub struct Prefix<'a> {
    /// The mutable prefix buffer, if it hasn't yet been taken.
    ///
    /// `Prefix` is always created with `Some(buf)`, and [`Prefix::take()`] is the only method that
    /// changes `self.buf` to `None`. That means the `unwrap`s in the other methods are safe.
    buf: Option<Vec<u8>>,
    /// The body the prefix was read from; the bytes are written back to it on drop.
    body: &'a mut Body,
}
445
446impl std::fmt::Debug for Prefix<'_> {
447    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
448        f.debug_struct("Prefix")
449            .field("buf", self.buf.as_ref().unwrap())
450            .finish()
451    }
452}
453
454impl<'a> Prefix<'a> {
455    fn new(buf: Vec<u8>, body: &'a mut Body) -> Self {
456        Self {
457            buf: Some(buf),
458            body,
459        }
460    }
461
462    /// Return the prefix as a byte vector without writing it back to the `Body` from which it came.
463    pub fn take(mut self) -> Vec<u8> {
464        self.buf.take().unwrap()
465    }
466}
467
468impl std::ops::Deref for Prefix<'_> {
469    type Target = Vec<u8>;
470
471    fn deref(&self) -> &Self::Target {
472        self.buf.as_ref().unwrap()
473    }
474}
475
476impl std::ops::DerefMut for Prefix<'_> {
477    fn deref_mut(&mut self) -> &mut Self::Target {
478        self.buf.as_mut().unwrap()
479    }
480}
481
482impl Drop for Prefix<'_> {
483    fn drop(&mut self) {
484        // Only put bytes back if `Prefix::take()` has not been called.
485        if let Some(buf) = &self.buf {
486            self.body.put_back_read_buf();
487            self.body.writer.get_mut().write_front(buf);
488        }
489    }
490}
491
/// Smart pointer returned by [`Body::get_prefix_str_mut()`].
///
/// Dereferences to the prefix as a `String`. When dropped, the (possibly modified) string's bytes
/// are written back to the front of the body, unless [`PrefixString::take()`] was used to take
/// ownership of it.
pub struct PrefixString<'a> {
    /// The mutable prefix buffer, if it hasn't yet been taken.
    ///
    /// `PrefixString` is always created with `Some(buf)`, and [`PrefixString::take()`] is the only
    /// method that changes `self.buf` to `None`. That means the `unwrap`s in the other methods are
    /// safe.
    buf: Option<String>,
    /// The body the prefix was read from; the string is written back to it on drop.
    body: &'a mut Body,
}
502
503impl std::fmt::Debug for PrefixString<'_> {
504    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
505        f.debug_struct("PrefixString")
506            .field("buf", self.buf.as_ref().unwrap())
507            .finish()
508    }
509}
510
511impl<'a> PrefixString<'a> {
512    fn new(buf: String, body: &'a mut Body) -> Self {
513        Self {
514            buf: Some(buf),
515            body,
516        }
517    }
518
519    /// Return the prefix as a string without writing it back to the `Body` from which it came.
520    pub fn take(mut self) -> String {
521        self.buf.take().unwrap()
522    }
523}
524
525impl std::ops::Deref for PrefixString<'_> {
526    type Target = String;
527
528    fn deref(&self) -> &Self::Target {
529        self.buf.as_ref().unwrap()
530    }
531}
532
533impl std::ops::DerefMut for PrefixString<'_> {
534    fn deref_mut(&mut self) -> &mut Self::Target {
535        self.buf.as_mut().unwrap()
536    }
537}
538
539impl Drop for PrefixString<'_> {
540    fn drop(&mut self) {
541        // Only put bytes back if `PrefixString::take()` has not been called.
542        if let Some(buf) = &self.buf {
543            self.body.put_back_read_buf();
544            self.body.writer.get_mut().write_front(buf.as_bytes());
545        }
546    }
547}
548
/// An internal wrapper used in `Body` to prevent closing the handle twice by
/// wrapping a `BodyHandle` in a `ManuallyDrop` to prevent the
/// `BodyHandle` having its destructor called. This type should not be used outside
/// of this module. The I/O calls that `Body` makes through the wrapper
/// are reimplemented for it, and it just passes each call on to
/// the inner handle.
///
/// In this file the wrapper is the inner type of `Body`'s `BufReader`.
#[repr(transparent)]
struct BodyHandleWrapper {
    // Never dropped by this wrapper.
    handle: ManuallyDrop<BodyHandle>,
}
559
560impl BodyHandleWrapper {
561    fn new(handle: BodyHandle) -> Self {
562        Self {
563            handle: ManuallyDrop::new(handle),
564        }
565    }
566}
567
568impl Write for BodyHandleWrapper {
569    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
570        self.handle.write(buf)
571    }
572
573    fn write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize> {
574        self.handle.write_vectored(bufs)
575    }
576
577    fn flush(&mut self) -> std::io::Result<()> {
578        self.handle.flush()
579    }
580}
581
582impl Read for BodyHandleWrapper {
583    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
584        self.handle.read(buf)
585    }
586}
587
588impl BodyExt for Body {
589    fn append_trailer(&mut self, name: impl ToHeaderName, value: impl ToHeaderValue) {
590        self.handle().append_trailer(
591            name.into_borrowable().as_ref(),
592            value.into_borrowable().as_ref(),
593        );
594    }
595
596    fn get_trailers(&mut self) -> Result<HeaderMap, Error> {
597        let mut trailers = HeaderMap::new();
598        let handle = self.handle();
599
600        for name in handle.get_trailer_names_impl(INITIAL_HEADER_NAME_BUF_SIZE, None)? {
601            let name = name?;
602            for value in
603                handle.get_trailer_values_impl(&name, INITIAL_HEADER_VALUE_BUF_SIZE, None)?
604            {
605                let value = value?;
606                trailers.append(&name, value);
607            }
608        }
609
610        Ok(trailers)
611    }
612}