// specter/response.rs

1//! HTTP response handling, decompression, and the public poll-based [`Body`].
2
3use crate::error::{Error, Result};
4use crate::headers::Headers;
5use crate::url::Url;
6use bytes::{Bytes, BytesMut};
7use http::StatusCode;
8use http_body::{Body as HttpBody, Frame, SizeHint};
9use std::fmt;
10use std::future::Future;
11use std::io::Read;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14
15/// Public response body implementing [`http_body::Body`].
16///
17/// The cutover replaced the legacy `mpsc::Receiver<Result<Bytes>>` response
18/// surface with this poll-based body. Buffered responses (returned by
19/// `RequestBuilder::send`) carry their bytes inline and emit them as a single
20/// data frame. H1 streaming responses poll the socket directly; other
21/// transports use their current internal delivery until their poll-body
22/// transport cutovers land.
23///
24/// Cloning a streaming body is rejected at runtime because the transport body
25/// has a single consumer; only [`Body::Empty`]/buffered bodies clone cheaply.
26pub struct Body {
27    inner: BodyInner,
28}
29
30enum BodyInner {
31    Empty,
32    Buffered(Option<Bytes>),
33    H1(crate::transport::h1::H1Body),
34    H2(crate::transport::h2::H2Body),
35    H2Direct(Box<crate::transport::h2::H2DirectBody>),
36    H3(crate::transport::h3::H3Body),
37}
38
/// Identifies which kind of body produced a [`BodyCapacity`] snapshot.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BodyCapacityProtocol {
    /// Body without any content.
    Empty,
    /// Body whose payload is fully materialized in memory.
    Buffered,
    /// HTTP/1.1 streaming body.
    H1,
    /// HTTP/2 streaming body.
    H2,
    /// HTTP/2 direct-owned streaming body.
    H2Direct,
    /// HTTP/3 streaming body.
    H3,
}
48
49#[derive(Clone, Copy, Debug, PartialEq, Eq)]
50pub struct BodyCapacity {
51    pub protocol: BodyCapacityProtocol,
52    pub buffer_capacity: usize,
53    pub buffered_chunks: usize,
54    pub available_slots: usize,
55    pub buffered_bytes: usize,
56    pub closed: bool,
57    pub ended: bool,
58}
59
60impl Body {
61    /// Construct an empty body that completes without yielding any frames.
62    pub fn empty() -> Self {
63        Self {
64            inner: BodyInner::Empty,
65        }
66    }
67
68    /// Construct a buffered body that yields the given bytes once and then
69    /// signals end-of-stream. Cheap to clone and to query for length.
70    pub fn from_bytes(bytes: impl Into<Bytes>) -> Self {
71        let bytes = bytes.into();
72        if bytes.is_empty() {
73            Self::empty()
74        } else {
75            Self {
76                inner: BodyInner::Buffered(Some(bytes)),
77            }
78        }
79    }
80
81    /// Wrap an HTTP/1.1 socket-polling response body.
82    pub(crate) fn from_h1(body: crate::transport::h1::H1Body) -> Self {
83        Self {
84            inner: BodyInner::H1(body),
85        }
86    }
87
88    /// Wrap an HTTP/2 wakeable-slot response body.
89    pub(crate) fn from_h2(body: crate::transport::h2::H2Body) -> Self {
90        Self {
91            inner: BodyInner::H2(body),
92        }
93    }
94
95    /// Wrap an HTTP/2 direct-owned response body.
96    pub(crate) fn from_h2_direct(body: crate::transport::h2::H2DirectBody) -> Self {
97        Self {
98            inner: BodyInner::H2Direct(Box::new(body)),
99        }
100    }
101
102    /// Wrap an HTTP/3 wakeable-slot response body.
103    pub(crate) fn from_h3(body: crate::transport::h3::H3Body) -> Self {
104        Self {
105            inner: BodyInner::H3(body),
106        }
107    }
108
109    /// Await HTTP/2 response trailers for this body, if any.
110    ///
111    /// Only H2 streaming bodies can carry trailers, and only when the caller
112    /// requested them (`te: trailers`). Every other body variant returns
113    /// `Ok(None)`. See [`crate::transport::h2::H2Body::trailers`] for the
114    /// three-state contract (clean end and not-requested both map to
115    /// `Ok(None)`; a stream reset maps to `Err`).
116    ///
117    /// Public so language bindings (node/python) can surface gRPC
118    /// `grpc-status`/`grpc-message` trailers from a streaming [`Body`].
119    pub async fn trailers(&mut self) -> Result<Option<Headers>> {
120        match &mut self.inner {
121            BodyInner::H2(body) => body.trailers().await,
122            BodyInner::Empty
123            | BodyInner::Buffered(_)
124            | BodyInner::H1(_)
125            | BodyInner::H2Direct(_)
126            | BodyInner::H3(_) => Ok(None),
127        }
128    }
129
130    /// `true` for an empty buffered body. Streaming bodies report `false`
131    /// because the buffered length is unknown until the body is drained.
132    pub fn is_empty(&self) -> bool {
133        match &self.inner {
134            BodyInner::Empty => true,
135            BodyInner::Buffered(Some(b)) => b.is_empty(),
136            BodyInner::Buffered(None) => true,
137            BodyInner::H1(_) | BodyInner::H2(_) | BodyInner::H2Direct(_) | BodyInner::H3(_) => {
138                false
139            }
140        }
141    }
142
143    /// `true` if the body was created from a streaming transport channel.
144    pub fn is_streaming(&self) -> bool {
145        matches!(
146            self.inner,
147            BodyInner::H1(_) | BodyInner::H2(_) | BodyInner::H2Direct(_) | BodyInner::H3(_)
148        )
149    }
150
151    /// Return a reference to the buffered bytes when the body is fully
152    /// materialized, or `None` if the body is streaming or already drained.
153    pub fn as_bytes(&self) -> Option<&Bytes> {
154        match &self.inner {
155            BodyInner::Buffered(Some(b)) => Some(b),
156            _ => None,
157        }
158    }
159
160    /// Buffered length when known, `None` for streaming bodies.
161    pub fn buffered_len(&self) -> Option<usize> {
162        match &self.inner {
163            BodyInner::Empty => Some(0),
164            BodyInner::Buffered(Some(b)) => Some(b.len()),
165            BodyInner::Buffered(None) => Some(0),
166            BodyInner::H1(_) | BodyInner::H2(_) | BodyInner::H2Direct(_) | BodyInner::H3(_) => None,
167        }
168    }
169
170    /// Snapshot H3 streaming response buffer pressure when this body is backed
171    /// by the native HTTP/3 transport.
172    pub fn h3_capacity(&self) -> Option<crate::transport::h3::H3BodyCapacity> {
173        match &self.inner {
174            BodyInner::H3(body) => Some(body.capacity()),
175            _ => None,
176        }
177    }
178
179    /// Snapshot protocol-neutral response-body buffer pressure.
180    ///
181    /// H2 and native H3 streaming bodies report their actual bounded driver
182    /// queues. H1 and direct-owned H2 bodies stream directly from the socket
183    /// instead of a public queue, so they report zero queued capacity/bytes.
184    /// Buffered and empty bodies report their materialized byte state.
185    pub fn capacity(&self) -> BodyCapacity {
186        match &self.inner {
187            BodyInner::Empty => BodyCapacity {
188                protocol: BodyCapacityProtocol::Empty,
189                buffer_capacity: 0,
190                buffered_chunks: 0,
191                available_slots: 0,
192                buffered_bytes: 0,
193                closed: false,
194                ended: true,
195            },
196            BodyInner::Buffered(bytes) => {
197                let buffered_bytes = bytes.as_ref().map(Bytes::len).unwrap_or(0);
198                BodyCapacity {
199                    protocol: BodyCapacityProtocol::Buffered,
200                    buffer_capacity: usize::from(buffered_bytes > 0),
201                    buffered_chunks: usize::from(buffered_bytes > 0),
202                    available_slots: usize::from(buffered_bytes == 0),
203                    buffered_bytes,
204                    closed: false,
205                    ended: true,
206                }
207            }
208            BodyInner::H1(_) => BodyCapacity {
209                protocol: BodyCapacityProtocol::H1,
210                buffer_capacity: 0,
211                buffered_chunks: 0,
212                available_slots: 0,
213                buffered_bytes: 0,
214                closed: false,
215                ended: false,
216            },
217            BodyInner::H2(body) => {
218                let capacity = body.capacity();
219                BodyCapacity {
220                    protocol: BodyCapacityProtocol::H2,
221                    buffer_capacity: capacity.buffer_capacity,
222                    buffered_chunks: capacity.buffered_chunks,
223                    available_slots: capacity.available_slots,
224                    buffered_bytes: capacity.buffered_bytes,
225                    closed: capacity.closed,
226                    ended: capacity.ended,
227                }
228            }
229            BodyInner::H2Direct(_) => BodyCapacity {
230                protocol: BodyCapacityProtocol::H2Direct,
231                buffer_capacity: 0,
232                buffered_chunks: 0,
233                available_slots: 0,
234                buffered_bytes: 0,
235                closed: false,
236                ended: false,
237            },
238            BodyInner::H3(body) => {
239                let capacity = body.capacity();
240                BodyCapacity {
241                    protocol: BodyCapacityProtocol::H3,
242                    buffer_capacity: capacity.buffer_capacity,
243                    buffered_chunks: capacity.buffered_chunks,
244                    available_slots: capacity.available_slots,
245                    buffered_bytes: capacity.buffered_bytes,
246                    closed: capacity.closed,
247                    ended: capacity.ended,
248                }
249            }
250        }
251    }
252
253    /// Convenience accessor for buffered bodies. Returns `0` for streaming
254    /// bodies; callers wanting to detect streaming should use
255    /// [`Body::buffered_len`] or [`Body::is_streaming`].
256    pub fn len(&self) -> usize {
257        self.buffered_len().unwrap_or(0)
258    }
259
260    /// Poll the next frame asynchronously. Returns `None` after end-of-stream.
261    pub fn frame(&mut self) -> FrameFuture<'_> {
262        FrameFuture { body: self }
263    }
264
265    /// Poll the next data chunk asynchronously. Returns `None` after end-of-stream.
266    #[inline(always)]
267    pub fn chunk(&mut self) -> ChunkFuture<'_> {
268        ChunkFuture { body: self }
269    }
270
271    /// Drain the body into a contiguous [`Bytes`] buffer.
272    ///
273    /// For buffered bodies this is essentially a clone of the underlying
274    /// bytes. For streaming bodies it polls the body to completion, so callers
275    /// must opt in explicitly.
276    pub async fn collect_to_bytes(&mut self) -> Result<Bytes> {
277        let mut buf = BytesMut::new();
278        while let Some(frame) = self.frame().await {
279            let frame = frame?;
280            if let Ok(data) = frame.into_data() {
281                buf.extend_from_slice(&data);
282            }
283        }
284        Ok(buf.freeze())
285    }
286}
287
288impl Default for Body {
289    fn default() -> Self {
290        Self::empty()
291    }
292}
293
294impl fmt::Debug for Body {
295    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
296        match &self.inner {
297            BodyInner::Empty => f.debug_struct("Body::Empty").finish(),
298            BodyInner::Buffered(Some(b)) => f
299                .debug_struct("Body::Buffered")
300                .field("len", &b.len())
301                .finish(),
302            BodyInner::Buffered(None) => f.debug_struct("Body::Buffered").field("len", &0).finish(),
303            BodyInner::H1(_) => f.debug_struct("Body::H1Streaming").finish(),
304            BodyInner::H2(_) => f.debug_struct("Body::H2Streaming").finish(),
305            BodyInner::H2Direct(_) => f.debug_struct("Body::H2DirectStreaming").finish(),
306            BodyInner::H3(_) => f.debug_struct("Body::H3Streaming").finish(),
307        }
308    }
309}
310
311impl Clone for Body {
312    fn clone(&self) -> Self {
313        match &self.inner {
314            BodyInner::Empty => Self::empty(),
315            BodyInner::Buffered(Some(b)) => Self {
316                inner: BodyInner::Buffered(Some(b.clone())),
317            },
318            BodyInner::Buffered(None) => Self {
319                inner: BodyInner::Buffered(None),
320            },
321            BodyInner::H1(_) | BodyInner::H2(_) | BodyInner::H2Direct(_) | BodyInner::H3(_) => {
322                panic!("specter::Body::clone is not supported for streaming bodies")
323            }
324        }
325    }
326}
327
328impl From<Bytes> for Body {
329    fn from(value: Bytes) -> Self {
330        Self::from_bytes(value)
331    }
332}
333
334impl HttpBody for Body {
335    type Data = Bytes;
336    type Error = Error;
337
338    fn poll_frame(
339        mut self: Pin<&mut Self>,
340        cx: &mut Context<'_>,
341    ) -> Poll<Option<std::result::Result<Frame<Self::Data>, Self::Error>>> {
342        match &mut self.inner {
343            BodyInner::Empty => Poll::Ready(None),
344            BodyInner::Buffered(slot) => match slot.take() {
345                Some(bytes) if !bytes.is_empty() => Poll::Ready(Some(Ok(Frame::data(bytes)))),
346                _ => Poll::Ready(None),
347            },
348            BodyInner::H1(body) => Pin::new(body).poll_frame(cx),
349            BodyInner::H2(body) => Pin::new(body).poll_frame(cx),
350            BodyInner::H2Direct(body) => Pin::new(body.as_mut()).poll_frame(cx),
351            BodyInner::H3(body) => Pin::new(body).poll_frame(cx),
352        }
353    }
354
355    fn is_end_stream(&self) -> bool {
356        match &self.inner {
357            BodyInner::Empty => true,
358            BodyInner::Buffered(None) => true,
359            BodyInner::Buffered(Some(b)) => b.is_empty(),
360            BodyInner::H1(body) => body.is_terminal(),
361            BodyInner::H2(body) => body.is_terminal(),
362            BodyInner::H2Direct(body) => body.is_terminal(),
363            BodyInner::H3(body) => body.is_terminal(),
364        }
365    }
366
367    fn size_hint(&self) -> SizeHint {
368        match &self.inner {
369            BodyInner::Empty => SizeHint::with_exact(0),
370            BodyInner::Buffered(Some(b)) => SizeHint::with_exact(b.len() as u64),
371            BodyInner::Buffered(None) => SizeHint::with_exact(0),
372            BodyInner::H1(body) => body.size_hint(),
373            BodyInner::H2(body) => body.size_hint(),
374            BodyInner::H2Direct(body) => body.size_hint(),
375            BodyInner::H3(body) => body.size_hint(),
376        }
377    }
378}
379
380impl Body {
381    #[inline(always)]
382    fn poll_chunk(
383        mut self: Pin<&mut Self>,
384        cx: &mut Context<'_>,
385    ) -> Poll<Option<std::result::Result<Bytes, Error>>> {
386        match &mut self.inner {
387            BodyInner::Empty => Poll::Ready(None),
388            BodyInner::Buffered(slot) => match slot.take() {
389                Some(bytes) if !bytes.is_empty() => Poll::Ready(Some(Ok(bytes))),
390                _ => Poll::Ready(None),
391            },
392            BodyInner::H2(body) => Pin::new(body).poll_data_coalesced(cx),
393            BodyInner::H2Direct(body) => Pin::new(body.as_mut()).poll_data(cx),
394            BodyInner::H1(body) => match Pin::new(body).poll_frame(cx) {
395                Poll::Ready(Some(Ok(frame))) => match frame.into_data() {
396                    Ok(bytes) => Poll::Ready(Some(Ok(bytes))),
397                    Err(_) => Poll::Pending,
398                },
399                Poll::Ready(Some(Err(error))) => Poll::Ready(Some(Err(error))),
400                Poll::Ready(None) => Poll::Ready(None),
401                Poll::Pending => Poll::Pending,
402            },
403            BodyInner::H3(body) => match Pin::new(body).poll_frame(cx) {
404                Poll::Ready(Some(Ok(frame))) => match frame.into_data() {
405                    Ok(bytes) => Poll::Ready(Some(Ok(bytes))),
406                    Err(_) => Poll::Pending,
407                },
408                Poll::Ready(Some(Err(error))) => Poll::Ready(Some(Err(error))),
409                Poll::Ready(None) => Poll::Ready(None),
410                Poll::Pending => Poll::Pending,
411            },
412        }
413    }
414}
415
416/// Future returned by [`Body::frame`].
417pub struct FrameFuture<'a> {
418    body: &'a mut Body,
419}
420
421impl<'a> Future for FrameFuture<'a> {
422    type Output = Option<std::result::Result<Frame<Bytes>, Error>>;
423
424    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
425        let body = &mut *self.get_mut().body;
426        match Pin::new(body).poll_frame(cx) {
427            Poll::Pending => Poll::Pending,
428            Poll::Ready(value) => Poll::Ready(value),
429        }
430    }
431}
432
433/// Future returned by [`Body::chunk`].
434pub struct ChunkFuture<'a> {
435    body: &'a mut Body,
436}
437
438impl<'a> Future for ChunkFuture<'a> {
439    type Output = Option<std::result::Result<Bytes, Error>>;
440
441    #[inline(always)]
442    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
443        let body = &mut *self.get_mut().body;
444        Pin::new(body).poll_chunk(cx)
445    }
446}
447
448/// HTTP response with explicit decompression and a poll-based [`Body`].
449#[derive(Debug, Clone)]
450pub struct Response {
451    pub(crate) status: u16,
452    headers: Headers,
453    body: Body,
454    http_version: String,
455    effective_url: Option<Url>,
456}
457
458impl Response {
459    /// Construct a buffered response. Used by the non-streaming transport
460    /// paths and by tests/cache code that already have the full body in
461    /// memory.
462    pub fn new(status: u16, headers: Headers, body: Bytes, http_version: String) -> Self {
463        Self {
464            status,
465            headers,
466            body: Body::from_bytes(body),
467            http_version,
468            effective_url: None,
469        }
470    }
471
472    /// Construct a response that wraps an explicit [`Body`]. Used by the
473    /// streaming transport paths to publish the poll-based body to callers.
474    pub fn with_body(status: u16, headers: Headers, body: Body, http_version: String) -> Self {
475        Self {
476            status,
477            headers,
478            body,
479            http_version,
480            effective_url: None,
481        }
482    }
483
484    pub(crate) fn into_status_headers_version(self) -> (u16, Headers, String) {
485        (self.status, self.headers, self.http_version)
486    }
487
488    /// Set the effective URL (the URL that was actually requested).
489    pub fn with_url(mut self, url: Url) -> Self {
490        self.effective_url = Some(url);
491        self
492    }
493
494    pub(crate) async fn into_buffered(mut self) -> Result<Self> {
495        if self.body.is_streaming() {
496            let bytes = self.body.collect_to_bytes().await?;
497            self.body = Body::from_bytes(bytes);
498        }
499        Ok(self)
500    }
501
502    pub fn http_version(&self) -> &str {
503        &self.http_version
504    }
505
506    pub fn status(&self) -> StatusCode {
507        StatusCode::from_u16(self.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR)
508    }
509
510    pub fn status_code(&self) -> u16 {
511        self.status
512    }
513
514    pub fn headers(&self) -> &Headers {
515        &self.headers
516    }
517
518    /// Await the HTTP/2 response trailers for this response, if any.
519    ///
520    /// gRPC carries `grpc-status`/`grpc-message` in a trailing HEADERS frame;
521    /// this surfaces them. Three outcomes:
522    /// - `Ok(Some(headers))` - a real trailers frame arrived.
523    /// - `Ok(None)` - a clean trailer-less end, trailers were not requested
524    ///   (`te: trailers` absent), or the response is buffered / non-H2.
525    /// - `Err(_)` - the stream was reset (RST_STREAM / transport error) before
526    ///   a clean end; distinct from `Ok(None)`.
527    ///
528    /// Await this only after the body stream has returned end: a resolved
529    /// trailer channel does not imply the body has been fully drained.
530    pub async fn trailers(&mut self) -> Result<Option<Headers>> {
531        self.body.trailers().await
532    }
533
534    pub fn url(&self) -> Option<&Url> {
535        self.effective_url.as_ref()
536    }
537
538    /// Reference to the public poll-based body.
539    pub fn body(&self) -> &Body {
540        &self.body
541    }
542
543    /// Mutable reference to the public poll-based body, used to drive
544    /// [`Body::frame`] without consuming the response.
545    pub fn body_mut(&mut self) -> &mut Body {
546        &mut self.body
547    }
548
549    /// Consume the response and return the body for poll-based draining.
550    pub fn into_body(self) -> Body {
551        self.body
552    }
553
554    /// Borrow the buffered body bytes, when the body is fully materialized.
555    /// Returns `None` for streaming bodies; use [`Body::frame`] or
556    /// [`Body::collect_to_bytes`] in that case.
557    pub fn buffered_bytes(&self) -> Option<&Bytes> {
558        self.body.as_bytes()
559    }
560
561    pub fn bytes_raw(&self) -> Result<Bytes> {
562        self.body
563            .as_bytes()
564            .cloned()
565            .ok_or_else(|| Error::HttpProtocol("response body is streaming, not buffered".into()))
566    }
567
568    pub fn bytes(&self) -> Result<Bytes> {
569        self.decoded_body()
570    }
571
572    pub fn is_success(&self) -> bool {
573        (200..300).contains(&self.status)
574    }
575    pub fn is_redirect(&self) -> bool {
576        (300..400).contains(&self.status)
577    }
578    pub fn redirect_url(&self) -> Option<&str> {
579        self.get_header("Location")
580    }
581
582    pub fn get_header(&self, name: &str) -> Option<&str> {
583        self.headers.get(name)
584    }
585
586    pub fn get_headers(&self, name: &str) -> Vec<&str> {
587        self.headers.get_all(name)
588    }
589
590    pub fn content_type(&self) -> Option<&str> {
591        self.get_header("Content-Type")
592    }
593    pub fn content_encoding(&self) -> Option<&str> {
594        self.get_header("Content-Encoding")
595    }
596
597    /// Decode body based on Content-Encoding (gzip, deflate, br, zstd).
598    /// Supports chained encodings (e.g., "gzip, deflate") by applying decodings in reverse order.
599    /// Returns an error for streaming bodies; the caller must consume the
600    /// streaming body via [`Body::frame`] before applying decompression.
601    pub fn decoded_body(&self) -> Result<Bytes> {
602        let body = self.body.as_bytes().ok_or_else(|| {
603            Error::HttpProtocol("response body is streaming, not buffered".into())
604        })?;
605
606        let encodings: Vec<&str> = self
607            .content_encoding()
608            .map(|s| s.split(',').map(str::trim).collect())
609            .unwrap_or_default();
610
611        if !encodings.is_empty() {
612            let mut data = body.clone();
613            for encoding in encodings.iter().rev() {
614                data = match encoding.to_lowercase().as_str() {
615                    "gzip" | "x-gzip" => decode_gzip(&data)?,
616                    "deflate" => decode_deflate(&data)?,
617                    "br" => decode_brotli(&data)?,
618                    "zstd" => decode_zstd(&data)?,
619                    "identity" => data,
620                    _ => data,
621                };
622            }
623            return Ok(data);
624        }
625
626        if body.len() >= 4
627            && body[0] == 0x28
628            && body[1] == 0xB5
629            && body[2] == 0x2F
630            && body[3] == 0xFD
631        {
632            return decode_zstd(body);
633        }
634        if body.len() >= 2 && body[0] == 0x1f && body[1] == 0x8b {
635            return decode_gzip(body);
636        }
637
638        Ok(body.clone())
639    }
640
641    pub fn text(&self) -> Result<String> {
642        let decoded = self.decoded_body()?;
643        String::from_utf8(decoded.to_vec())
644            .map_err(|e| Error::Decompression(format!("UTF-8 decode error: {}", e)))
645    }
646
647    pub fn json<T: serde::de::DeserializeOwned>(&self) -> Result<T> {
648        let text = self.text()?;
649        serde_json::from_str(&text).map_err(Error::from)
650    }
651
652    pub fn error_for_status(self) -> Result<Self> {
653        if self.status().is_client_error() || self.status().is_server_error() {
654            let message = self
655                .status()
656                .canonical_reason()
657                .unwrap_or("HTTP error")
658                .to_string();
659            Err(Error::http_status(self.status, message))
660        } else {
661            Ok(self)
662        }
663    }
664
665    pub fn error_for_status_ref(&self) -> Result<&Self> {
666        if self.status().is_client_error() || self.status().is_server_error() {
667            let message = self
668                .status()
669                .canonical_reason()
670                .unwrap_or("HTTP error")
671                .to_string();
672            Err(Error::http_status(self.status, message))
673        } else {
674            Ok(self)
675        }
676    }
677}
678
679fn decode_gzip(data: &[u8]) -> Result<Bytes> {
680    let mut decoder = flate2::read::GzDecoder::new(data);
681    let mut decoded = Vec::new();
682    decoder
683        .read_to_end(&mut decoded)
684        .map_err(|e| Error::Decompression(format!("gzip: {}", e)))?;
685    Ok(Bytes::from(decoded))
686}
687
688fn decode_deflate(data: &[u8]) -> Result<Bytes> {
689    let mut decoded = Vec::new();
690    if flate2::read::ZlibDecoder::new(data)
691        .read_to_end(&mut decoded)
692        .is_ok()
693    {
694        return Ok(Bytes::from(decoded));
695    }
696    decoded.clear();
697    flate2::read::DeflateDecoder::new(data)
698        .read_to_end(&mut decoded)
699        .map_err(|e| Error::Decompression(format!("deflate: {}", e)))?;
700    Ok(Bytes::from(decoded))
701}
702
703fn decode_brotli(data: &[u8]) -> Result<Bytes> {
704    let mut decoder = brotli::Decompressor::new(data, 4096);
705    let mut decoded = Vec::new();
706    decoder
707        .read_to_end(&mut decoded)
708        .map_err(|e| Error::Decompression(format!("brotli: {}", e)))?;
709    Ok(Bytes::from(decoded))
710}
711
712fn decode_zstd(data: &[u8]) -> Result<Bytes> {
713    zstd::stream::decode_all(data)
714        .map(Bytes::from)
715        .map_err(|e| Error::Decompression(format!("zstd: {}", e)))
716}