modkit_http/response.rs
1use crate::error::HttpError;
2use crate::security::ERROR_BODY_PREVIEW_LIMIT;
3use bytes::Bytes;
4use http::{HeaderMap, Response, StatusCode};
5use http_body::Frame;
6use http_body_util::BodyExt;
7use pin_project_lite::pin_project;
8use serde::de::DeserializeOwned;
9use std::pin::Pin;
10use std::task::{Context, Poll};
11use std::time::{Duration, SystemTime};
12
13/// Parse `Retry-After` header value into a `Duration`.
14///
15/// Supports two formats per RFC 7231:
16/// - Seconds: "120" → 120 seconds
17/// - HTTP-date (RFC 1123): "Wed, 21 Oct 2015 07:28:00 GMT" → duration until that time
18///
19/// Returns `None` if:
20/// - Header is missing
21/// - Value cannot be parsed as integer or HTTP-date
22/// - Parsed duration is negative (time already passed or negative seconds)
23pub fn parse_retry_after(headers: &HeaderMap) -> Option<Duration> {
24 let value = headers.get(http::header::RETRY_AFTER)?.to_str().ok()?;
25 let trimmed = value.trim();
26
27 // First, try to parse as seconds (most common format)
28 if let Ok(seconds) = trimmed.parse::<i64>() {
29 if seconds < 0 {
30 return None;
31 }
32 return Some(Duration::from_secs(seconds.cast_unsigned()));
33 }
34
35 // Fall back to HTTP-date format (RFC 1123)
36 parse_http_date(trimmed)
37}
38
39/// Parse HTTP-date (RFC 1123) and return duration until that time.
40/// Returns `None` if the date is in the past or cannot be parsed.
41fn parse_http_date(value: &str) -> Option<Duration> {
42 let parsed = httpdate::parse_http_date(value).ok()?;
43 let now = SystemTime::now();
44
45 // Return duration until the parsed time (None if already passed)
46 parsed.duration_since(now).ok()
47}
48
/// Type alias for the boxed response body that supports decompression.
///
/// This type can hold either a raw body or a decompressed body (gzip/br/deflate).
/// The body is type-erased to allow the decompression layer to work transparently.
///
/// Concretely, this is a `BoxBody` that yields [`Bytes`] data and reports failures
/// as a boxed `std::error::Error + Send + Sync` trait object.
pub type ResponseBody =
    http_body_util::combinators::BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;
