fastly/http/body.rs
1//! HTTP bodies.
2
3pub(crate) mod handle;
4pub(crate) mod streaming;
5
6use self::handle::BodyHandle;
7use crate::{
8 convert::{Borrowable, ToHeaderName, ToHeaderValue},
9 experimental::BodyExt,
10 Error,
11};
12use http::header::HeaderMap;
13use std::fmt::Debug;
14use std::io::{BufRead, BufReader, BufWriter, Read, Write};
15use std::mem::ManuallyDrop;
16
17pub use streaming::StreamingBody;
18
19use super::{INITIAL_HEADER_NAME_BUF_SIZE, INITIAL_HEADER_VALUE_BUF_SIZE};
20
/// An HTTP body that can be read from, written to, or appended to another body.
///
/// The most efficient ways to read from and write to the body are through the [`Read`],
/// [`BufRead`], and [`Write`] implementations.
///
/// Read and write operations to a [`Body`] are automatically buffered, though you can take direct
/// control over aspects of the buffering using the [`BufRead`] methods and [`Write::flush()`].
pub struct Body {
    // NOTE: The order of these fields with these different handles is load-bearing. Both fields
    // refer to the same underlying body handle. `writer` owns the real `BodyHandle`, so when
    // `Body` is dropped the `BufWriter` flushes its buffer and then drops that `BodyHandle`.
    // `reader` holds an alias of the same handle inside a `BodyHandleWrapper`, which never runs
    // the `BodyHandle` destructor, so the memory that `BodyHandle` points to is not freed twice.
    reader: BufReader<BodyHandleWrapper>,
    writer: BufWriter<BodyHandle>,
}
36}
37
38impl Debug for Body {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 write!(f, "<opaque Body>")
41 }
42}
43
44impl Body {
45 /// Get a new, empty HTTP body.
46 #[allow(clippy::new_without_default)]
47 pub fn new() -> Self {
48 BodyHandle::new().into()
49 }
50
51 pub(super) fn get_handle(&self) -> &BodyHandle {
52 self.writer.get_ref()
53 }
54
55 // this is not exported, since misuse can lead to data getting dropped or appearing out of order
56 fn handle(&mut self) -> &mut BodyHandle {
57 self.writer.get_mut()
58 }
59
60 /// Convert a [`Body`] into the low-level [`BodyHandle`] interface.
61 pub fn into_handle(mut self) -> BodyHandle {
62 self.put_back_read_buf();
63 // Flushes the buffer and returns the underlying `BodyHandle`
64 self.writer
65 .into_inner()
66 .expect("fastly_http_body::write failed")
67 }
68
69 /// Put any currently buffered read data back at the front of the body.
70 fn put_back_read_buf(&mut self) {
71 let read_buf = self.reader.buffer();
72 if !read_buf.is_empty() {
73 // We have to cheat a little here to get mutable access to the handle while the reader
74 // buffer is borrowed. Since we're not going to read or write through the `self`
75 // interface while `body_handle` is live, no other aliases of the handle will be used.
76 let mut body_handle =
77 ManuallyDrop::new(unsafe { BodyHandle::from_u32(self.writer.get_ref().as_u32()) });
78 let nwritten = body_handle.write_front(read_buf);
79 // Let the `BufReader` know that we've consumed these bytes from its internal buffer so
80 // it won't yield them for a subsequent read.
81 self.reader.consume(nwritten)
82 };
83 }
84
85 /// Read the entirety of the body into a byte vector.
86 ///
87 #[doc = include_str!("../../docs/snippets/buffers-body.md")]
88 pub fn into_bytes(self) -> Vec<u8> {
89 self.into_handle().into_bytes()
90 }
91
92 /// Read the entirety of the body into a `String`, interpreting the bytes as UTF-8.
93 ///
94 #[doc = include_str!("../../docs/snippets/buffers-body.md")]
95 ///
96 /// # Panics
97 ///
98 #[doc = include_str!("../../docs/snippets/panics-body-utf8.md")]
99 pub fn into_string(self) -> String {
100 self.into_handle().into_string()
101 }
102
103 /// Append another body onto the end of this body.
104 ///
105 #[doc = include_str!("../../docs/snippets/body-append-constant-time.md")]
106 pub fn append(&mut self, other: Body) {
107 // flush the write buffer of the destination body, so that we can use the append method on
108 // the underlying handles. Unwrap, as `BodyHandle::flush` won't return actionable errors.
109 self.writer.flush().expect("fastly_http_body::write failed");
110 self.handle().append(other.into_handle())
111 }
112
113 /// Return an iterator that reads the body in chunks of at most the given number of bytes.
114 ///
115 /// If `chunk_size` does not evenly divide the length of the body, then the last chunk will not
116 /// have length `chunk_size`.
117 ///
118 /// # Examples
119 ///
120 /// ```no_run
121 /// # use fastly::Body;
122 /// use std::io::Write;
123 /// fn remove_0s(body: &mut Body) -> Result<(), Box<dyn std::error::Error>> {
124 /// let mut no_0s = Body::new();
125 /// for chunk in body.read_chunks(4096) {
126 /// let mut chunk = chunk?;
127 /// chunk.retain(|b| *b != 0);
128 /// no_0s.write_all(&chunk)?;
129 /// }
130 /// *body = no_0s;
131 /// Ok(())
132 /// }
133 /// ```
134 pub fn read_chunks(
135 &mut self,
136 chunk_size: usize,
137 ) -> impl Iterator<Item = Result<Vec<u8>, std::io::Error>> + '_ {
138 std::iter::from_fn(move || {
139 let mut chunk = vec![0; chunk_size];
140 match self.read(&mut chunk) {
141 Ok(0) => None,
142 Ok(nread) => {
143 chunk.truncate(nread);
144 Some(Ok(chunk))
145 }
146 Err(e) => Some(Err(e)),
147 }
148 })
149 }
150
151 /// Get a prefix of the body containing up to the given number of bytes.
152 ///
153 /// This is particularly useful when you only need to inspect the first few bytes of a body, or
154 /// want to read an entire body up to a certain size limit to control memory consumption.
155 ///
156 /// Note that the length of the returned prefix may be shorter than the requested length if the
157 /// length of the entire body is shorter.
158 ///
159 /// The returned [`Prefix`] value is a smart pointer wrapping a `&mut Vec<u8>`. You can use it
160 /// as you would a [`&mut Vec<u8>`][`Vec`] or a [`&mut [u8]`][`std::slice`] to view or modify
161 /// the contents of the prefix.
162 ///
163 /// When the [`Prefix`] is dropped, the prefix bytes are returned to the body, including any
164 /// modifications that have been made. Because the prefix holds a mutable reference to the body,
165 /// you may need to explicitly [`drop()`] the prefix to perform other operations on the
166 /// body.
167 ///
168 /// If you do not need to return the prefix bytes to the body, use [`Prefix::take()`] to consume
169 /// the prefix as an owned byte vector without writing it back.
170 ///
171 /// # Examples
172 ///
173 /// Checking whether the body starts with the [WebAssembly magic
174 /// number](https://webassembly.github.io/spec/core/binary/modules.html#binary-module):
175 ///
176 /// ```no_run
177 /// const MAGIC: &[u8] = b"\0asm";
178 /// # let mut body = fastly::Body::from(MAGIC);
179 /// let prefix = body.get_prefix_mut(MAGIC.len());
180 /// if prefix.as_slice() == MAGIC {
181 /// println!("might be Wasm!");
182 /// }
183 /// ```
184 ///
185 /// Zero out the timestamp bytes in a [gzip header](https://en.wikipedia.org/wiki/Gzip#File_format):
186 ///
187 /// ```no_run
188 /// # let mut body = fastly::Body::from(&[0x1f, 0x8b, 0x01, 0x00, 0xba, 0xc8, 0x4d, 0x20][..]);
189 /// let mut prefix = body.get_prefix_mut(8);
190 /// for i in 4..8 {
191 /// prefix[i] = 0;
192 /// }
193 /// ```
194 ///
195 /// Try to consume the body as a [JSON value][`serde_json::Value`], but only up to the first
196 /// 4KiB. Note the use of `take()` to avoid writing the bytes back to the body unnecessarily:
197 ///
198 /// ```no_run
199 /// # use serde_json::{json, to_writer};
200 /// # let mut body = fastly::Body::new();
201 /// # to_writer(&mut body, &json!({"hello": "world!" })).unwrap();
202 /// let prefix = body.get_prefix_mut(4096).take();
203 /// let json: serde_json::Value = serde_json::from_slice(&prefix).unwrap();
204 /// ```
205 pub fn get_prefix_mut(&mut self, length: usize) -> Prefix<'_> {
206 self.try_get_prefix_mut(length).expect("body read failed")
207 }
208
209 /// Try to get a prefix of the body up to the given number of bytes.
210 ///
211 /// Unlike [`get_prefix_mut()`][`Self::get_prefix_mut()`], this method does not panic if an I/O
212 /// error occurs.
213 pub fn try_get_prefix_mut(&mut self, length: usize) -> std::io::Result<Prefix<'_>> {
214 let mut buf = vec![];
215 let nread = self
216 .take(length.try_into().unwrap())
217 .read_to_end(&mut buf)?;
218 buf.truncate(nread);
219 Ok(Prefix::new(buf, self))
220 }
221
222 /// Get a prefix of the body as a string containing up to the given number of bytes.
223 ///
224 /// This is particularly useful when you only need to inspect the first few characters of a body or
225 /// want to read an entire body up to a certain size limit to control memory consumption.
226 ///
227 /// Note that the length of the returned prefix may be shorter than the requested length if the
228 /// length of the entire body is shorter or if the requested length fell in the middle of a
229 /// multi-byte UTF-8 codepoint.
230 ///
231 /// The returned [`PrefixString`] value is a smart pointer wrapping a `&mut String`. You can use
232 /// it as you would a [`&mut String`][`String`] or a [`&mut str`][`std::str`] to view or modify
233 /// the contents of the prefix.
234 ///
235 /// When the [`PrefixString`] is dropped, the prefix characters are returned to the body,
236 /// including any modifications that have been made. Because the prefix holds a mutable
237 /// reference to the body, you may need to explicitly [`drop()`] the prefix before performing
238 /// other operations on the body.
239 ///
240 /// If you do not need to return the prefix characters to the body, use [`PrefixString::take()`] to
241 /// consume the prefix as an owned string without writing it back.
242 ///
243 /// # Panics
244 ///
245 /// If the prefix contains invalid UTF-8 bytes, this function will panic. The exception to this
246 /// is if the bytes are invalid because a multi-byte codepoint is cut off by the requested
247 /// prefix length. In this case, the invalid bytes are left off the end of the prefix.
248 ///
249 /// To explicitly handle the possibility of invalid UTF-8 bytes, use
250 /// [`try_get_prefix_str_mut()`][`Self::try_get_prefix_str_mut()`], which returns an error on
251 /// failure rather than panicking.
252 ///
253 /// # Examples
254 ///
255 /// Check whether the body starts with the [M3U8 file header][m3u8]:
256 ///
257 /// ```no_run
258 /// const HEADER: &str = "#EXTM3U";
259 /// # let mut body = fastly::Body::from(HEADER);
260 /// let prefix = body.get_prefix_str_mut(7);
261 /// if prefix.as_str() == HEADER {
262 /// println!("might be an M3U8 file!");
263 /// }
264 /// ```
265 ///
266 /// Insert a new playlist entry before the first occurrence of `#EXTINF` in an [M3U8
267 /// file][m3u8]:
268 ///
269 /// ```no_run
270 /// # let mut body = fastly::Body::from("#EXTM3U\n#EXT-X-TARGETDURATION:10\n#EXT-X-VERSION:4\n#EXT-X-MEDIA-SEQUENCE:1\n#EXTINF:10.0,\nfileSequence1.ts\n");
271 /// let mut prefix = body.get_prefix_str_mut(1024);
272 /// let first_entry = prefix.find("#EXTINF").unwrap();
273 /// prefix.insert_str(first_entry, "#EXTINF:10.0,\nnew_first_file.ts\n");
274 /// ```
275 ///
276 /// Try to consume the body as a [JSON value][`serde_json::Value`], but only up to the first
277 /// 4KiB. Note the use of `take()` to avoid writing the characters back to the body unnecessarily:
278 ///
279 /// ```no_run
280 /// # use serde_json::{json, to_writer};
281 /// # let mut body = fastly::Body::new();
282 /// # to_writer(&mut body, &json!({"hello": "world!" })).unwrap();
283 /// let prefix = body.get_prefix_str_mut(4096).take();
284 /// let json: serde_json::Value = serde_json::from_str(&prefix).unwrap();
285 /// ```
286 ///
287 /// [m3u8]: https://en.wikipedia.org/wiki/M3U#Extended_M3U
288 pub fn get_prefix_str_mut(&mut self, length: usize) -> PrefixString<'_> {
289 self.try_get_prefix_str_mut(length)
290 .expect("UTF-8 error in body prefix")
291 }
292
293 /// Try to get a prefix of the body as a string containing up to the given number of bytes.
294 ///
295 /// Unlike [`get_prefix_str_mut()`][`Self::get_prefix_str_mut()`], this function does not panic
296 /// when the prefix contains invalid UTF-8 bytes.
297 pub fn try_get_prefix_str_mut(
298 &mut self,
299 length: usize,
300 ) -> Result<PrefixString<'_>, std::str::Utf8Error> {
301 let mut buf = vec![];
302 let nread = self
303 .take(length.try_into().unwrap())
304 .read_to_end(&mut buf)
305 .expect("body read failed");
306 buf.truncate(nread);
307 match String::from_utf8(buf) {
308 Ok(string) => Ok(PrefixString::new(string, self)),
309 Err(e) => {
310 // Determine whether the error is due to the cutoff at the end or due to bad UTF-8
311 // bytes. In either case, there may be bytes we want to put back onto the body.
312 let err = e.utf8_error();
313 let mut bytes = e.into_bytes();
314 let (excess_bytes, result) = match err.error_len() {
315 None => {
316 // The error was due to a codepoint cut off at the end, so convert the valid
317 // part of the prefix and put the partial codepoint bytes back.
318 let end_bytes = bytes.split_off(err.valid_up_to());
319 let string = String::from_utf8(bytes)
320 .expect("expected only valid UTF-8 after splitting off bad codepoint");
321 (end_bytes, Ok(string))
322 }
323 Some(_) => {
324 // There were bad UTF-8 bytes within the prefix, so return a UTF-8 error and
325 // put all the bytes back.
326 (bytes, Err(err))
327 }
328 };
329 // Put invalid bytes back onto the body, if there are any.
330 if !excess_bytes.is_empty() {
331 self.put_back_read_buf();
332 self.writer.get_mut().write_front(&excess_bytes);
333 }
334 result.map(move |string| PrefixString::new(string, self))
335 }
336 }
337 }
338
339 /// The body's length in bytes, if it is known.
340 ///
341 /// If the length is not known, it is likely due to the body arising from an HTTP/1.1 message
342 /// with chunked encoding, an HTTP/2 or later message with no `content-length`, or other
343 /// response-streaming technique.
344 ///
345 /// Note that receiving a length from this function does not guarantee that the full number of
346 /// bytes can actually be read from the body. For example, when proxying a response from a
347 /// backend, this length may reflect the `content-length` promised in the response, but if the
348 /// backend connection is closed prematurely, fewer bytes may be delivered before the body
349 /// can no longer be read.
350 pub fn known_length(&mut self) -> Option<u64> {
351 self.handle().known_length()
352 }
353}
354
355// For these trait implementations we only implement the methods that the underlying buffered
356// adaptors implement; the default implementations for the others will behave the same.
357//
358// The main bit of caution we must use here is that any read should be preceded by flushing the
359// write buffer. `BufWriter` doesn't make any calls if its buffer is empty, so this isn't very
360// expensive and could prevent unexpected results if a program is trying to read and write from the
361// same body.
362impl Read for Body {
363 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
364 self.writer.flush()?;
365 self.reader.read(buf)
366 }
367
368 fn read_vectored(&mut self, bufs: &mut [std::io::IoSliceMut]) -> std::io::Result<usize> {
369 self.writer.flush()?;
370 self.reader.read_vectored(bufs)
371 }
372}
373
374impl BufRead for Body {
375 fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
376 self.writer.flush()?;
377 self.reader.fill_buf()
378 }
379
380 fn consume(&mut self, amt: usize) {
381 self.reader.consume(amt)
382 }
383}
384
385impl Write for Body {
386 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
387 self.writer.write(buf)
388 }
389
390 fn write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize> {
391 self.writer.write_vectored(bufs)
392 }
393
394 fn flush(&mut self) -> std::io::Result<()> {
395 self.writer.flush()
396 }
397}
398
impl From<BodyHandle> for Body {
    /// Wrap a low-level [`BodyHandle`] in a buffered [`Body`].
    fn from(handle: BodyHandle) -> Self {
        // We create a second `BodyHandle` from the same raw handle value so that the reader and
        // the writer each have an owned type. This means we have to be careful not to make the
        // aliasing observable from the public interface.
        //
        // SAFETY: `handle2` aliases `handle`. The reader's copy is wrapped in `BodyHandleWrapper`,
        // which holds it in `ManuallyDrop`, so only the writer's copy is ever dropped.
        let handle2 = unsafe { BodyHandle::from_u32(handle.as_u32()) };
        Self {
            reader: BufReader::new(BodyHandleWrapper::new(handle)),
            writer: BufWriter::new(handle2),
        }
    }
}
411
412impl From<&str> for Body {
413 fn from(s: &str) -> Self {
414 BodyHandle::from(s).into()
415 }
416}
417
418impl From<String> for Body {
419 fn from(s: String) -> Self {
420 BodyHandle::from(s).into()
421 }
422}
423
424impl From<&[u8]> for Body {
425 fn from(s: &[u8]) -> Self {
426 BodyHandle::from(s).into()
427 }
428}
429
430impl From<Vec<u8>> for Body {
431 fn from(s: Vec<u8>) -> Self {
432 BodyHandle::from(s).into()
433 }
434}
435
/// Smart pointer returned by [`Body::get_prefix_mut()`].
///
/// Dereferences to the prefix bytes as a `Vec<u8>`. When dropped, the (possibly modified) bytes
/// are written back to the front of the body, unless [`Prefix::take()`] was called.
pub struct Prefix<'a> {
    /// The mutable prefix buffer, if it hasn't yet been taken.
    ///
    /// `Prefix` is always created with `Some(buf)`, and [`Prefix::take()`] is the only method that
    /// changes `self.buf` to `None`. That means the `unwrap`s in the other methods cannot panic.
    buf: Option<Vec<u8>>,
    /// The body the prefix was read from, and to which it is written back on drop.
    body: &'a mut Body,
}
445
446impl std::fmt::Debug for Prefix<'_> {
447 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
448 f.debug_struct("Prefix")
449 .field("buf", self.buf.as_ref().unwrap())
450 .finish()
451 }
452}
453
454impl<'a> Prefix<'a> {
455 fn new(buf: Vec<u8>, body: &'a mut Body) -> Self {
456 Self {
457 buf: Some(buf),
458 body,
459 }
460 }
461
462 /// Return the prefix as a byte vector without writing it back to the `Body` from which it came.
463 pub fn take(mut self) -> Vec<u8> {
464 self.buf.take().unwrap()
465 }
466}
467
468impl std::ops::Deref for Prefix<'_> {
469 type Target = Vec<u8>;
470
471 fn deref(&self) -> &Self::Target {
472 self.buf.as_ref().unwrap()
473 }
474}
475
476impl std::ops::DerefMut for Prefix<'_> {
477 fn deref_mut(&mut self) -> &mut Self::Target {
478 self.buf.as_mut().unwrap()
479 }
480}
481
482impl Drop for Prefix<'_> {
483 fn drop(&mut self) {
484 // Only put bytes back if `Prefix::take()` has not been called.
485 if let Some(buf) = &self.buf {
486 self.body.put_back_read_buf();
487 self.body.writer.get_mut().write_front(buf);
488 }
489 }
490}
491
/// Smart pointer returned by [`Body::get_prefix_str_mut()`].
///
/// Dereferences to the prefix as a `String`. When dropped, the (possibly modified) string's bytes
/// are written back to the front of the body, unless [`PrefixString::take()`] was called.
pub struct PrefixString<'a> {
    /// The mutable prefix buffer, if it hasn't yet been taken.
    ///
    /// `PrefixString` is always created with `Some(buf)`, and [`PrefixString::take()`] is the only
    /// method that changes `self.buf` to `None`. That means the `unwrap`s in the other methods
    /// cannot panic.
    buf: Option<String>,
    /// The body the prefix was read from, and to which it is written back on drop.
    body: &'a mut Body,
}
502
503impl std::fmt::Debug for PrefixString<'_> {
504 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
505 f.debug_struct("PrefixString")
506 .field("buf", self.buf.as_ref().unwrap())
507 .finish()
508 }
509}
510
511impl<'a> PrefixString<'a> {
512 fn new(buf: String, body: &'a mut Body) -> Self {
513 Self {
514 buf: Some(buf),
515 body,
516 }
517 }
518
519 /// Return the prefix as a string without writing it back to the `Body` from which it came.
520 pub fn take(mut self) -> String {
521 self.buf.take().unwrap()
522 }
523}
524
525impl std::ops::Deref for PrefixString<'_> {
526 type Target = String;
527
528 fn deref(&self) -> &Self::Target {
529 self.buf.as_ref().unwrap()
530 }
531}
532
533impl std::ops::DerefMut for PrefixString<'_> {
534 fn deref_mut(&mut self) -> &mut Self::Target {
535 self.buf.as_mut().unwrap()
536 }
537}
538
539impl Drop for PrefixString<'_> {
540 fn drop(&mut self) {
541 // Only put bytes back if `PrefixString::take()` has not been called.
542 if let Some(buf) = &self.buf {
543 self.body.put_back_read_buf();
544 self.body.writer.get_mut().write_front(buf.as_bytes());
545 }
546 }
547}
548
/// An internal wrapper used in `Body` to prevent closing the handle twice.
///
/// The wrapped `BodyHandle` is held in a `ManuallyDrop`, so its destructor never runs when the
/// wrapper is dropped; the copy of the handle that does get dropped lives in `Body::writer`.
/// This type should not be used outside of this module. It implements `Read` and `Write` by
/// passing each call through to the inner handle, so it can stand in for a `BodyHandle` inside
/// the `BufReader` used by `Body`.
#[repr(transparent)]
struct BodyHandleWrapper {
    handle: ManuallyDrop<BodyHandle>,
}
559
560impl BodyHandleWrapper {
561 fn new(handle: BodyHandle) -> Self {
562 Self {
563 handle: ManuallyDrop::new(handle),
564 }
565 }
566}
567
568impl Write for BodyHandleWrapper {
569 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
570 self.handle.write(buf)
571 }
572
573 fn write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize> {
574 self.handle.write_vectored(bufs)
575 }
576
577 fn flush(&mut self) -> std::io::Result<()> {
578 self.handle.flush()
579 }
580}
581
582impl Read for BodyHandleWrapper {
583 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
584 self.handle.read(buf)
585 }
586}
587
588impl BodyExt for Body {
589 fn append_trailer(&mut self, name: impl ToHeaderName, value: impl ToHeaderValue) {
590 self.handle().append_trailer(
591 name.into_borrowable().as_ref(),
592 value.into_borrowable().as_ref(),
593 );
594 }
595
596 fn get_trailers(&mut self) -> Result<HeaderMap, Error> {
597 let mut trailers = HeaderMap::new();
598 let handle = self.handle();
599
600 for name in handle.get_trailer_names_impl(INITIAL_HEADER_NAME_BUF_SIZE, None)? {
601 let name = name?;
602 for value in
603 handle.get_trailer_values_impl(&name, INITIAL_HEADER_VALUE_BUF_SIZE, None)?
604 {
605 let value = value?;
606 trailers.append(&name, value);
607 }
608 }
609
610 Ok(trailers)
611 }
612}