Skip to main content

qubit_http/response/
http_response.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Unified HTTP response type and helpers.
11
12use std::time::Duration;
13
14use async_stream::stream;
15use bytes::Bytes;
16use futures_util::{stream as futures_stream, StreamExt};
17use http::header::{CONTENT_LENGTH, CONTENT_TYPE};
18use http::{HeaderMap, Method, StatusCode};
19use serde::de::DeserializeOwned;
20use tokio_util::sync::CancellationToken;
21use url::Url;
22
23use crate::error::{backend_error_mapper::map_reqwest_error, ReqwestErrorPhase};
24use crate::sse::{DoneMarkerPolicy, SseChunkStream, SseEventStream, SseJsonMode};
25use crate::{HttpByteStream, HttpError, HttpErrorKind, HttpResult};
26
27use super::{HttpResponseMeta, HttpResponseOptions};
28
29/// Runtime state bound to one response instance.
30#[derive(Debug, Clone)]
31struct HttpResponseRuntime {
32    /// Per-response read timeout inherited from request/client.
33    read_timeout: Duration,
34    /// Optional cancellation token inherited from request.
35    cancellation_token: Option<CancellationToken>,
36    /// Request URL used in read/cancellation error context.
37    request_url: Url,
38}
39
40impl HttpResponseRuntime {
41    fn new(
42        read_timeout: Duration,
43        cancellation_token: Option<CancellationToken>,
44        request_url: Url,
45    ) -> Self {
46        Self {
47            read_timeout,
48            cancellation_token,
49            request_url,
50        }
51    }
52}
53
54/// Unified HTTP response with lazily consumed body.
55#[derive(Debug)]
56pub struct HttpResponse {
57    /// Response metadata (status, headers, final URL, request method).
58    pub meta: HttpResponseMeta,
59    /// Raw backend response until consumed.
60    backend: Option<reqwest::Response>,
61    /// Cached full body bytes after eager or lazy read.
62    buffered_body: Option<Bytes>,
63    /// Runtime state inherited from request/client.
64    runtime: HttpResponseRuntime,
65    /// Decode and error-preview options inherited from client options.
66    options: HttpResponseOptions,
67}
68
69impl HttpResponse {
70    /// Creates a buffered response.
71    pub fn new(
72        status: StatusCode,
73        headers: HeaderMap,
74        body: Bytes,
75        url: Url,
76        method: Method,
77    ) -> Self {
78        Self {
79            meta: HttpResponseMeta::new(status, headers, url.clone(), method),
80            backend: None,
81            buffered_body: Some(body),
82            runtime: HttpResponseRuntime::new(Duration::from_secs(30), None, url),
83            options: HttpResponseOptions::default(),
84        }
85    }
86
87    /// Creates a response from backend response and request-scoped options.
88    pub(crate) fn from_backend(
89        meta: HttpResponseMeta,
90        backend: reqwest::Response,
91        read_timeout: Duration,
92        cancellation_token: Option<CancellationToken>,
93        request_url: Url,
94        options: HttpResponseOptions,
95    ) -> Self {
96        Self {
97            meta,
98            backend: Some(backend),
99            buffered_body: None,
100            runtime: HttpResponseRuntime::new(read_timeout, cancellation_token, request_url),
101            options,
102        }
103    }
104
105    /// Returns shared response metadata.
106    #[inline]
107    pub fn meta(&self) -> &HttpResponseMeta {
108        &self.meta
109    }
110
111    /// Returns response status code.
112    #[inline]
113    pub fn status(&self) -> StatusCode {
114        self.meta.status
115    }
116
117    /// Returns response headers.
118    #[inline]
119    pub fn headers(&self) -> &HeaderMap {
120        &self.meta.headers
121    }
122
123    /// Returns final response URL.
124    #[inline]
125    pub fn url(&self) -> &Url {
126        &self.meta.url
127    }
128
129    /// Returns request URL used in response read context.
130    #[inline]
131    pub fn request_url(&self) -> &Url {
132        &self.runtime.request_url
133    }
134
135    /// Returns whether status is success.
136    #[inline]
137    pub fn is_success(&self) -> bool {
138        self.status().is_success()
139    }
140
141    /// Returns parsed `Retry-After` hint when status and headers provide one.
142    #[inline]
143    pub fn retry_after_hint(&self) -> Option<Duration> {
144        self.meta.retry_after_hint()
145    }
146
147    /// Returns `Ok(self)` for success statuses, otherwise maps a status error
148    /// with `Retry-After` and response-body preview context.
149    pub(crate) async fn into_success_or_status_error(
150        self,
151        message_prefix: &str,
152    ) -> HttpResult<Self> {
153        let status = self.status();
154        if status.is_success() {
155            return Ok(self);
156        }
157        let retry_after = self.retry_after_hint();
158        let method = self.meta.method.clone();
159        let url = self.request_url().clone();
160        let error_preview_limit = self.options.error_response_preview_limit;
161        let body_preview = self.into_error_body_preview(error_preview_limit).await?;
162        let message = format!(
163            "{} with status {} for {} {}; response body preview: {}",
164            message_prefix, status, method, url, body_preview
165        );
166        let mut mapped = HttpError::status(status, message)
167            .with_method(&method)
168            .with_url(&url)
169            .with_response_body_preview(body_preview);
170        if let Some(retry_after) = retry_after {
171            mapped = mapped.with_retry_after(retry_after);
172        }
173        Err(mapped)
174    }
175
176    /// Consumes this response and returns a bounded body preview for status errors.
177    ///
178    /// # Errors
179    /// Returns [`HttpErrorKind::Cancelled`](crate::HttpErrorKind::Cancelled)
180    /// when the request cancellation token fires while preview bytes are being
181    /// read.
182    pub(crate) async fn into_error_body_preview(mut self, max_bytes: usize) -> HttpResult<String> {
183        let limit = max_bytes.max(1);
184        if let Some(error) = self.cancelled_error_if_needed(
185            "Request cancelled while reading status error response body preview",
186        ) {
187            return Err(error);
188        }
189        if let Some(body) = self.buffered_body.take() {
190            let end = body.len().min(limit);
191            return Ok(Self::render_error_body_preview(
192                &body[..end],
193                body.len() > limit,
194            ));
195        }
196        let Some(backend) = self.backend.take() else {
197            return Ok("<empty>".to_string());
198        };
199        Self::read_error_body_preview(
200            backend,
201            self.runtime.read_timeout,
202            self.runtime.cancellation_token.clone(),
203            self.meta.method.clone(),
204            self.runtime.request_url.clone(),
205            limit,
206        )
207        .await
208    }
209
210    /// Returns full body bytes, consuming backend stream lazily on first call.
211    pub async fn bytes(&mut self) -> HttpResult<Bytes> {
212        if let Some(body) = &self.buffered_body {
213            return Ok(body.clone());
214        }
215        let Some(mut backend) = self.backend.take() else {
216            self.buffered_body = Some(Bytes::new());
217            return Ok(Bytes::new());
218        };
219
220        let method = self.meta.method.clone();
221        let url = self.runtime.request_url.clone();
222        let read_timeout = self.runtime.read_timeout;
223        let cancellation_token = self.runtime.cancellation_token.clone();
224        let mut body = bytes::BytesMut::new();
225
226        loop {
227            let next = if let Some(token) = &cancellation_token {
228                tokio::select! {
229                    _ = token.cancelled() => {
230                        return Err(HttpError::cancelled("Request cancelled while reading response body")
231                            .with_method(&method)
232                            .with_url(&url));
233                    }
234                    item = tokio::time::timeout(read_timeout, backend.chunk()) => item,
235                }
236            } else {
237                tokio::time::timeout(read_timeout, backend.chunk()).await
238            };
239
240            match next {
241                Ok(Ok(Some(chunk))) => body.extend_from_slice(&chunk),
242                Ok(Ok(None)) => {
243                    let body = body.freeze();
244                    self.buffered_body = Some(body.clone());
245                    return Ok(body);
246                }
247                Ok(Err(error)) => {
248                    return Err(map_reqwest_error(
249                        error,
250                        HttpErrorKind::Decode,
251                        Some(ReqwestErrorPhase::Read),
252                        Some(method),
253                        Some(url),
254                    ));
255                }
256                Err(_) => {
257                    return Err(HttpError::read_timeout(format!(
258                        "Read timeout after {:?} while reading response body",
259                        read_timeout
260                    ))
261                    .with_method(&self.meta.method)
262                    .with_url(&self.runtime.request_url));
263                }
264            }
265        }
266    }
267
268    /// Returns body as stream; if already buffered, returns stream backed by cached bytes.
269    pub fn stream(&mut self) -> HttpResult<HttpByteStream> {
270        if let Some(body) = self.buffered_body.as_ref() {
271            let bytes = body.clone();
272            return Ok(Box::pin(futures_stream::once(async move { Ok(bytes) })));
273        }
274        if let Some(error) = self
275            .cancelled_error_if_needed("Streaming response cancelled before reading response body")
276        {
277            return Err(error);
278        }
279        let Some(backend) = self.backend.take() else {
280            return Ok(Box::pin(futures_stream::empty()));
281        };
282
283        let method = self.meta.method.clone();
284        let url = self.runtime.request_url.clone();
285        let read_timeout = self.runtime.read_timeout;
286        let cancellation_token = self.runtime.cancellation_token.clone();
287        let mut stream = backend.bytes_stream();
288        let wrapped = stream! {
289            loop {
290                let next = if let Some(token) = &cancellation_token {
291                    tokio::select! {
292                        _ = token.cancelled() => {
293                            yield Err(HttpError::cancelled("Streaming response cancelled while reading body")
294                                .with_method(&method)
295                                .with_url(&url));
296                            break;
297                        }
298                        item = tokio::time::timeout(read_timeout, stream.next()) => item,
299                    }
300                } else {
301                    tokio::time::timeout(read_timeout, stream.next()).await
302                };
303                match next {
304                    Ok(Some(Ok(bytes))) => yield Ok(bytes),
305                    Ok(Some(Err(error))) => {
306                        let mapped = map_reqwest_error(
307                            error,
308                            HttpErrorKind::Transport,
309                            Some(ReqwestErrorPhase::Read),
310                            Some(method.clone()),
311                            Some(url.clone()),
312                        );
313                        yield Err(mapped);
314                        break;
315                    }
316                    Ok(None) => break,
317                    Err(_) => {
318                        let error = HttpError::read_timeout(format!(
319                            "Read timeout after {:?} while streaming response",
320                            read_timeout
321                        ))
322                        .with_method(&method)
323                        .with_url(&url);
324                        yield Err(error);
325                        break;
326                    }
327                }
328            }
329        };
330        Ok(Box::pin(wrapped))
331    }
332
333    /// Interprets response body as UTF-8 text.
334    pub async fn text(&mut self) -> HttpResult<String> {
335        let body = self.bytes().await?;
336        String::from_utf8(body.to_vec()).map_err(|error| {
337            HttpError::decode(format!(
338                "Failed to decode response body as UTF-8: {}",
339                error
340            ))
341            .with_status(self.meta.status)
342            .with_url(&self.meta.url)
343        })
344    }
345
346    /// Deserializes response body as JSON.
347    pub async fn json<T>(&mut self) -> HttpResult<T>
348    where
349        T: DeserializeOwned,
350    {
351        let body = self.bytes().await?;
352        serde_json::from_slice(&body).map_err(|error| {
353            HttpError::decode(format!("Failed to decode response JSON: {}", error))
354                .with_status(self.meta.status)
355                .with_url(&self.meta.url)
356        })
357    }
358
359    /// Overrides the maximum allowed size (in bytes) for one SSE line on this response.
360    ///
361    /// Values below 1 are clamped to 1. Returns `self` so callers can chain configuration
362    /// before consuming the body with [`Self::sse_events`] or [`Self::sse_chunks`]
363    /// (together with [`Self::sse_json_mode`], [`Self::sse_done_marker_policy`], etc.).
364    #[inline]
365    pub fn sse_max_line_bytes(mut self, max_line_bytes: usize) -> Self {
366        self.options.sse_max_line_bytes = max_line_bytes.max(1);
367        self
368    }
369
370    /// Overrides the maximum allowed size (in bytes) for one SSE frame on this response.
371    ///
372    /// Values below 1 are clamped to 1. Returns `self` for chained configuration.
373    #[inline]
374    pub fn sse_max_frame_bytes(mut self, max_frame_bytes: usize) -> Self {
375        self.options.sse_max_frame_bytes = max_frame_bytes.max(1);
376        self
377    }
378
379    /// Overrides the JSON decoding mode used by [`Self::sse_chunks`] on this response.
380    #[inline]
381    pub fn sse_json_mode(mut self, mode: SseJsonMode) -> Self {
382        self.options.sse_json_mode = mode;
383        self
384    }
385
386    /// Overrides how [`Self::sse_chunks`] detects end-of-stream from trimmed `data:` payloads.
387    #[inline]
388    pub fn sse_done_marker_policy(mut self, policy: DoneMarkerPolicy) -> Self {
389        self.options.sse_done_marker_policy = policy;
390        self
391    }
392
393    /// Decodes body stream as SSE events using this response's SSE line/frame byte limits (from
394    /// client defaults unless overridden via [`Self::sse_max_line_bytes`] /
395    /// [`Self::sse_max_frame_bytes`]).
396    pub fn sse_events(mut self) -> SseEventStream {
397        let max_line_bytes = self.options.sse_max_line_bytes;
398        let max_frame_bytes = self.options.sse_max_frame_bytes;
399        match self.stream() {
400            Ok(stream) => crate::sse::decode_events_from_stream_with_limits(
401                stream,
402                max_line_bytes,
403                max_frame_bytes,
404            ),
405            Err(error) => Box::pin(futures_stream::once(async move { Err(error) })),
406        }
407    }
408
409    /// Decodes SSE `data:` lines as JSON chunks using this response's SSE JSON mode, done-marker
410    /// policy, and line/frame limits (see [`Self::sse_json_mode`], [`Self::sse_done_marker_policy`],
411    /// [`Self::sse_max_line_bytes`], [`Self::sse_max_frame_bytes`]).
412    pub fn sse_chunks<T>(mut self) -> SseChunkStream<T>
413    where
414        T: DeserializeOwned + Send + 'static,
415    {
416        let done_policy = self.options.sse_done_marker_policy.clone();
417        let mode = self.options.sse_json_mode;
418        let max_line_bytes = self.options.sse_max_line_bytes;
419        let max_frame_bytes = self.options.sse_max_frame_bytes;
420        match self.stream() {
421            Ok(stream) => crate::sse::decode_json_chunks_from_stream_with_limits(
422                stream,
423                done_policy,
424                mode,
425                max_line_bytes,
426                max_frame_bytes,
427            ),
428            Err(error) => Box::pin(futures_stream::once(async move { Err(error) })),
429        }
430    }
431
432    /// Returns a buffered body reference for response logging if available.
433    ///
434    /// # Returns
435    /// `Some(&Bytes)` when response body has already been buffered.
436    pub(crate) fn buffered_body_for_logging(&self) -> Option<&Bytes> {
437        self.buffered_body.as_ref()
438    }
439
440    /// Returns whether logger may safely buffer the full body for logging.
441    ///
442    /// # Parameters
443    /// - `body_log_limit`: Configured logging body preview limit in bytes.
444    ///
445    /// # Returns
446    /// `true` only when this response is not SSE, has an explicit `Content-Length`,
447    /// and declared length is within `body_log_limit`.
448    pub(crate) fn can_buffer_body_for_logging(&self, body_log_limit: usize) -> bool {
449        if self.backend.is_none() {
450            return false;
451        }
452        if self.is_sse_response() {
453            return false;
454        }
455        self.content_length_hint()
456            .is_some_and(|content_length| content_length <= body_log_limit as u64)
457    }
458
459    /// Reads bounded preview bytes from a response body for status error messages.
460    ///
461    /// # Errors
462    /// Returns [`HttpErrorKind::Cancelled`](crate::HttpErrorKind::Cancelled)
463    /// when the supplied cancellation token fires while waiting for preview
464    /// bytes.
465    async fn read_error_body_preview(
466        mut response: reqwest::Response,
467        read_timeout: Duration,
468        cancellation_token: Option<CancellationToken>,
469        method: Method,
470        url: Url,
471        max_bytes: usize,
472    ) -> HttpResult<String> {
473        let limit = max_bytes.max(1);
474        let mut preview = Vec::new();
475        let mut truncated = false;
476
477        loop {
478            let next = if let Some(token) = cancellation_token.as_ref() {
479                tokio::select! {
480                    _ = token.cancelled() => {
481                        return Err(HttpError::cancelled(
482                            "Request cancelled while reading status error response body preview",
483                        )
484                        .with_method(&method)
485                        .with_url(&url));
486                    }
487                    item = tokio::time::timeout(read_timeout, response.chunk()) => item,
488                }
489            } else {
490                tokio::time::timeout(read_timeout, response.chunk()).await
491            };
492            match next {
493                Ok(Ok(Some(chunk))) => {
494                    if preview.len() >= limit {
495                        truncated = true;
496                        break;
497                    }
498                    let remaining = limit - preview.len();
499                    if chunk.len() > remaining {
500                        preview.extend_from_slice(&chunk[..remaining]);
501                        truncated = true;
502                        break;
503                    }
504                    preview.extend_from_slice(&chunk);
505                }
506                Ok(Ok(None)) => break,
507                Ok(Err(error)) => {
508                    return Ok(format!(
509                        "<error body unavailable: failed to read response body: {}>",
510                        error
511                    ));
512                }
513                Err(_) => {
514                    return Ok(format!(
515                        "<error body unavailable: read timeout after {:?}>",
516                        read_timeout
517                    ));
518                }
519            }
520        }
521        Ok(Self::render_error_body_preview(&preview, truncated))
522    }
523
524    /// Returns a cancellation error with response read context when cancelled.
525    ///
526    /// # Parameters
527    /// - `message`: Cancellation message to include in the error.
528    ///
529    /// # Returns
530    /// `Some(HttpError)` when this response has a cancelled token; otherwise
531    /// `None`.
532    fn cancelled_error_if_needed(&self, message: &str) -> Option<HttpError> {
533        if self
534            .runtime
535            .cancellation_token
536            .as_ref()
537            .is_some_and(CancellationToken::is_cancelled)
538        {
539            Some(
540                HttpError::cancelled(message.to_string())
541                    .with_method(&self.meta.method)
542                    .with_url(&self.runtime.request_url),
543            )
544        } else {
545            None
546        }
547    }
548
549    /// Returns `Content-Length` parsed from response headers when present and valid.
550    fn content_length_hint(&self) -> Option<u64> {
551        self.meta
552            .headers
553            .get(CONTENT_LENGTH)
554            .and_then(|value| value.to_str().ok())
555            .and_then(|value| value.parse::<u64>().ok())
556    }
557
558    /// Returns whether response content-type is SSE (`text/event-stream`).
559    fn is_sse_response(&self) -> bool {
560        self.meta
561            .headers
562            .get(CONTENT_TYPE)
563            .and_then(|value| value.to_str().ok())
564            .is_some_and(|content_type| {
565                content_type
566                    .to_ascii_lowercase()
567                    .starts_with("text/event-stream")
568            })
569    }
570
571    fn render_error_body_preview(bytes: &[u8], truncated: bool) -> String {
572        if bytes.is_empty() {
573            return "<empty>".to_string();
574        }
575        let suffix = if truncated { "...<truncated>" } else { "" };
576        match std::str::from_utf8(bytes) {
577            Ok(text) => format!("{text}{suffix}"),
578            Err(_) => format!("<binary {} bytes>{suffix}", bytes.len()),
579        }
580    }
581}
582
583/// Exercises internal response preview branches for coverage-only tests.
584///
585/// # Returns
586/// Preview diagnostics for buffered, empty, and cancelled response states.
587#[cfg(coverage)]
588#[doc(hidden)]
589pub(crate) async fn coverage_exercise_response_preview_paths() -> Vec<String> {
590    let url = Url::parse("https://example.com/coverage").expect("coverage URL should parse");
591    let buffered = HttpResponse::new(
592        StatusCode::BAD_GATEWAY,
593        HeaderMap::new(),
594        Bytes::from_static(b"abcdef"),
595        url.clone(),
596        Method::GET,
597    )
598    .into_error_body_preview(3)
599    .await
600    .expect("buffered preview should render");
601
602    let empty = HttpResponse {
603        meta: HttpResponseMeta::new(
604            StatusCode::BAD_GATEWAY,
605            HeaderMap::new(),
606            url.clone(),
607            Method::GET,
608        ),
609        backend: None,
610        buffered_body: None,
611        runtime: HttpResponseRuntime::new(Duration::from_secs(30), None, url.clone()),
612        options: HttpResponseOptions::default(),
613    }
614    .into_error_body_preview(3)
615    .await
616    .expect("empty preview should render");
617
618    let token = CancellationToken::new();
619    token.cancel();
620    let cancelled = HttpResponse {
621        meta: HttpResponseMeta::new(
622            StatusCode::BAD_GATEWAY,
623            HeaderMap::new(),
624            url.clone(),
625            Method::GET,
626        ),
627        backend: None,
628        buffered_body: None,
629        runtime: HttpResponseRuntime::new(Duration::from_secs(30), Some(token), url),
630        options: HttpResponseOptions::default(),
631    }
632    .into_error_body_preview(3)
633    .await
634    .expect_err("cancelled preview should fail");
635
636    vec![buffered, empty, format!("{:?}", cancelled.kind)]
637}