Skip to main content

specter/
response.rs

1//! HTTP response handling, decompression, and the public poll-based [`Body`].
2
3use crate::error::{Error, Result};
4use crate::headers::Headers;
5use crate::url::Url;
6use bytes::{Bytes, BytesMut};
7use http::StatusCode;
8use http_body::{Body as HttpBody, Frame, SizeHint};
9use std::fmt;
10use std::future::Future;
11use std::io::Read;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14
15/// Public response body implementing [`http_body::Body`].
16///
17/// The cutover replaced the legacy `mpsc::Receiver<Result<Bytes>>` response
18/// surface with this poll-based body. Buffered responses (returned by
19/// `RequestBuilder::send`) carry their bytes inline and emit them as a single
20/// data frame. H1 streaming responses poll the socket directly; other
21/// transports use their current internal delivery until their poll-body
22/// transport cutovers land.
23///
24/// Cloning a streaming body is rejected at runtime because the transport body
25/// has a single consumer; only [`Body::Empty`]/buffered bodies clone cheaply.
26pub struct Body {
27    inner: BodyInner,
28}
29
30enum BodyInner {
31    Empty,
32    Buffered(Option<Bytes>),
33    H1(crate::transport::h1::H1Body),
34    H2(crate::transport::h2::H2Body),
35    H2Direct(Box<crate::transport::h2::H2DirectBody>),
36    H3(crate::transport::h3::H3Body),
37}
38
39#[derive(Clone, Copy, Debug, PartialEq, Eq)]
40pub enum BodyCapacityProtocol {
41    Empty,
42    Buffered,
43    H1,
44    H2,
45    H2Direct,
46    H3,
47}
48
49#[derive(Clone, Copy, Debug, PartialEq, Eq)]
50pub struct BodyCapacity {
51    pub protocol: BodyCapacityProtocol,
52    pub buffer_capacity: usize,
53    pub buffered_chunks: usize,
54    pub available_slots: usize,
55    pub buffered_bytes: usize,
56    pub closed: bool,
57    pub ended: bool,
58}
59
60impl Body {
61    /// Construct an empty body that completes without yielding any frames.
62    pub fn empty() -> Self {
63        Self {
64            inner: BodyInner::Empty,
65        }
66    }
67
68    /// Construct a buffered body that yields the given bytes once and then
69    /// signals end-of-stream. Cheap to clone and to query for length.
70    pub fn from_bytes(bytes: impl Into<Bytes>) -> Self {
71        let bytes = bytes.into();
72        if bytes.is_empty() {
73            Self::empty()
74        } else {
75            Self {
76                inner: BodyInner::Buffered(Some(bytes)),
77            }
78        }
79    }
80
81    /// Wrap an HTTP/1.1 socket-polling response body.
82    pub(crate) fn from_h1(body: crate::transport::h1::H1Body) -> Self {
83        Self {
84            inner: BodyInner::H1(body),
85        }
86    }
87
88    /// Wrap an HTTP/2 wakeable-slot response body.
89    pub(crate) fn from_h2(body: crate::transport::h2::H2Body) -> Self {
90        Self {
91            inner: BodyInner::H2(body),
92        }
93    }
94
95    /// Wrap an HTTP/2 direct-owned response body.
96    pub(crate) fn from_h2_direct(body: crate::transport::h2::H2DirectBody) -> Self {
97        Self {
98            inner: BodyInner::H2Direct(Box::new(body)),
99        }
100    }
101
102    /// Wrap an HTTP/3 wakeable-slot response body.
103    pub(crate) fn from_h3(body: crate::transport::h3::H3Body) -> Self {
104        Self {
105            inner: BodyInner::H3(body),
106        }
107    }
108
109    /// `true` for an empty buffered body. Streaming bodies report `false`
110    /// because the buffered length is unknown until the body is drained.
111    pub fn is_empty(&self) -> bool {
112        match &self.inner {
113            BodyInner::Empty => true,
114            BodyInner::Buffered(Some(b)) => b.is_empty(),
115            BodyInner::Buffered(None) => true,
116            BodyInner::H1(_) | BodyInner::H2(_) | BodyInner::H2Direct(_) | BodyInner::H3(_) => {
117                false
118            }
119        }
120    }
121
122    /// `true` if the body was created from a streaming transport channel.
123    pub fn is_streaming(&self) -> bool {
124        matches!(
125            self.inner,
126            BodyInner::H1(_) | BodyInner::H2(_) | BodyInner::H2Direct(_) | BodyInner::H3(_)
127        )
128    }
129
130    /// Return a reference to the buffered bytes when the body is fully
131    /// materialized, or `None` if the body is streaming or already drained.
132    pub fn as_bytes(&self) -> Option<&Bytes> {
133        match &self.inner {
134            BodyInner::Buffered(Some(b)) => Some(b),
135            _ => None,
136        }
137    }
138
139    /// Buffered length when known, `None` for streaming bodies.
140    pub fn buffered_len(&self) -> Option<usize> {
141        match &self.inner {
142            BodyInner::Empty => Some(0),
143            BodyInner::Buffered(Some(b)) => Some(b.len()),
144            BodyInner::Buffered(None) => Some(0),
145            BodyInner::H1(_) | BodyInner::H2(_) | BodyInner::H2Direct(_) | BodyInner::H3(_) => None,
146        }
147    }
148
149    /// Snapshot H3 streaming response buffer pressure when this body is backed
150    /// by the native HTTP/3 transport.
151    pub fn h3_capacity(&self) -> Option<crate::transport::h3::H3BodyCapacity> {
152        match &self.inner {
153            BodyInner::H3(body) => Some(body.capacity()),
154            _ => None,
155        }
156    }
157
158    /// Snapshot protocol-neutral response-body buffer pressure.
159    ///
160    /// H2 and native H3 streaming bodies report their actual bounded driver
161    /// queues. H1 and direct-owned H2 bodies stream directly from the socket
162    /// instead of a public queue, so they report zero queued capacity/bytes.
163    /// Buffered and empty bodies report their materialized byte state.
164    pub fn capacity(&self) -> BodyCapacity {
165        match &self.inner {
166            BodyInner::Empty => BodyCapacity {
167                protocol: BodyCapacityProtocol::Empty,
168                buffer_capacity: 0,
169                buffered_chunks: 0,
170                available_slots: 0,
171                buffered_bytes: 0,
172                closed: false,
173                ended: true,
174            },
175            BodyInner::Buffered(bytes) => {
176                let buffered_bytes = bytes.as_ref().map(Bytes::len).unwrap_or(0);
177                BodyCapacity {
178                    protocol: BodyCapacityProtocol::Buffered,
179                    buffer_capacity: usize::from(buffered_bytes > 0),
180                    buffered_chunks: usize::from(buffered_bytes > 0),
181                    available_slots: usize::from(buffered_bytes == 0),
182                    buffered_bytes,
183                    closed: false,
184                    ended: true,
185                }
186            }
187            BodyInner::H1(_) => BodyCapacity {
188                protocol: BodyCapacityProtocol::H1,
189                buffer_capacity: 0,
190                buffered_chunks: 0,
191                available_slots: 0,
192                buffered_bytes: 0,
193                closed: false,
194                ended: false,
195            },
196            BodyInner::H2(body) => {
197                let capacity = body.capacity();
198                BodyCapacity {
199                    protocol: BodyCapacityProtocol::H2,
200                    buffer_capacity: capacity.buffer_capacity,
201                    buffered_chunks: capacity.buffered_chunks,
202                    available_slots: capacity.available_slots,
203                    buffered_bytes: capacity.buffered_bytes,
204                    closed: capacity.closed,
205                    ended: capacity.ended,
206                }
207            }
208            BodyInner::H2Direct(_) => BodyCapacity {
209                protocol: BodyCapacityProtocol::H2Direct,
210                buffer_capacity: 0,
211                buffered_chunks: 0,
212                available_slots: 0,
213                buffered_bytes: 0,
214                closed: false,
215                ended: false,
216            },
217            BodyInner::H3(body) => {
218                let capacity = body.capacity();
219                BodyCapacity {
220                    protocol: BodyCapacityProtocol::H3,
221                    buffer_capacity: capacity.buffer_capacity,
222                    buffered_chunks: capacity.buffered_chunks,
223                    available_slots: capacity.available_slots,
224                    buffered_bytes: capacity.buffered_bytes,
225                    closed: capacity.closed,
226                    ended: capacity.ended,
227                }
228            }
229        }
230    }
231
232    /// Convenience accessor for buffered bodies. Returns `0` for streaming
233    /// bodies; callers wanting to detect streaming should use
234    /// [`Body::buffered_len`] or [`Body::is_streaming`].
235    pub fn len(&self) -> usize {
236        self.buffered_len().unwrap_or(0)
237    }
238
239    /// Poll the next frame asynchronously. Returns `None` after end-of-stream.
240    pub fn frame(&mut self) -> FrameFuture<'_> {
241        FrameFuture { body: self }
242    }
243
244    /// Poll the next data chunk asynchronously. Returns `None` after end-of-stream.
245    #[inline(always)]
246    pub fn chunk(&mut self) -> ChunkFuture<'_> {
247        ChunkFuture { body: self }
248    }
249
250    /// Drain the body into a contiguous [`Bytes`] buffer.
251    ///
252    /// For buffered bodies this is essentially a clone of the underlying
253    /// bytes. For streaming bodies it polls the body to completion, so callers
254    /// must opt in explicitly.
255    pub async fn collect_to_bytes(&mut self) -> Result<Bytes> {
256        let mut buf = BytesMut::new();
257        while let Some(frame) = self.frame().await {
258            let frame = frame?;
259            if let Ok(data) = frame.into_data() {
260                buf.extend_from_slice(&data);
261            }
262        }
263        Ok(buf.freeze())
264    }
265}
266
267impl Default for Body {
268    fn default() -> Self {
269        Self::empty()
270    }
271}
272
273impl fmt::Debug for Body {
274    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
275        match &self.inner {
276            BodyInner::Empty => f.debug_struct("Body::Empty").finish(),
277            BodyInner::Buffered(Some(b)) => f
278                .debug_struct("Body::Buffered")
279                .field("len", &b.len())
280                .finish(),
281            BodyInner::Buffered(None) => f.debug_struct("Body::Buffered").field("len", &0).finish(),
282            BodyInner::H1(_) => f.debug_struct("Body::H1Streaming").finish(),
283            BodyInner::H2(_) => f.debug_struct("Body::H2Streaming").finish(),
284            BodyInner::H2Direct(_) => f.debug_struct("Body::H2DirectStreaming").finish(),
285            BodyInner::H3(_) => f.debug_struct("Body::H3Streaming").finish(),
286        }
287    }
288}
289
290impl Clone for Body {
291    fn clone(&self) -> Self {
292        match &self.inner {
293            BodyInner::Empty => Self::empty(),
294            BodyInner::Buffered(Some(b)) => Self {
295                inner: BodyInner::Buffered(Some(b.clone())),
296            },
297            BodyInner::Buffered(None) => Self {
298                inner: BodyInner::Buffered(None),
299            },
300            BodyInner::H1(_) | BodyInner::H2(_) | BodyInner::H2Direct(_) | BodyInner::H3(_) => {
301                panic!("specter::Body::clone is not supported for streaming bodies")
302            }
303        }
304    }
305}
306
307impl From<Bytes> for Body {
308    fn from(value: Bytes) -> Self {
309        Self::from_bytes(value)
310    }
311}
312
313impl HttpBody for Body {
314    type Data = Bytes;
315    type Error = Error;
316
317    fn poll_frame(
318        mut self: Pin<&mut Self>,
319        cx: &mut Context<'_>,
320    ) -> Poll<Option<std::result::Result<Frame<Self::Data>, Self::Error>>> {
321        match &mut self.inner {
322            BodyInner::Empty => Poll::Ready(None),
323            BodyInner::Buffered(slot) => match slot.take() {
324                Some(bytes) if !bytes.is_empty() => Poll::Ready(Some(Ok(Frame::data(bytes)))),
325                _ => Poll::Ready(None),
326            },
327            BodyInner::H1(body) => Pin::new(body).poll_frame(cx),
328            BodyInner::H2(body) => Pin::new(body).poll_frame(cx),
329            BodyInner::H2Direct(body) => Pin::new(body.as_mut()).poll_frame(cx),
330            BodyInner::H3(body) => Pin::new(body).poll_frame(cx),
331        }
332    }
333
334    fn is_end_stream(&self) -> bool {
335        match &self.inner {
336            BodyInner::Empty => true,
337            BodyInner::Buffered(None) => true,
338            BodyInner::Buffered(Some(b)) => b.is_empty(),
339            BodyInner::H1(body) => body.is_terminal(),
340            BodyInner::H2(body) => body.is_terminal(),
341            BodyInner::H2Direct(body) => body.is_terminal(),
342            BodyInner::H3(body) => body.is_terminal(),
343        }
344    }
345
346    fn size_hint(&self) -> SizeHint {
347        match &self.inner {
348            BodyInner::Empty => SizeHint::with_exact(0),
349            BodyInner::Buffered(Some(b)) => SizeHint::with_exact(b.len() as u64),
350            BodyInner::Buffered(None) => SizeHint::with_exact(0),
351            BodyInner::H1(body) => body.size_hint(),
352            BodyInner::H2(body) => body.size_hint(),
353            BodyInner::H2Direct(body) => body.size_hint(),
354            BodyInner::H3(body) => body.size_hint(),
355        }
356    }
357}
358
359impl Body {
360    #[inline(always)]
361    fn poll_chunk(
362        mut self: Pin<&mut Self>,
363        cx: &mut Context<'_>,
364    ) -> Poll<Option<std::result::Result<Bytes, Error>>> {
365        match &mut self.inner {
366            BodyInner::Empty => Poll::Ready(None),
367            BodyInner::Buffered(slot) => match slot.take() {
368                Some(bytes) if !bytes.is_empty() => Poll::Ready(Some(Ok(bytes))),
369                _ => Poll::Ready(None),
370            },
371            BodyInner::H2(body) => Pin::new(body).poll_data_coalesced(cx),
372            BodyInner::H2Direct(body) => Pin::new(body.as_mut()).poll_data(cx),
373            BodyInner::H1(body) => match Pin::new(body).poll_frame(cx) {
374                Poll::Ready(Some(Ok(frame))) => match frame.into_data() {
375                    Ok(bytes) => Poll::Ready(Some(Ok(bytes))),
376                    Err(_) => Poll::Pending,
377                },
378                Poll::Ready(Some(Err(error))) => Poll::Ready(Some(Err(error))),
379                Poll::Ready(None) => Poll::Ready(None),
380                Poll::Pending => Poll::Pending,
381            },
382            BodyInner::H3(body) => match Pin::new(body).poll_frame(cx) {
383                Poll::Ready(Some(Ok(frame))) => match frame.into_data() {
384                    Ok(bytes) => Poll::Ready(Some(Ok(bytes))),
385                    Err(_) => Poll::Pending,
386                },
387                Poll::Ready(Some(Err(error))) => Poll::Ready(Some(Err(error))),
388                Poll::Ready(None) => Poll::Ready(None),
389                Poll::Pending => Poll::Pending,
390            },
391        }
392    }
393}
394
395/// Future returned by [`Body::frame`].
396pub struct FrameFuture<'a> {
397    body: &'a mut Body,
398}
399
400impl<'a> Future for FrameFuture<'a> {
401    type Output = Option<std::result::Result<Frame<Bytes>, Error>>;
402
403    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
404        let body = &mut *self.get_mut().body;
405        match Pin::new(body).poll_frame(cx) {
406            Poll::Pending => Poll::Pending,
407            Poll::Ready(value) => Poll::Ready(value),
408        }
409    }
410}
411
412/// Future returned by [`Body::chunk`].
413pub struct ChunkFuture<'a> {
414    body: &'a mut Body,
415}
416
417impl<'a> Future for ChunkFuture<'a> {
418    type Output = Option<std::result::Result<Bytes, Error>>;
419
420    #[inline(always)]
421    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
422        let body = &mut *self.get_mut().body;
423        Pin::new(body).poll_chunk(cx)
424    }
425}
426
427/// HTTP response with explicit decompression and a poll-based [`Body`].
428#[derive(Debug, Clone)]
429pub struct Response {
430    pub(crate) status: u16,
431    headers: Headers,
432    body: Body,
433    http_version: String,
434    effective_url: Option<Url>,
435}
436
437impl Response {
438    /// Construct a buffered response. Used by the non-streaming transport
439    /// paths and by tests/cache code that already have the full body in
440    /// memory.
441    pub fn new(status: u16, headers: Headers, body: Bytes, http_version: String) -> Self {
442        Self {
443            status,
444            headers,
445            body: Body::from_bytes(body),
446            http_version,
447            effective_url: None,
448        }
449    }
450
451    /// Construct a response that wraps an explicit [`Body`]. Used by the
452    /// streaming transport paths to publish the poll-based body to callers.
453    pub fn with_body(status: u16, headers: Headers, body: Body, http_version: String) -> Self {
454        Self {
455            status,
456            headers,
457            body,
458            http_version,
459            effective_url: None,
460        }
461    }
462
463    pub(crate) fn into_status_headers_version(self) -> (u16, Headers, String) {
464        (self.status, self.headers, self.http_version)
465    }
466
467    /// Set the effective URL (the URL that was actually requested).
468    pub fn with_url(mut self, url: Url) -> Self {
469        self.effective_url = Some(url);
470        self
471    }
472
473    pub(crate) async fn into_buffered(mut self) -> Result<Self> {
474        if self.body.is_streaming() {
475            let bytes = self.body.collect_to_bytes().await?;
476            self.body = Body::from_bytes(bytes);
477        }
478        Ok(self)
479    }
480
481    pub fn http_version(&self) -> &str {
482        &self.http_version
483    }
484
485    pub fn status(&self) -> StatusCode {
486        StatusCode::from_u16(self.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR)
487    }
488
489    pub fn status_code(&self) -> u16 {
490        self.status
491    }
492
493    pub fn headers(&self) -> &Headers {
494        &self.headers
495    }
496
497    pub fn url(&self) -> Option<&Url> {
498        self.effective_url.as_ref()
499    }
500
501    /// Reference to the public poll-based body.
502    pub fn body(&self) -> &Body {
503        &self.body
504    }
505
506    /// Mutable reference to the public poll-based body, used to drive
507    /// [`Body::frame`] without consuming the response.
508    pub fn body_mut(&mut self) -> &mut Body {
509        &mut self.body
510    }
511
512    /// Consume the response and return the body for poll-based draining.
513    pub fn into_body(self) -> Body {
514        self.body
515    }
516
517    /// Borrow the buffered body bytes, when the body is fully materialized.
518    /// Returns `None` for streaming bodies; use [`Body::frame`] or
519    /// [`Body::collect_to_bytes`] in that case.
520    pub fn buffered_bytes(&self) -> Option<&Bytes> {
521        self.body.as_bytes()
522    }
523
524    pub fn bytes_raw(&self) -> Result<Bytes> {
525        self.body
526            .as_bytes()
527            .cloned()
528            .ok_or_else(|| Error::HttpProtocol("response body is streaming, not buffered".into()))
529    }
530
531    pub fn bytes(&self) -> Result<Bytes> {
532        self.decoded_body()
533    }
534
535    pub fn is_success(&self) -> bool {
536        (200..300).contains(&self.status)
537    }
538    pub fn is_redirect(&self) -> bool {
539        (300..400).contains(&self.status)
540    }
541    pub fn redirect_url(&self) -> Option<&str> {
542        self.get_header("Location")
543    }
544
545    pub fn get_header(&self, name: &str) -> Option<&str> {
546        self.headers.get(name)
547    }
548
549    pub fn get_headers(&self, name: &str) -> Vec<&str> {
550        self.headers.get_all(name)
551    }
552
553    pub fn content_type(&self) -> Option<&str> {
554        self.get_header("Content-Type")
555    }
556    pub fn content_encoding(&self) -> Option<&str> {
557        self.get_header("Content-Encoding")
558    }
559
560    /// Decode body based on Content-Encoding (gzip, deflate, br, zstd).
561    /// Supports chained encodings (e.g., "gzip, deflate") by applying decodings in reverse order.
562    /// Returns an error for streaming bodies; the caller must consume the
563    /// streaming body via [`Body::frame`] before applying decompression.
564    pub fn decoded_body(&self) -> Result<Bytes> {
565        let body = self.body.as_bytes().ok_or_else(|| {
566            Error::HttpProtocol("response body is streaming, not buffered".into())
567        })?;
568
569        let encodings: Vec<&str> = self
570            .content_encoding()
571            .map(|s| s.split(',').map(str::trim).collect())
572            .unwrap_or_default();
573
574        if !encodings.is_empty() {
575            let mut data = body.clone();
576            for encoding in encodings.iter().rev() {
577                data = match encoding.to_lowercase().as_str() {
578                    "gzip" | "x-gzip" => decode_gzip(&data)?,
579                    "deflate" => decode_deflate(&data)?,
580                    "br" => decode_brotli(&data)?,
581                    "zstd" => decode_zstd(&data)?,
582                    "identity" => data,
583                    _ => data,
584                };
585            }
586            return Ok(data);
587        }
588
589        if body.len() >= 4
590            && body[0] == 0x28
591            && body[1] == 0xB5
592            && body[2] == 0x2F
593            && body[3] == 0xFD
594        {
595            return decode_zstd(body);
596        }
597        if body.len() >= 2 && body[0] == 0x1f && body[1] == 0x8b {
598            return decode_gzip(body);
599        }
600
601        Ok(body.clone())
602    }
603
604    pub fn text(&self) -> Result<String> {
605        let decoded = self.decoded_body()?;
606        String::from_utf8(decoded.to_vec())
607            .map_err(|e| Error::Decompression(format!("UTF-8 decode error: {}", e)))
608    }
609
610    pub fn json<T: serde::de::DeserializeOwned>(&self) -> Result<T> {
611        let text = self.text()?;
612        serde_json::from_str(&text).map_err(Error::from)
613    }
614
615    pub fn error_for_status(self) -> Result<Self> {
616        if self.status().is_client_error() || self.status().is_server_error() {
617            let message = self
618                .status()
619                .canonical_reason()
620                .unwrap_or("HTTP error")
621                .to_string();
622            Err(Error::http_status(self.status, message))
623        } else {
624            Ok(self)
625        }
626    }
627
628    pub fn error_for_status_ref(&self) -> Result<&Self> {
629        if self.status().is_client_error() || self.status().is_server_error() {
630            let message = self
631                .status()
632                .canonical_reason()
633                .unwrap_or("HTTP error")
634                .to_string();
635            Err(Error::http_status(self.status, message))
636        } else {
637            Ok(self)
638        }
639    }
640}
641
642fn decode_gzip(data: &[u8]) -> Result<Bytes> {
643    let mut decoder = flate2::read::GzDecoder::new(data);
644    let mut decoded = Vec::new();
645    decoder
646        .read_to_end(&mut decoded)
647        .map_err(|e| Error::Decompression(format!("gzip: {}", e)))?;
648    Ok(Bytes::from(decoded))
649}
650
651fn decode_deflate(data: &[u8]) -> Result<Bytes> {
652    let mut decoded = Vec::new();
653    if flate2::read::ZlibDecoder::new(data)
654        .read_to_end(&mut decoded)
655        .is_ok()
656    {
657        return Ok(Bytes::from(decoded));
658    }
659    decoded.clear();
660    flate2::read::DeflateDecoder::new(data)
661        .read_to_end(&mut decoded)
662        .map_err(|e| Error::Decompression(format!("deflate: {}", e)))?;
663    Ok(Bytes::from(decoded))
664}
665
666fn decode_brotli(data: &[u8]) -> Result<Bytes> {
667    let mut decoder = brotli::Decompressor::new(data, 4096);
668    let mut decoded = Vec::new();
669    decoder
670        .read_to_end(&mut decoded)
671        .map_err(|e| Error::Decompression(format!("brotli: {}", e)))?;
672    Ok(Bytes::from(decoded))
673}
674
675fn decode_zstd(data: &[u8]) -> Result<Bytes> {
676    zstd::stream::decode_all(data)
677        .map(Bytes::from)
678        .map_err(|e| Error::Decompression(format!("zstd: {}", e)))
679}