Skip to main content

qubit_http/response/
http_response.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026.
4 *    Haixing Hu, Qubit Co. Ltd.
5 *
6 *    All rights reserved.
7 *
8 ******************************************************************************/
9//! Unified HTTP response type and helpers.
10
11use std::time::Duration;
12
13use async_stream::stream;
14use bytes::Bytes;
15use futures_util::{stream as futures_stream, StreamExt};
16use http::header::{CONTENT_LENGTH, CONTENT_TYPE};
17use http::{HeaderMap, Method, StatusCode};
18use serde::de::DeserializeOwned;
19use tokio_util::sync::CancellationToken;
20use url::Url;
21
22use crate::error::{backend_error_mapper::map_reqwest_error, ReqwestErrorPhase};
23use crate::sse::{DoneMarkerPolicy, SseChunkStream, SseEventStream, SseJsonMode};
24use crate::{HttpByteStream, HttpError, HttpErrorKind, HttpResult};
25
26use super::{HttpResponseMeta, HttpResponseOptions};
27
28/// Runtime state bound to one response instance.
29#[derive(Debug, Clone)]
30struct HttpResponseRuntime {
31    /// Per-response read timeout inherited from request/client.
32    read_timeout: Duration,
33    /// Optional cancellation token inherited from request.
34    cancellation_token: Option<CancellationToken>,
35    /// Request URL used in read/cancellation error context.
36    request_url: Url,
37}
38
39impl HttpResponseRuntime {
40    fn new(
41        read_timeout: Duration,
42        cancellation_token: Option<CancellationToken>,
43        request_url: Url,
44    ) -> Self {
45        Self {
46            read_timeout,
47            cancellation_token,
48            request_url,
49        }
50    }
51}
52
53/// Unified HTTP response with lazily consumed body.
54#[derive(Debug)]
55pub struct HttpResponse {
56    /// Response metadata (status, headers, final URL, request method).
57    pub meta: HttpResponseMeta,
58    /// Raw backend response until consumed.
59    backend: Option<reqwest::Response>,
60    /// Cached full body bytes after eager or lazy read.
61    buffered_body: Option<Bytes>,
62    /// Runtime state inherited from request/client.
63    runtime: HttpResponseRuntime,
64    /// Decode and error-preview options inherited from client options.
65    options: HttpResponseOptions,
66}
67
68impl HttpResponse {
69    /// Creates a buffered response.
70    pub fn new(
71        status: StatusCode,
72        headers: HeaderMap,
73        body: Bytes,
74        url: Url,
75        method: Method,
76    ) -> Self {
77        Self {
78            meta: HttpResponseMeta::new(status, headers, url.clone(), method),
79            backend: None,
80            buffered_body: Some(body),
81            runtime: HttpResponseRuntime::new(Duration::from_secs(30), None, url),
82            options: HttpResponseOptions::default(),
83        }
84    }
85
86    /// Creates a response from backend response and request-scoped options.
87    pub(crate) fn from_backend(
88        meta: HttpResponseMeta,
89        backend: reqwest::Response,
90        read_timeout: Duration,
91        cancellation_token: Option<CancellationToken>,
92        request_url: Url,
93        options: HttpResponseOptions,
94    ) -> Self {
95        Self {
96            meta,
97            backend: Some(backend),
98            buffered_body: None,
99            runtime: HttpResponseRuntime::new(read_timeout, cancellation_token, request_url),
100            options,
101        }
102    }
103
104    /// Returns shared response metadata.
105    #[inline]
106    pub fn meta(&self) -> &HttpResponseMeta {
107        &self.meta
108    }
109
110    /// Returns response status code.
111    #[inline]
112    pub fn status(&self) -> StatusCode {
113        self.meta.status
114    }
115
116    /// Returns response headers.
117    #[inline]
118    pub fn headers(&self) -> &HeaderMap {
119        &self.meta.headers
120    }
121
122    /// Returns final response URL.
123    #[inline]
124    pub fn url(&self) -> &Url {
125        &self.meta.url
126    }
127
128    /// Returns request URL used in response read context.
129    #[inline]
130    pub fn request_url(&self) -> &Url {
131        &self.runtime.request_url
132    }
133
134    /// Returns whether status is success.
135    #[inline]
136    pub fn is_success(&self) -> bool {
137        self.status().is_success()
138    }
139
140    /// Returns parsed `Retry-After` hint when status and headers provide one.
141    #[inline]
142    pub fn retry_after_hint(&self) -> Option<Duration> {
143        self.meta.retry_after_hint()
144    }
145
146    /// Returns `Ok(self)` for success statuses, otherwise maps a status error
147    /// with `Retry-After` and response-body preview context.
148    pub(crate) async fn into_success_or_status_error(
149        self,
150        message_prefix: &str,
151    ) -> HttpResult<Self> {
152        let status = self.status();
153        if status.is_success() {
154            return Ok(self);
155        }
156        let retry_after = self.retry_after_hint();
157        let method = self.meta.method.clone();
158        let url = self.request_url().clone();
159        let error_preview_limit = self.options.error_response_preview_limit;
160        let body_preview = self.into_error_body_preview(error_preview_limit).await?;
161        let message = format!(
162            "{} with status {} for {} {}; response body preview: {}",
163            message_prefix, status, method, url, body_preview
164        );
165        let mut mapped = HttpError::status(status, message)
166            .with_method(&method)
167            .with_url(&url)
168            .with_response_body_preview(body_preview);
169        if let Some(retry_after) = retry_after {
170            mapped = mapped.with_retry_after(retry_after);
171        }
172        Err(mapped)
173    }
174
175    /// Consumes this response and returns a bounded body preview for status errors.
176    ///
177    /// # Errors
178    /// Returns [`HttpErrorKind::Cancelled`](crate::HttpErrorKind::Cancelled)
179    /// when the request cancellation token fires while preview bytes are being
180    /// read.
181    pub(crate) async fn into_error_body_preview(mut self, max_bytes: usize) -> HttpResult<String> {
182        let limit = max_bytes.max(1);
183        if let Some(error) = self.cancelled_error_if_needed(
184            "Request cancelled while reading status error response body preview",
185        ) {
186            return Err(error);
187        }
188        if let Some(body) = self.buffered_body.take() {
189            let end = body.len().min(limit);
190            return Ok(Self::render_error_body_preview(
191                &body[..end],
192                body.len() > limit,
193            ));
194        }
195        let Some(backend) = self.backend.take() else {
196            return Ok("<empty>".to_string());
197        };
198        Self::read_error_body_preview(
199            backend,
200            self.runtime.read_timeout,
201            self.runtime.cancellation_token.clone(),
202            self.meta.method.clone(),
203            self.runtime.request_url.clone(),
204            limit,
205        )
206        .await
207    }
208
209    /// Returns full body bytes, consuming backend stream lazily on first call.
210    pub async fn bytes(&mut self) -> HttpResult<Bytes> {
211        if let Some(body) = &self.buffered_body {
212            return Ok(body.clone());
213        }
214        let Some(mut backend) = self.backend.take() else {
215            self.buffered_body = Some(Bytes::new());
216            return Ok(Bytes::new());
217        };
218
219        let method = self.meta.method.clone();
220        let url = self.runtime.request_url.clone();
221        let read_timeout = self.runtime.read_timeout;
222        let cancellation_token = self.runtime.cancellation_token.clone();
223        let mut body = bytes::BytesMut::new();
224
225        loop {
226            let next = if let Some(token) = &cancellation_token {
227                tokio::select! {
228                    _ = token.cancelled() => {
229                        return Err(HttpError::cancelled("Request cancelled while reading response body")
230                            .with_method(&method)
231                            .with_url(&url));
232                    }
233                    item = tokio::time::timeout(read_timeout, backend.chunk()) => item,
234                }
235            } else {
236                tokio::time::timeout(read_timeout, backend.chunk()).await
237            };
238
239            match next {
240                Ok(Ok(Some(chunk))) => body.extend_from_slice(&chunk),
241                Ok(Ok(None)) => {
242                    let body = body.freeze();
243                    self.buffered_body = Some(body.clone());
244                    return Ok(body);
245                }
246                Ok(Err(error)) => {
247                    return Err(map_reqwest_error(
248                        error,
249                        HttpErrorKind::Decode,
250                        Some(ReqwestErrorPhase::Read),
251                        Some(method),
252                        Some(url),
253                    ));
254                }
255                Err(_) => {
256                    return Err(HttpError::read_timeout(format!(
257                        "Read timeout after {:?} while reading response body",
258                        read_timeout
259                    ))
260                    .with_method(&self.meta.method)
261                    .with_url(&self.runtime.request_url));
262                }
263            }
264        }
265    }
266
267    /// Returns body as stream; if already buffered, returns stream backed by cached bytes.
268    pub fn stream(&mut self) -> HttpResult<HttpByteStream> {
269        if let Some(body) = self.buffered_body.as_ref() {
270            let bytes = body.clone();
271            return Ok(Box::pin(futures_stream::once(async move { Ok(bytes) })));
272        }
273        if let Some(error) = self
274            .cancelled_error_if_needed("Streaming response cancelled before reading response body")
275        {
276            return Err(error);
277        }
278        let Some(backend) = self.backend.take() else {
279            return Ok(Box::pin(futures_stream::empty()));
280        };
281
282        let method = self.meta.method.clone();
283        let url = self.runtime.request_url.clone();
284        let read_timeout = self.runtime.read_timeout;
285        let cancellation_token = self.runtime.cancellation_token.clone();
286        let mut stream = backend.bytes_stream();
287        let wrapped = stream! {
288            loop {
289                let next = if let Some(token) = &cancellation_token {
290                    tokio::select! {
291                        _ = token.cancelled() => {
292                            yield Err(HttpError::cancelled("Streaming response cancelled while reading body")
293                                .with_method(&method)
294                                .with_url(&url));
295                            break;
296                        }
297                        item = tokio::time::timeout(read_timeout, stream.next()) => item,
298                    }
299                } else {
300                    tokio::time::timeout(read_timeout, stream.next()).await
301                };
302                match next {
303                    Ok(Some(Ok(bytes))) => yield Ok(bytes),
304                    Ok(Some(Err(error))) => {
305                        let mapped = map_reqwest_error(
306                            error,
307                            HttpErrorKind::Transport,
308                            Some(ReqwestErrorPhase::Read),
309                            Some(method.clone()),
310                            Some(url.clone()),
311                        );
312                        yield Err(mapped);
313                        break;
314                    }
315                    Ok(None) => break,
316                    Err(_) => {
317                        let error = HttpError::read_timeout(format!(
318                            "Read timeout after {:?} while streaming response",
319                            read_timeout
320                        ))
321                        .with_method(&method)
322                        .with_url(&url);
323                        yield Err(error);
324                        break;
325                    }
326                }
327            }
328        };
329        Ok(Box::pin(wrapped))
330    }
331
332    /// Interprets response body as UTF-8 text.
333    pub async fn text(&mut self) -> HttpResult<String> {
334        let body = self.bytes().await?;
335        String::from_utf8(body.to_vec()).map_err(|error| {
336            HttpError::decode(format!(
337                "Failed to decode response body as UTF-8: {}",
338                error
339            ))
340            .with_status(self.meta.status)
341            .with_url(&self.meta.url)
342        })
343    }
344
345    /// Deserializes response body as JSON.
346    pub async fn json<T>(&mut self) -> HttpResult<T>
347    where
348        T: DeserializeOwned,
349    {
350        let body = self.bytes().await?;
351        serde_json::from_slice(&body).map_err(|error| {
352            HttpError::decode(format!("Failed to decode response JSON: {}", error))
353                .with_status(self.meta.status)
354                .with_url(&self.meta.url)
355        })
356    }
357
358    /// Overrides the maximum allowed size (in bytes) for one SSE line on this response.
359    ///
360    /// Values below 1 are clamped to 1. Returns `self` so callers can chain configuration
361    /// before consuming the body with [`Self::sse_events`] or [`Self::sse_chunks`]
362    /// (together with [`Self::sse_json_mode`], [`Self::sse_done_marker_policy`], etc.).
363    #[inline]
364    pub fn sse_max_line_bytes(mut self, max_line_bytes: usize) -> Self {
365        self.options.sse_max_line_bytes = max_line_bytes.max(1);
366        self
367    }
368
369    /// Overrides the maximum allowed size (in bytes) for one SSE frame on this response.
370    ///
371    /// Values below 1 are clamped to 1. Returns `self` for chained configuration.
372    #[inline]
373    pub fn sse_max_frame_bytes(mut self, max_frame_bytes: usize) -> Self {
374        self.options.sse_max_frame_bytes = max_frame_bytes.max(1);
375        self
376    }
377
378    /// Overrides the JSON decoding mode used by [`Self::sse_chunks`] on this response.
379    #[inline]
380    pub fn sse_json_mode(mut self, mode: SseJsonMode) -> Self {
381        self.options.sse_json_mode = mode;
382        self
383    }
384
385    /// Overrides how [`Self::sse_chunks`] detects end-of-stream from trimmed `data:` payloads.
386    #[inline]
387    pub fn sse_done_marker_policy(mut self, policy: DoneMarkerPolicy) -> Self {
388        self.options.sse_done_marker_policy = policy;
389        self
390    }
391
392    /// Decodes body stream as SSE events using this response's SSE line/frame byte limits (from
393    /// client defaults unless overridden via [`Self::sse_max_line_bytes`] /
394    /// [`Self::sse_max_frame_bytes`]).
395    pub fn sse_events(mut self) -> SseEventStream {
396        let max_line_bytes = self.options.sse_max_line_bytes;
397        let max_frame_bytes = self.options.sse_max_frame_bytes;
398        match self.stream() {
399            Ok(stream) => crate::sse::decode_events_from_stream_with_limits(
400                stream,
401                max_line_bytes,
402                max_frame_bytes,
403            ),
404            Err(error) => Box::pin(futures_stream::once(async move { Err(error) })),
405        }
406    }
407
408    /// Decodes SSE `data:` lines as JSON chunks using this response's SSE JSON mode, done-marker
409    /// policy, and line/frame limits (see [`Self::sse_json_mode`], [`Self::sse_done_marker_policy`],
410    /// [`Self::sse_max_line_bytes`], [`Self::sse_max_frame_bytes`]).
411    pub fn sse_chunks<T>(mut self) -> SseChunkStream<T>
412    where
413        T: DeserializeOwned + Send + 'static,
414    {
415        let done_policy = self.options.sse_done_marker_policy.clone();
416        let mode = self.options.sse_json_mode;
417        let max_line_bytes = self.options.sse_max_line_bytes;
418        let max_frame_bytes = self.options.sse_max_frame_bytes;
419        match self.stream() {
420            Ok(stream) => crate::sse::decode_json_chunks_from_stream_with_limits(
421                stream,
422                done_policy,
423                mode,
424                max_line_bytes,
425                max_frame_bytes,
426            ),
427            Err(error) => Box::pin(futures_stream::once(async move { Err(error) })),
428        }
429    }
430
431    /// Returns a buffered body reference for response logging if available.
432    ///
433    /// # Returns
434    /// `Some(&Bytes)` when response body has already been buffered.
435    pub(crate) fn buffered_body_for_logging(&self) -> Option<&Bytes> {
436        self.buffered_body.as_ref()
437    }
438
439    /// Returns whether logger may safely buffer the full body for logging.
440    ///
441    /// # Parameters
442    /// - `body_log_limit`: Configured logging body preview limit in bytes.
443    ///
444    /// # Returns
445    /// `true` only when this response is not SSE, has an explicit `Content-Length`,
446    /// and declared length is within `body_log_limit`.
447    pub(crate) fn can_buffer_body_for_logging(&self, body_log_limit: usize) -> bool {
448        if self.backend.is_none() {
449            return false;
450        }
451        if self.is_sse_response() {
452            return false;
453        }
454        self.content_length_hint()
455            .is_some_and(|content_length| content_length <= body_log_limit as u64)
456    }
457
458    /// Reads bounded preview bytes from a response body for status error messages.
459    ///
460    /// # Errors
461    /// Returns [`HttpErrorKind::Cancelled`](crate::HttpErrorKind::Cancelled)
462    /// when the supplied cancellation token fires while waiting for preview
463    /// bytes.
464    async fn read_error_body_preview(
465        mut response: reqwest::Response,
466        read_timeout: Duration,
467        cancellation_token: Option<CancellationToken>,
468        method: Method,
469        url: Url,
470        max_bytes: usize,
471    ) -> HttpResult<String> {
472        let limit = max_bytes.max(1);
473        let mut preview = Vec::new();
474        let mut truncated = false;
475
476        loop {
477            let next = if let Some(token) = cancellation_token.as_ref() {
478                tokio::select! {
479                    _ = token.cancelled() => {
480                        return Err(HttpError::cancelled(
481                            "Request cancelled while reading status error response body preview",
482                        )
483                        .with_method(&method)
484                        .with_url(&url));
485                    }
486                    item = tokio::time::timeout(read_timeout, response.chunk()) => item,
487                }
488            } else {
489                tokio::time::timeout(read_timeout, response.chunk()).await
490            };
491            match next {
492                Ok(Ok(Some(chunk))) => {
493                    if preview.len() >= limit {
494                        truncated = true;
495                        break;
496                    }
497                    let remaining = limit - preview.len();
498                    if chunk.len() > remaining {
499                        preview.extend_from_slice(&chunk[..remaining]);
500                        truncated = true;
501                        break;
502                    }
503                    preview.extend_from_slice(&chunk);
504                }
505                Ok(Ok(None)) => break,
506                Ok(Err(error)) => {
507                    return Ok(format!(
508                        "<error body unavailable: failed to read response body: {}>",
509                        error
510                    ));
511                }
512                Err(_) => {
513                    return Ok(format!(
514                        "<error body unavailable: read timeout after {:?}>",
515                        read_timeout
516                    ));
517                }
518            }
519        }
520        Ok(Self::render_error_body_preview(&preview, truncated))
521    }
522
523    /// Returns a cancellation error with response read context when cancelled.
524    ///
525    /// # Parameters
526    /// - `message`: Cancellation message to include in the error.
527    ///
528    /// # Returns
529    /// `Some(HttpError)` when this response has a cancelled token; otherwise
530    /// `None`.
531    fn cancelled_error_if_needed(&self, message: &str) -> Option<HttpError> {
532        if self
533            .runtime
534            .cancellation_token
535            .as_ref()
536            .is_some_and(CancellationToken::is_cancelled)
537        {
538            Some(
539                HttpError::cancelled(message.to_string())
540                    .with_method(&self.meta.method)
541                    .with_url(&self.runtime.request_url),
542            )
543        } else {
544            None
545        }
546    }
547
548    /// Returns `Content-Length` parsed from response headers when present and valid.
549    fn content_length_hint(&self) -> Option<u64> {
550        self.meta
551            .headers
552            .get(CONTENT_LENGTH)
553            .and_then(|value| value.to_str().ok())
554            .and_then(|value| value.parse::<u64>().ok())
555    }
556
557    /// Returns whether response content-type is SSE (`text/event-stream`).
558    fn is_sse_response(&self) -> bool {
559        self.meta
560            .headers
561            .get(CONTENT_TYPE)
562            .and_then(|value| value.to_str().ok())
563            .is_some_and(|content_type| {
564                content_type
565                    .to_ascii_lowercase()
566                    .starts_with("text/event-stream")
567            })
568    }
569
570    fn render_error_body_preview(bytes: &[u8], truncated: bool) -> String {
571        if bytes.is_empty() {
572            return "<empty>".to_string();
573        }
574        let suffix = if truncated { "...<truncated>" } else { "" };
575        match std::str::from_utf8(bytes) {
576            Ok(text) => format!("{text}{suffix}"),
577            Err(_) => format!("<binary {} bytes>{suffix}", bytes.len()),
578        }
579    }
580}
581
/// Drives the error-body preview code through its buffered, body-less, and
/// pre-cancelled branches; compiled only under `cfg(coverage)`.
///
/// # Returns
/// Three strings, in order: the preview of a buffered `abcdef` body limited to
/// 3 bytes, the preview of a response with no body source, and the `Debug`
/// rendering of the error kind produced for an already-cancelled token.
#[cfg(coverage)]
#[doc(hidden)]
pub(crate) async fn coverage_exercise_response_preview_paths() -> Vec<String> {
    let request_url =
        Url::parse("https://example.com/coverage").expect("coverage URL should parse");

    let buffered_preview = HttpResponse::new(
        StatusCode::BAD_GATEWAY,
        HeaderMap::new(),
        Bytes::from_static(b"abcdef"),
        request_url.clone(),
        Method::GET,
    )
    .into_error_body_preview(3)
    .await
    .expect("buffered preview should render");

    // Response with neither a buffered body nor a backend; only the
    // cancellation token differs between the two remaining cases.
    let bodiless = |token: Option<CancellationToken>| HttpResponse {
        meta: HttpResponseMeta::new(
            StatusCode::BAD_GATEWAY,
            HeaderMap::new(),
            request_url.clone(),
            Method::GET,
        ),
        backend: None,
        buffered_body: None,
        runtime: HttpResponseRuntime::new(Duration::from_secs(30), token, request_url.clone()),
        options: HttpResponseOptions::default(),
    };

    let empty_preview = bodiless(None)
        .into_error_body_preview(3)
        .await
        .expect("empty preview should render");

    let token = CancellationToken::new();
    token.cancel();
    let cancelled = bodiless(Some(token))
        .into_error_body_preview(3)
        .await
        .expect_err("cancelled preview should fail");

    vec![
        buffered_preview,
        empty_preview,
        format!("{:?}", cancelled.kind),
    ]
}