// modkit_http/response.rs

1use crate::error::HttpError;
2use crate::security::ERROR_BODY_PREVIEW_LIMIT;
3use bytes::Bytes;
4use http::{HeaderMap, Response, StatusCode};
5use http_body::Frame;
6use http_body_util::BodyExt;
7use pin_project_lite::pin_project;
8use serde::de::DeserializeOwned;
9use std::pin::Pin;
10use std::task::{Context, Poll};
11use std::time::{Duration, SystemTime};
12
13/// Parse `Retry-After` header value into a `Duration`.
14///
15/// Supports two formats per RFC 7231:
16/// - Seconds: "120" → 120 seconds
17/// - HTTP-date (RFC 1123): "Wed, 21 Oct 2015 07:28:00 GMT" → duration until that time
18///
19/// Returns `None` if:
20/// - Header is missing
21/// - Value cannot be parsed as integer or HTTP-date
22/// - Parsed duration is negative (time already passed or negative seconds)
23pub fn parse_retry_after(headers: &HeaderMap) -> Option<Duration> {
24    let value = headers.get(http::header::RETRY_AFTER)?.to_str().ok()?;
25    let trimmed = value.trim();
26
27    // First, try to parse as seconds (most common format)
28    if let Ok(seconds) = trimmed.parse::<i64>() {
29        if seconds < 0 {
30            return None;
31        }
32        return Some(Duration::from_secs(seconds.cast_unsigned()));
33    }
34
35    // Fall back to HTTP-date format (RFC 1123)
36    parse_http_date(trimmed)
37}
38
39/// Parse HTTP-date (RFC 1123) and return duration until that time.
40/// Returns `None` if the date is in the past or cannot be parsed.
41fn parse_http_date(value: &str) -> Option<Duration> {
42    let parsed = httpdate::parse_http_date(value).ok()?;
43    let now = SystemTime::now();
44
45    // Return duration until the parsed time (None if already passed)
46    parsed.duration_since(now).ok()
47}
48
49/// Type alias for the boxed response body that supports decompression.
50///
51/// This type can hold either a raw body or a decompressed body (gzip/br/deflate).
52/// The body is type-erased to allow the decompression layer to work transparently.
53pub type ResponseBody =
54    http_body_util::combinators::BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;
55
56pin_project! {
57    /// Body wrapper that enforces size limits during streaming.
58    ///
59    /// Created by [`HttpResponse::into_limited_body()`]. Tracks bytes read
60    /// and returns [`HttpError::BodyTooLarge`] if the limit is exceeded.
61    ///
62    /// Unlike reading the body with [`HttpResponse::bytes()`] or [`HttpResponse::json()`],
63    /// this allows incremental processing while still enforcing limits.
64    ///
65    /// # Example
66    ///
67    /// ```ignore
68    /// use http_body_util::BodyExt;
69    ///
70    /// let response = client.get("https://example.com/large-file").send().await?;
71    /// let mut body = response.into_limited_body();
72    ///
73    /// while let Some(frame) = body.frame().await {
74    ///     let frame = frame?; // Returns BodyTooLarge if limit exceeded
75    ///     if let Some(chunk) = frame.data_ref() {
76    ///         process_chunk(chunk);
77    ///     }
78    /// }
79    /// ```
80    pub struct LimitedBody {
81        #[pin]
82        inner: ResponseBody,
83        limit: usize,
84        read: usize,
85    }
86}
87
88impl LimitedBody {
89    /// Creates a new `LimitedBody` wrapping the given body with the specified limit.
90    #[must_use]
91    pub fn new(inner: ResponseBody, limit: usize) -> Self {
92        Self {
93            inner,
94            limit,
95            read: 0,
96        }
97    }
98
99    /// Returns the number of bytes read so far.
100    #[must_use]
101    pub fn bytes_read(&self) -> usize {
102        self.read
103    }
104
105    /// Returns the configured size limit.
106    #[must_use]
107    pub fn limit(&self) -> usize {
108        self.limit
109    }
110}
111
112impl http_body::Body for LimitedBody {
113    type Data = Bytes;
114    type Error = HttpError;
115
116    fn poll_frame(
117        self: Pin<&mut Self>,
118        cx: &mut Context<'_>,
119    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
120        let this = self.project();
121
122        match this.inner.poll_frame(cx) {
123            Poll::Ready(Some(Ok(frame))) => {
124                if let Some(data) = frame.data_ref() {
125                    *this.read += data.len();
126                    if *this.read > *this.limit {
127                        return Poll::Ready(Some(Err(HttpError::BodyTooLarge {
128                            limit: *this.limit,
129                            actual: *this.read,
130                        })));
131                    }
132                }
133                Poll::Ready(Some(Ok(frame)))
134            }
135            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(HttpError::Transport(e)))),
136            Poll::Ready(None) => Poll::Ready(None),
137            Poll::Pending => Poll::Pending,
138        }
139    }
140}
141
142/// HTTP response wrapper with body-reading helpers
143///
144/// Provides a reqwest-like API for reading response bodies:
145/// - `resp.error_for_status()?` - Check status without reading body
146/// - `resp.bytes().await?` - Read raw bytes
147/// - `resp.checked_bytes().await?` - Read bytes with status check
148/// - `resp.json::<T>().await?` - Parse as JSON with status check
149///
150/// All body reads enforce the configured `max_body_size` limit.
151#[derive(Debug)]
152pub struct HttpResponse {
153    pub(crate) inner: Response<ResponseBody>,
154    pub(crate) max_body_size: usize,
155}
156
157impl HttpResponse {
158    /// Get the response status code
159    #[must_use]
160    pub fn status(&self) -> StatusCode {
161        self.inner.status()
162    }
163
164    /// Get the response headers
165    #[must_use]
166    pub fn headers(&self) -> &HeaderMap {
167        self.inner.headers()
168    }
169
170    /// Consume the wrapper and return the inner response with boxed body
171    ///
172    /// Useful for advanced callers who need direct access to the response.
173    /// Note: The body has already been through the decompression layer,
174    /// so it contains decompressed bytes if the server sent compressed data.
175    #[must_use]
176    pub fn into_inner(self) -> Response<ResponseBody> {
177        self.inner
178    }
179
180    /// Check status and return error for non-2xx responses
181    ///
182    /// Does NOT read the response body. For non-2xx status, returns
183    /// `HttpError::HttpStatus` with an empty body preview.
184    ///
185    /// # Errors
186    ///
187    /// Returns `HttpError::HttpStatus` if the response status is not 2xx.
188    ///
189    /// # Example
190    ///
191    /// ```ignore
192    /// let resp = client.get("https://example.com/api").send().await?;
193    /// let resp = resp.error_for_status()?;  // Fails if not 2xx
194    /// let body = resp.bytes().await?;
195    /// ```
196    pub fn error_for_status(self) -> Result<Self, HttpError> {
197        if self.inner.status().is_success() {
198            return Ok(self);
199        }
200
201        let content_type = self
202            .inner
203            .headers()
204            .get(http::header::CONTENT_TYPE)
205            .and_then(|v| v.to_str().ok())
206            .map(String::from);
207
208        let retry_after = parse_retry_after(self.inner.headers());
209
210        Err(HttpError::HttpStatus {
211            status: self.inner.status(),
212            body_preview: String::new(),
213            content_type,
214            retry_after,
215        })
216    }
217
218    /// Read response body as bytes without status check
219    ///
220    /// Enforces `max_body_size` limit.
221    ///
222    /// # Errors
223    /// Returns `HttpError::BodyTooLarge` if body exceeds limit.
224    pub async fn bytes(self) -> Result<Bytes, HttpError> {
225        read_body_limited_impl(self.inner, self.max_body_size).await
226    }
227
228    /// Read response body as bytes with status check
229    ///
230    /// Returns `HttpError::HttpStatus` for non-2xx responses (with body preview).
231    /// Enforces `max_body_size` limit for successful responses.
232    ///
233    /// # Errors
234    /// Returns `HttpError::HttpStatus` if status is not 2xx.
235    /// Returns `HttpError::BodyTooLarge` if body exceeds limit.
236    pub async fn checked_bytes(self) -> Result<Bytes, HttpError> {
237        checked_body_impl(self.inner, self.max_body_size).await
238    }
239
240    /// Parse response body as JSON with status check
241    ///
242    /// Equivalent to `resp.checked_bytes().await?` followed by JSON parsing.
243    ///
244    /// # Errors
245    /// Returns `HttpError::HttpStatus` if status is not 2xx.
246    /// Returns `HttpError::BodyTooLarge` if body exceeds limit.
247    /// Returns `HttpError::Json` if parsing fails.
248    pub async fn json<T: DeserializeOwned>(self) -> Result<T, HttpError> {
249        let body_bytes = checked_body_impl(self.inner, self.max_body_size).await?;
250        let value = serde_json::from_slice(&body_bytes)?;
251        Ok(value)
252    }
253
254    /// Read response body as text (UTF-8) with status check
255    ///
256    /// Equivalent to `resp.checked_bytes().await?` followed by UTF-8 conversion.
257    /// Invalid UTF-8 sequences are replaced with the Unicode replacement character.
258    ///
259    /// # Errors
260    /// Returns `HttpError::HttpStatus` if status is not 2xx.
261    /// Returns `HttpError::BodyTooLarge` if body exceeds limit.
262    ///
263    /// # Example
264    ///
265    /// ```ignore
266    /// let body = client
267    ///     .get("https://example.com/text")
268    ///     .send()
269    ///     .await?
270    ///     .text()
271    ///     .await?;
272    /// println!("Response: {}", body);
273    /// ```
274    pub async fn text(self) -> Result<String, HttpError> {
275        let body_bytes = checked_body_impl(self.inner, self.max_body_size).await?;
276        Ok(String::from_utf8_lossy(&body_bytes).into_owned())
277    }
278
279    /// Returns the response body as a stream for incremental processing.
280    ///
281    /// # Warning: No Size Limit Enforcement
282    ///
283    /// This method does **NOT** enforce `max_body_size`. For untrusted responses
284    /// (especially compressed), prefer [`into_limited_body()`](Self::into_limited_body)
285    /// to protect against decompression bombs and memory exhaustion.
286    ///
287    /// Unlike `bytes()`, `json()`, or `text()`, this method does NOT:
288    /// - Check the HTTP status code (use `error_for_status()` first if needed)
289    /// - Enforce the `max_body_size` limit (caller is responsible for limiting)
290    /// - Buffer the entire body in memory
291    ///
292    /// Use this only when:
293    /// - You trust the response source AND have external size limits
294    /// - You're implementing custom streaming logic with your own limits
295    /// - Performance is critical and you can guarantee bounded responses
296    ///
297    /// # Example
298    ///
299    /// ```ignore
300    /// use http_body_util::BodyExt;
301    ///
302    /// let response = client.get("https://example.com/large-file").send().await?;
303    ///
304    /// // Check status first (optional)
305    /// if !response.status().is_success() {
306    ///     return Err(/* handle error */);
307    /// }
308    ///
309    /// // Get the body stream (WARNING: no size limit!)
310    /// let mut body = response.into_body();
311    ///
312    /// // Process frames incrementally
313    /// while let Some(frame) = body.frame().await {
314    ///     let frame = frame?;
315    ///     if let Some(chunk) = frame.data_ref() {
316    ///         process_chunk(chunk);
317    ///     }
318    /// }
319    /// ```
320    #[must_use]
321    pub fn into_body(self) -> ResponseBody {
322        self.inner.into_body()
323    }
324
325    /// Returns the response body as a size-limited stream.
326    ///
327    /// Unlike [`into_body()`](Self::into_body), this method wraps the body in a
328    /// [`LimitedBody`] that enforces the configured `max_body_size` limit during
329    /// streaming. This protects against decompression bombs where a small compressed
330    /// payload expands to gigabytes of memory.
331    ///
332    /// The limit is enforced on **decompressed** bytes, so a 1KB gzip payload that
333    /// decompresses to 1GB will be rejected.
334    ///
335    /// # Errors
336    ///
337    /// When the limit is exceeded, the next `poll_frame()` call returns
338    /// `HttpError::BodyTooLarge`.
339    ///
340    /// # Example
341    ///
342    /// ```ignore
343    /// use http_body_util::BodyExt;
344    ///
345    /// let response = client.get("https://example.com/large-file").send().await?;
346    /// let mut body = response.into_limited_body();
347    ///
348    /// while let Some(frame) = body.frame().await {
349    ///     let frame = frame?; // Returns BodyTooLarge if limit exceeded
350    ///     if let Some(chunk) = frame.data_ref() {
351    ///         process_chunk(chunk);
352    ///     }
353    /// }
354    ///
355    /// println!("Total bytes read: {}", body.bytes_read());
356    /// ```
357    #[must_use]
358    pub fn into_limited_body(self) -> LimitedBody {
359        LimitedBody::new(self.inner.into_body(), self.max_body_size)
360    }
361
362    /// Returns the configured max body size for this response.
363    ///
364    /// This is the limit that would be applied by `bytes()`, `checked_bytes()`,
365    /// `json()`, and `text()` methods.
366    #[must_use]
367    pub fn max_body_size(&self) -> usize {
368        self.max_body_size
369    }
370}
371
372/// Internal implementation of `checked_body` that doesn't capture `&self`
373pub async fn checked_body_impl(
374    response: Response<ResponseBody>,
375    max_body_size: usize,
376) -> Result<Bytes, HttpError> {
377    let status = response.status();
378    let content_type = response
379        .headers()
380        .get(http::header::CONTENT_TYPE)
381        .and_then(|v| v.to_str().ok())
382        .map(String::from);
383
384    if !status.is_success() {
385        // Parse Retry-After header before consuming response
386        let retry_after = parse_retry_after(response.headers());
387
388        // Read limited preview for error message
389        // Handle BodyTooLarge gracefully - don't let it hide the HTTP status error
390        let preview_limit = max_body_size.min(ERROR_BODY_PREVIEW_LIMIT);
391        let body_preview = match read_body_limited_impl(response, preview_limit).await {
392            Ok(bytes) => String::from_utf8_lossy(&bytes).into_owned(),
393            Err(HttpError::BodyTooLarge { .. }) => "<body too large for preview>".to_owned(),
394            Err(e) => return Err(e), // Propagate transport errors
395        };
396
397        return Err(HttpError::HttpStatus {
398            status,
399            body_preview,
400            content_type,
401            retry_after,
402        });
403    }
404
405    read_body_limited_impl(response, max_body_size).await
406}
407
408/// Internal implementation of `read_body_limited` that doesn't capture `&self`
409///
410/// This function reads from the (potentially decompressed) response body,
411/// enforcing the byte limit on decompressed data. This protects against
412/// decompression bombs where a small compressed payload expands to gigabytes.
413pub async fn read_body_limited_impl(
414    response: Response<ResponseBody>,
415    limit: usize,
416) -> Result<Bytes, HttpError> {
417    let (_parts, body) = response.into_parts();
418
419    let mut collected = Vec::new();
420    let mut body = std::pin::pin!(body);
421
422    while let Some(frame) = body.frame().await {
423        let frame = frame.map_err(HttpError::Transport)?;
424        if let Some(chunk) = frame.data_ref() {
425            if collected.len() + chunk.len() > limit {
426                return Err(HttpError::BodyTooLarge {
427                    limit,
428                    actual: collected.len() + chunk.len(),
429                });
430            }
431            collected.extend_from_slice(chunk);
432        }
433    }
434
435    Ok(Bytes::from(collected))
436}
437
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;

    /// Runs `parse_retry_after` on a header map whose only entry is
    /// `Retry-After: <raw>`.
    fn retry_after_of(raw: &str) -> Option<Duration> {
        let mut headers = HeaderMap::new();
        headers.insert(http::header::RETRY_AFTER, raw.parse().unwrap());
        parse_retry_after(&headers)
    }

    #[test]
    fn test_parse_retry_after_seconds() {
        assert_eq!(retry_after_of("120"), Some(Duration::from_secs(120)));
    }

    #[test]
    fn test_parse_retry_after_seconds_with_whitespace() {
        assert_eq!(retry_after_of("  60  "), Some(Duration::from_secs(60)));
    }

    #[test]
    fn test_parse_retry_after_missing() {
        let empty = HeaderMap::new();
        assert_eq!(parse_retry_after(&empty), None);
    }

    #[test]
    fn test_parse_retry_after_invalid() {
        assert_eq!(retry_after_of("not-a-number"), None);
    }

    #[test]
    fn test_parse_retry_after_http_date_in_past() {
        // An HTTP-date that has already passed yields None.
        assert_eq!(retry_after_of("Wed, 21 Oct 2015 07:28:00 GMT"), None);
    }

    #[test]
    fn test_parse_retry_after_http_date_in_future() {
        // Build an HTTP-date 60 seconds from now.
        let deadline = SystemTime::now() + Duration::from_secs(60);
        let formatted = httpdate::fmt_http_date(deadline);

        let parsed = retry_after_of(&formatted);
        assert!(parsed.is_some());

        // Roughly 60 seconds, with some slack for test execution time.
        let secs = parsed.unwrap().as_secs();
        assert!((58..=62).contains(&secs));
    }

    #[test]
    fn test_parse_retry_after_negative_seconds() {
        assert_eq!(retry_after_of("-5"), None);
    }

    #[test]
    fn test_parse_retry_after_zero() {
        assert_eq!(retry_after_of("0"), Some(Duration::from_secs(0)));
    }
}