55
pin_project! {
    /// Body wrapper that enforces size limits during streaming.
    ///
    /// Created by [`HttpResponse::into_limited_body()`]. Tracks bytes read
    /// and returns [`HttpError::BodyTooLarge`] if the limit is exceeded.
    ///
    /// Unlike reading the body with [`HttpResponse::bytes()`] or [`HttpResponse::json()`],
    /// this allows incremental processing while still enforcing limits.
    ///
    /// Only data frames are counted toward the limit; frames that carry no data
    /// are forwarded without affecting the byte count.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use http_body_util::BodyExt;
    ///
    /// let response = client.get("https://example.com/large-file").send().await?;
    /// let mut body = response.into_limited_body();
    ///
    /// while let Some(frame) = body.frame().await {
    ///     let frame = frame?; // Returns BodyTooLarge if limit exceeded
    ///     if let Some(chunk) = frame.data_ref() {
    ///         process_chunk(chunk);
    ///     }
    /// }
    /// ```
    pub struct LimitedBody {
        // The wrapped body; pinned because it is polled through `Pin<&mut Self>`.
        #[pin]
        inner: ResponseBody,
        // Maximum number of data bytes that may be read before an error is produced.
        limit: usize,
        // Running total of data bytes observed so far.
        read: usize,
    }
}
87
88impl LimitedBody {
89 /// Creates a new `LimitedBody` wrapping the given body with the specified limit.
90 #[must_use]
91 pub fn new(inner: ResponseBody, limit: usize) -> Self {
92 Self {
93 inner,
94 limit,
95 read: 0,
96 }
97 }
98
99 /// Returns the number of bytes read so far.
100 #[must_use]
101 pub fn bytes_read(&self) -> usize {
102 self.read
103 }
104
105 /// Returns the configured size limit.
106 #[must_use]
107 pub fn limit(&self) -> usize {
108 self.limit
109 }
110}
111
112impl http_body::Body for LimitedBody {
113 type Data = Bytes;
114 type Error = HttpError;
115
116 fn poll_frame(
117 self: Pin<&mut Self>,
118 cx: &mut Context<'_>,
119 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
120 let this = self.project();
121
122 match this.inner.poll_frame(cx) {
123 Poll::Ready(Some(Ok(frame))) => {
124 if let Some(data) = frame.data_ref() {
125 *this.read += data.len();
126 if *this.read > *this.limit {
127 return Poll::Ready(Some(Err(HttpError::BodyTooLarge {
128 limit: *this.limit,
129 actual: *this.read,
130 })));
131 }
132 }
133 Poll::Ready(Some(Ok(frame)))
134 }
135 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(HttpError::Transport(e)))),
136 Poll::Ready(None) => Poll::Ready(None),
137 Poll::Pending => Poll::Pending,
138 }
139 }
140}
141
/// HTTP response wrapper with body-reading helpers
///
/// Provides a reqwest-like API for reading response bodies:
/// - `resp.error_for_status()?` - Check status without reading body
/// - `resp.bytes().await?` - Read raw bytes
/// - `resp.checked_bytes().await?` - Read bytes with status check
/// - `resp.json::<T>().await?` - Parse as JSON with status check
/// - `resp.text().await?` - Read as (lossy) UTF-8 text with status check
///
/// The buffering reads (`bytes()`, `checked_bytes()`, `json()`, `text()`) and
/// `into_limited_body()` enforce the configured `max_body_size` limit.
/// `into_body()` and `into_inner()` hand out the body without any limit.
#[derive(Debug)]
pub struct HttpResponse {
    /// Underlying response whose body is the type-erased [`ResponseBody`].
    pub(crate) inner: Response<ResponseBody>,
    /// Byte limit handed to the body-reading helpers and to [`LimitedBody`].
    pub(crate) max_body_size: usize,
}
156
157impl HttpResponse {
158 /// Get the response status code
159 #[must_use]
160 pub fn status(&self) -> StatusCode {
161 self.inner.status()
162 }
163
164 /// Get the response headers
165 #[must_use]
166 pub fn headers(&self) -> &HeaderMap {
167 self.inner.headers()
168 }
169
170 /// Consume the wrapper and return the inner response with boxed body
171 ///
172 /// Useful for advanced callers who need direct access to the response.
173 /// Note: The body has already been through the decompression layer,
174 /// so it contains decompressed bytes if the server sent compressed data.
175 #[must_use]
176 pub fn into_inner(self) -> Response<ResponseBody> {
177 self.inner
178 }
179
180 /// Check status and return error for non-2xx responses
181 ///
182 /// Does NOT read the response body. For non-2xx status, returns
183 /// `HttpError::HttpStatus` with an empty body preview.
184 ///
185 /// # Errors
186 ///
187 /// Returns `HttpError::HttpStatus` if the response status is not 2xx.
188 ///
189 /// # Example
190 ///
191 /// ```ignore
192 /// let resp = client.get("https://example.com/api").send().await?;
193 /// let resp = resp.error_for_status()?; // Fails if not 2xx
194 /// let body = resp.bytes().await?;
195 /// ```
196 pub fn error_for_status(self) -> Result<Self, HttpError> {
197 if self.inner.status().is_success() {
198 return Ok(self);
199 }
200
201 let content_type = self
202 .inner
203 .headers()
204 .get(http::header::CONTENT_TYPE)
205 .and_then(|v| v.to_str().ok())
206 .map(String::from);
207
208 let retry_after = parse_retry_after(self.inner.headers());
209
210 Err(HttpError::HttpStatus {
211 status: self.inner.status(),
212 body_preview: String::new(),
213 content_type,
214 retry_after,
215 })
216 }
217
218 /// Read response body as bytes without status check
219 ///
220 /// Enforces `max_body_size` limit.
221 ///
222 /// # Errors
223 /// Returns `HttpError::BodyTooLarge` if body exceeds limit.
224 pub async fn bytes(self) -> Result<Bytes, HttpError> {
225 read_body_limited_impl(self.inner, self.max_body_size).await
226 }
227
228 /// Read response body as bytes with status check
229 ///
230 /// Returns `HttpError::HttpStatus` for non-2xx responses (with body preview).
231 /// Enforces `max_body_size` limit for successful responses.
232 ///
233 /// # Errors
234 /// Returns `HttpError::HttpStatus` if status is not 2xx.
235 /// Returns `HttpError::BodyTooLarge` if body exceeds limit.
236 pub async fn checked_bytes(self) -> Result<Bytes, HttpError> {
237 checked_body_impl(self.inner, self.max_body_size).await
238 }
239
240 /// Parse response body as JSON with status check
241 ///
242 /// Equivalent to `resp.checked_bytes().await?` followed by JSON parsing.
243 ///
244 /// # Errors
245 /// Returns `HttpError::HttpStatus` if status is not 2xx.
246 /// Returns `HttpError::BodyTooLarge` if body exceeds limit.
247 /// Returns `HttpError::Json` if parsing fails.
248 pub async fn json<T: DeserializeOwned>(self) -> Result<T, HttpError> {
249 let body_bytes = checked_body_impl(self.inner, self.max_body_size).await?;
250 let value = serde_json::from_slice(&body_bytes)?;
251 Ok(value)
252 }
253
254 /// Read response body as text (UTF-8) with status check
255 ///
256 /// Equivalent to `resp.checked_bytes().await?` followed by UTF-8 conversion.
257 /// Invalid UTF-8 sequences are replaced with the Unicode replacement character.
258 ///
259 /// # Errors
260 /// Returns `HttpError::HttpStatus` if status is not 2xx.
261 /// Returns `HttpError::BodyTooLarge` if body exceeds limit.
262 ///
263 /// # Example
264 ///
265 /// ```ignore
266 /// let body = client
267 /// .get("https://example.com/text")
268 /// .send()
269 /// .await?
270 /// .text()
271 /// .await?;
272 /// println!("Response: {}", body);
273 /// ```
274 pub async fn text(self) -> Result<String, HttpError> {
275 let body_bytes = checked_body_impl(self.inner, self.max_body_size).await?;
276 Ok(String::from_utf8_lossy(&body_bytes).into_owned())
277 }
278
279 /// Returns the response body as a stream for incremental processing.
280 ///
281 /// # Warning: No Size Limit Enforcement
282 ///
283 /// This method does **NOT** enforce `max_body_size`. For untrusted responses
284 /// (especially compressed), prefer [`into_limited_body()`](Self::into_limited_body)
285 /// to protect against decompression bombs and memory exhaustion.
286 ///
287 /// Unlike `bytes()`, `json()`, or `text()`, this method does NOT:
288 /// - Check the HTTP status code (use `error_for_status()` first if needed)
289 /// - Enforce the `max_body_size` limit (caller is responsible for limiting)
290 /// - Buffer the entire body in memory
291 ///
292 /// Use this only when:
293 /// - You trust the response source AND have external size limits
294 /// - You're implementing custom streaming logic with your own limits
295 /// - Performance is critical and you can guarantee bounded responses
296 ///
297 /// # Example
298 ///
299 /// ```ignore
300 /// use http_body_util::BodyExt;
301 ///
302 /// let response = client.get("https://example.com/large-file").send().await?;
303 ///
304 /// // Check status first (optional)
305 /// if !response.status().is_success() {
306 /// return Err(/* handle error */);
307 /// }
308 ///
309 /// // Get the body stream (WARNING: no size limit!)
310 /// let mut body = response.into_body();
311 ///
312 /// // Process frames incrementally
313 /// while let Some(frame) = body.frame().await {
314 /// let frame = frame?;
315 /// if let Some(chunk) = frame.data_ref() {
316 /// process_chunk(chunk);
317 /// }
318 /// }
319 /// ```
320 #[must_use]
321 pub fn into_body(self) -> ResponseBody {
322 self.inner.into_body()
323 }
324
325 /// Returns the response body as a size-limited stream.
326 ///
327 /// Unlike [`into_body()`](Self::into_body), this method wraps the body in a
328 /// [`LimitedBody`] that enforces the configured `max_body_size` limit during
329 /// streaming. This protects against decompression bombs where a small compressed
330 /// payload expands to gigabytes of memory.
331 ///
332 /// The limit is enforced on **decompressed** bytes, so a 1KB gzip payload that
333 /// decompresses to 1GB will be rejected.
334 ///
335 /// # Errors
336 ///
337 /// When the limit is exceeded, the next `poll_frame()` call returns
338 /// `HttpError::BodyTooLarge`.
339 ///
340 /// # Example
341 ///
342 /// ```ignore
343 /// use http_body_util::BodyExt;
344 ///
345 /// let response = client.get("https://example.com/large-file").send().await?;
346 /// let mut body = response.into_limited_body();
347 ///
348 /// while let Some(frame) = body.frame().await {
349 /// let frame = frame?; // Returns BodyTooLarge if limit exceeded
350 /// if let Some(chunk) = frame.data_ref() {
351 /// process_chunk(chunk);
352 /// }
353 /// }
354 ///
355 /// println!("Total bytes read: {}", body.bytes_read());
356 /// ```
357 #[must_use]
358 pub fn into_limited_body(self) -> LimitedBody {
359 LimitedBody::new(self.inner.into_body(), self.max_body_size)
360 }
361
362 /// Returns the configured max body size for this response.
363 ///
364 /// This is the limit that would be applied by `bytes()`, `checked_bytes()`,
365 /// `json()`, and `text()` methods.
366 #[must_use]
367 pub fn max_body_size(&self) -> usize {
368 self.max_body_size
369 }
370}
371
372/// Internal implementation of `checked_body` that doesn't capture `&self`
373pub async fn checked_body_impl(
374 response: Response<ResponseBody>,
375 max_body_size: usize,
376) -> Result<Bytes, HttpError> {
377 let status = response.status();
378 let content_type = response
379 .headers()
380 .get(http::header::CONTENT_TYPE)
381 .and_then(|v| v.to_str().ok())
382 .map(String::from);
383
384 if !status.is_success() {
385 // Parse Retry-After header before consuming response
386 let retry_after = parse_retry_after(response.headers());
387
388 // Read limited preview for error message
389 // Handle BodyTooLarge gracefully - don't let it hide the HTTP status error
390 let preview_limit = max_body_size.min(ERROR_BODY_PREVIEW_LIMIT);
391 let body_preview = match read_body_limited_impl(response, preview_limit).await {
392 Ok(bytes) => String::from_utf8_lossy(&bytes).into_owned(),
393 Err(HttpError::BodyTooLarge { .. }) => "<body too large for preview>".to_owned(),
394 Err(e) => return Err(e), // Propagate transport errors
395 };
396
397 return Err(HttpError::HttpStatus {
398 status,
399 body_preview,
400 content_type,
401 retry_after,
402 });
403 }
404
405 read_body_limited_impl(response, max_body_size).await
406}
407
408/// Internal implementation of `read_body_limited` that doesn't capture `&self`
409///
410/// This function reads from the (potentially decompressed) response body,
411/// enforcing the byte limit on decompressed data. This protects against
412/// decompression bombs where a small compressed payload expands to gigabytes.
413pub async fn read_body_limited_impl(
414 response: Response<ResponseBody>,
415 limit: usize,
416) -> Result<Bytes, HttpError> {
417 let (_parts, body) = response.into_parts();
418
419 let mut collected = Vec::new();
420 let mut body = std::pin::pin!(body);
421
422 while let Some(frame) = body.frame().await {
423 let frame = frame.map_err(HttpError::Transport)?;
424 if let Some(chunk) = frame.data_ref() {
425 if collected.len() + chunk.len() > limit {
426 return Err(HttpError::BodyTooLarge {
427 limit,
428 actual: collected.len() + chunk.len(),
429 });
430 }
431 collected.extend_from_slice(chunk);
432 }
433 }
434
435 Ok(Bytes::from(collected))
436}
437
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;

    /// Builds a header map containing only a `Retry-After` header with `value`.
    fn retry_after_headers(value: &str) -> HeaderMap {
        let mut headers = HeaderMap::new();
        headers.insert(http::header::RETRY_AFTER, value.parse().unwrap());
        headers
    }

    #[test]
    fn test_parse_retry_after_seconds() {
        let headers = retry_after_headers("120");

        assert_eq!(parse_retry_after(&headers), Some(Duration::from_secs(120)));
    }

    #[test]
    fn test_parse_retry_after_seconds_with_whitespace() {
        let headers = retry_after_headers(" 60 ");

        assert_eq!(parse_retry_after(&headers), Some(Duration::from_secs(60)));
    }

    #[test]
    fn test_parse_retry_after_missing() {
        let headers = HeaderMap::new();

        assert_eq!(parse_retry_after(&headers), None);
    }

    #[test]
    fn test_parse_retry_after_invalid() {
        let headers = retry_after_headers("not-a-number");

        assert_eq!(parse_retry_after(&headers), None);
    }

    #[test]
    fn test_parse_retry_after_http_date_in_past() {
        // An HTTP-date that has already passed must yield `None`.
        let headers = retry_after_headers("Wed, 21 Oct 2015 07:28:00 GMT");

        assert_eq!(parse_retry_after(&headers), None);
    }

    #[test]
    fn test_parse_retry_after_http_date_in_future() {
        // Format a date 60 seconds from now as the header value.
        let deadline = SystemTime::now() + Duration::from_secs(60);
        let headers = retry_after_headers(&httpdate::fmt_http_date(deadline));

        let parsed = parse_retry_after(&headers);
        assert!(parsed.is_some());

        // Roughly 60 seconds; the tolerance absorbs second-granularity
        // formatting and test execution time.
        let remaining = parsed.unwrap().as_secs();
        assert!((58..=62).contains(&remaining));
    }

    #[test]
    fn test_parse_retry_after_negative_seconds() {
        let headers = retry_after_headers("-5");

        assert_eq!(parse_retry_after(&headers), None);
    }

    #[test]
    fn test_parse_retry_after_zero() {
        let headers = retry_after_headers("0");

        assert_eq!(parse_retry_after(&headers), Some(Duration::from_secs(0)));
    }
}