Skip to main content

qubit_http/response/
http_response.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Unified HTTP response type and helpers.
11
12use std::time::Duration;
13
14use async_stream::stream;
15use bytes::Bytes;
16use futures_util::{
17    stream as futures_stream,
18    StreamExt,
19};
20use http::header::{
21    CONTENT_LENGTH,
22    CONTENT_TYPE,
23};
24use http::{
25    HeaderMap,
26    Method,
27    StatusCode,
28};
29use serde::de::DeserializeOwned;
30use tokio_util::sync::CancellationToken;
31use url::Url;
32
33use crate::error::{
34    backend_error_mapper::map_reqwest_error,
35    ReqwestErrorPhase,
36};
37use crate::sse::{
38    DoneMarkerPolicy,
39    SseChunkStream,
40    SseEventStream,
41    SseJsonMode,
42};
43use crate::{
44    HttpByteStream,
45    HttpError,
46    HttpErrorKind,
47    HttpResult,
48};
49
50use super::{
51    HttpResponseMeta,
52    HttpResponseOptions,
53};
54
55/// Runtime state bound to one response instance.
56#[derive(Debug, Clone)]
57struct HttpResponseRuntime {
58    /// Per-response read timeout inherited from request/client.
59    read_timeout: Duration,
60    /// Optional cancellation token inherited from request.
61    cancellation_token: Option<CancellationToken>,
62    /// Request URL used in read/cancellation error context.
63    request_url: Url,
64}
65
66impl HttpResponseRuntime {
67    fn new(
68        read_timeout: Duration,
69        cancellation_token: Option<CancellationToken>,
70        request_url: Url,
71    ) -> Self {
72        Self {
73            read_timeout,
74            cancellation_token,
75            request_url,
76        }
77    }
78}
79
80/// Unified HTTP response with lazily consumed body.
81#[derive(Debug)]
82pub struct HttpResponse {
83    /// Response metadata (status, headers, final URL, request method).
84    pub meta: HttpResponseMeta,
85    /// Raw backend response until consumed.
86    backend: Option<reqwest::Response>,
87    /// Cached full body bytes after eager or lazy read.
88    buffered_body: Option<Bytes>,
89    /// Runtime state inherited from request/client.
90    runtime: HttpResponseRuntime,
91    /// Decode and error-preview options inherited from client options.
92    options: HttpResponseOptions,
93}
94
95impl HttpResponse {
96    /// Creates a buffered response.
97    pub fn new(
98        status: StatusCode,
99        headers: HeaderMap,
100        body: Bytes,
101        url: Url,
102        method: Method,
103    ) -> Self {
104        Self {
105            meta: HttpResponseMeta::new(status, headers, url.clone(), method),
106            backend: None,
107            buffered_body: Some(body),
108            runtime: HttpResponseRuntime::new(Duration::from_secs(30), None, url),
109            options: HttpResponseOptions::default(),
110        }
111    }
112
113    /// Creates a response from backend response and request-scoped options.
114    pub(crate) fn from_backend(
115        meta: HttpResponseMeta,
116        backend: reqwest::Response,
117        read_timeout: Duration,
118        cancellation_token: Option<CancellationToken>,
119        request_url: Url,
120        options: HttpResponseOptions,
121    ) -> Self {
122        Self {
123            meta,
124            backend: Some(backend),
125            buffered_body: None,
126            runtime: HttpResponseRuntime::new(read_timeout, cancellation_token, request_url),
127            options,
128        }
129    }
130
131    /// Returns shared response metadata.
132    #[inline]
133    pub fn meta(&self) -> &HttpResponseMeta {
134        &self.meta
135    }
136
137    /// Returns response status code.
138    #[inline]
139    pub fn status(&self) -> StatusCode {
140        self.meta.status
141    }
142
143    /// Returns response headers.
144    #[inline]
145    pub fn headers(&self) -> &HeaderMap {
146        &self.meta.headers
147    }
148
149    /// Returns final response URL.
150    #[inline]
151    pub fn url(&self) -> &Url {
152        &self.meta.url
153    }
154
155    /// Returns request URL used in response read context.
156    #[inline]
157    pub fn request_url(&self) -> &Url {
158        &self.runtime.request_url
159    }
160
161    /// Returns whether status is success.
162    #[inline]
163    pub fn is_success(&self) -> bool {
164        self.status().is_success()
165    }
166
167    /// Returns parsed `Retry-After` hint when status and headers provide one.
168    #[inline]
169    pub fn retry_after_hint(&self) -> Option<Duration> {
170        self.meta.retry_after_hint()
171    }
172
173    /// Returns `Ok(self)` for success statuses, otherwise maps a status error
174    /// with `Retry-After` and response-body preview context.
175    pub(crate) async fn into_success_or_status_error(
176        self,
177        message_prefix: &str,
178    ) -> HttpResult<Self> {
179        let status = self.status();
180        if status.is_success() {
181            return Ok(self);
182        }
183        let retry_after = self.retry_after_hint();
184        let method = self.meta.method.clone();
185        let url = self.request_url().clone();
186        let error_preview_limit = self.options.error_response_preview_limit;
187        let body_preview = self.into_error_body_preview(error_preview_limit).await?;
188        let message = format!(
189            "{} with status {} for {} {}; response body preview: {}",
190            message_prefix, status, method, url, body_preview
191        );
192        let mut mapped = HttpError::status(status, message)
193            .with_method(&method)
194            .with_url(&url)
195            .with_response_body_preview(body_preview);
196        if let Some(retry_after) = retry_after {
197            mapped = mapped.with_retry_after(retry_after);
198        }
199        Err(mapped)
200    }
201
202    /// Consumes this response and returns a bounded body preview for status errors.
203    ///
204    /// # Errors
205    /// Returns [`HttpErrorKind::Cancelled`](crate::HttpErrorKind::Cancelled)
206    /// when the request cancellation token fires while preview bytes are being
207    /// read.
208    pub(crate) async fn into_error_body_preview(mut self, max_bytes: usize) -> HttpResult<String> {
209        let limit = max_bytes.max(1);
210        if let Some(error) = self.cancelled_error_if_needed(
211            "Request cancelled while reading status error response body preview",
212        ) {
213            return Err(error);
214        }
215        if let Some(body) = self.buffered_body.take() {
216            let end = body.len().min(limit);
217            return Ok(Self::render_error_body_preview(
218                &body[..end],
219                body.len() > limit,
220            ));
221        }
222        let Some(backend) = self.backend.take() else {
223            return Ok("<empty>".to_string());
224        };
225        Self::read_error_body_preview(
226            backend,
227            self.runtime.read_timeout,
228            self.runtime.cancellation_token.clone(),
229            self.meta.method.clone(),
230            self.runtime.request_url.clone(),
231            limit,
232        )
233        .await
234    }
235
236    /// Returns full body bytes, consuming backend stream lazily on first call.
237    pub async fn bytes(&mut self) -> HttpResult<Bytes> {
238        if let Some(body) = &self.buffered_body {
239            return Ok(body.clone());
240        }
241        let Some(mut backend) = self.backend.take() else {
242            self.buffered_body = Some(Bytes::new());
243            return Ok(Bytes::new());
244        };
245
246        let method = self.meta.method.clone();
247        let url = self.runtime.request_url.clone();
248        let read_timeout = self.runtime.read_timeout;
249        let cancellation_token = self.runtime.cancellation_token.clone();
250        let mut body = bytes::BytesMut::new();
251
252        loop {
253            let next = if let Some(token) = &cancellation_token {
254                tokio::select! {
255                    _ = token.cancelled() => {
256                        return Err(HttpError::cancelled("Request cancelled while reading response body")
257                            .with_method(&method)
258                            .with_url(&url));
259                    }
260                    item = tokio::time::timeout(read_timeout, backend.chunk()) => item,
261                }
262            } else {
263                tokio::time::timeout(read_timeout, backend.chunk()).await
264            };
265
266            match next {
267                Ok(Ok(Some(chunk))) => body.extend_from_slice(&chunk),
268                Ok(Ok(None)) => {
269                    let body = body.freeze();
270                    self.buffered_body = Some(body.clone());
271                    return Ok(body);
272                }
273                Ok(Err(error)) => {
274                    return Err(map_reqwest_error(
275                        error,
276                        HttpErrorKind::Decode,
277                        ReqwestErrorPhase::Read,
278                        Some(method),
279                        Some(url),
280                    ));
281                }
282                Err(_) => {
283                    return Err(HttpError::read_timeout(format!(
284                        "Read timeout after {:?} while reading response body",
285                        read_timeout
286                    ))
287                    .with_method(&self.meta.method)
288                    .with_url(&self.runtime.request_url));
289                }
290            }
291        }
292    }
293
294    /// Returns body as stream; if already buffered, returns stream backed by cached bytes.
295    pub fn stream(&mut self) -> HttpResult<HttpByteStream> {
296        if let Some(body) = self.buffered_body.as_ref() {
297            let bytes = body.clone();
298            return Ok(Box::pin(futures_stream::once(async move { Ok(bytes) })));
299        }
300        if let Some(error) = self
301            .cancelled_error_if_needed("Streaming response cancelled before reading response body")
302        {
303            return Err(error);
304        }
305        let Some(backend) = self.backend.take() else {
306            return Ok(Box::pin(futures_stream::empty()));
307        };
308
309        let method = self.meta.method.clone();
310        let url = self.runtime.request_url.clone();
311        let read_timeout = self.runtime.read_timeout;
312        let cancellation_token = self.runtime.cancellation_token.clone();
313        let mut stream = backend.bytes_stream();
314        let wrapped = stream! {
315            loop {
316                let next = if let Some(token) = &cancellation_token {
317                    tokio::select! {
318                        _ = token.cancelled() => {
319                            yield Err(HttpError::cancelled("Streaming response cancelled while reading body")
320                                .with_method(&method)
321                                .with_url(&url));
322                            break;
323                        }
324                        item = tokio::time::timeout(read_timeout, stream.next()) => item,
325                    }
326                } else {
327                    tokio::time::timeout(read_timeout, stream.next()).await
328                };
329                match next {
330                    Ok(Some(Ok(bytes))) => yield Ok(bytes),
331                    Ok(Some(Err(error))) => {
332                        let mapped = map_reqwest_error(
333                            error,
334                            HttpErrorKind::Transport,
335                            ReqwestErrorPhase::Read,
336                            Some(method.clone()),
337                            Some(url.clone()),
338                        );
339                        yield Err(mapped);
340                        break;
341                    }
342                    Ok(None) => break,
343                    Err(_) => {
344                        let error = HttpError::read_timeout(format!(
345                            "Read timeout after {:?} while streaming response",
346                            read_timeout
347                        ))
348                        .with_method(&method)
349                        .with_url(&url);
350                        yield Err(error);
351                        break;
352                    }
353                }
354            }
355        };
356        Ok(Box::pin(wrapped))
357    }
358
359    /// Interprets response body as UTF-8 text.
360    pub async fn text(&mut self) -> HttpResult<String> {
361        let body = self.bytes().await?;
362        String::from_utf8(body.to_vec()).map_err(|error| {
363            HttpError::decode(format!(
364                "Failed to decode response body as UTF-8: {}",
365                error
366            ))
367            .with_status(self.meta.status)
368            .with_url(&self.meta.url)
369        })
370    }
371
372    /// Deserializes response body as JSON.
373    pub async fn json<T>(&mut self) -> HttpResult<T>
374    where
375        T: DeserializeOwned,
376    {
377        let body = self.bytes().await?;
378        serde_json::from_slice(&body).map_err(|error| {
379            HttpError::decode(format!("Failed to decode response JSON: {}", error))
380                .with_status(self.meta.status)
381                .with_url(&self.meta.url)
382        })
383    }
384
385    /// Overrides the maximum allowed size (in bytes) for one SSE line on this response.
386    ///
387    /// Values below 1 are clamped to 1. Returns `self` so callers can chain configuration
388    /// before consuming the body with [`Self::sse_events`] or [`Self::sse_chunks`]
389    /// (together with [`Self::sse_json_mode`], [`Self::sse_done_marker_policy`], etc.).
390    #[inline]
391    pub fn sse_max_line_bytes(mut self, max_line_bytes: usize) -> Self {
392        self.options.sse_max_line_bytes = max_line_bytes.max(1);
393        self
394    }
395
396    /// Overrides the maximum allowed size (in bytes) for one SSE frame on this response.
397    ///
398    /// Values below 1 are clamped to 1. Returns `self` for chained configuration.
399    #[inline]
400    pub fn sse_max_frame_bytes(mut self, max_frame_bytes: usize) -> Self {
401        self.options.sse_max_frame_bytes = max_frame_bytes.max(1);
402        self
403    }
404
405    /// Overrides the JSON decoding mode used by [`Self::sse_chunks`] on this response.
406    #[inline]
407    pub fn sse_json_mode(mut self, mode: SseJsonMode) -> Self {
408        self.options.sse_json_mode = mode;
409        self
410    }
411
412    /// Overrides how [`Self::sse_chunks`] detects end-of-stream from trimmed `data:` payloads.
413    #[inline]
414    pub fn sse_done_marker_policy(mut self, policy: DoneMarkerPolicy) -> Self {
415        self.options.sse_done_marker_policy = policy;
416        self
417    }
418
419    /// Decodes body stream as SSE events using this response's SSE line/frame byte limits (from
420    /// client defaults unless overridden via [`Self::sse_max_line_bytes`] /
421    /// [`Self::sse_max_frame_bytes`]).
422    pub fn sse_events(mut self) -> SseEventStream {
423        let max_line_bytes = self.options.sse_max_line_bytes;
424        let max_frame_bytes = self.options.sse_max_frame_bytes;
425        match self.stream() {
426            Ok(stream) => crate::sse::decode_events_from_stream_with_limits(
427                stream,
428                max_line_bytes,
429                max_frame_bytes,
430            ),
431            Err(error) => Box::pin(futures_stream::once(async move { Err(error) })),
432        }
433    }
434
435    /// Decodes SSE `data:` lines as JSON chunks using this response's SSE JSON mode, done-marker
436    /// policy, and line/frame limits (see [`Self::sse_json_mode`], [`Self::sse_done_marker_policy`],
437    /// [`Self::sse_max_line_bytes`], [`Self::sse_max_frame_bytes`]).
438    pub fn sse_chunks<T>(mut self) -> SseChunkStream<T>
439    where
440        T: DeserializeOwned + Send + 'static,
441    {
442        let done_policy = self.options.sse_done_marker_policy.clone();
443        let mode = self.options.sse_json_mode;
444        let max_line_bytes = self.options.sse_max_line_bytes;
445        let max_frame_bytes = self.options.sse_max_frame_bytes;
446        match self.stream() {
447            Ok(stream) => crate::sse::decode_json_chunks_from_stream_with_limits(
448                stream,
449                done_policy,
450                mode,
451                max_line_bytes,
452                max_frame_bytes,
453            ),
454            Err(error) => Box::pin(futures_stream::once(async move { Err(error) })),
455        }
456    }
457
458    /// Returns a buffered body reference for response logging if available.
459    ///
460    /// # Returns
461    /// `Some(&Bytes)` when response body has already been buffered.
462    pub(crate) fn buffered_body_for_logging(&self) -> Option<&Bytes> {
463        self.buffered_body.as_ref()
464    }
465
466    /// Returns whether logger may safely buffer the full body for logging.
467    ///
468    /// # Parameters
469    /// - `body_log_limit`: Configured logging body preview limit in bytes.
470    ///
471    /// # Returns
472    /// `true` only when this response is not SSE, has an explicit `Content-Length`,
473    /// and declared length is within `body_log_limit`.
474    pub(crate) fn can_buffer_body_for_logging(&self, body_log_limit: usize) -> bool {
475        if self.backend.is_none() {
476            return false;
477        }
478        if self.is_sse_response() {
479            return false;
480        }
481        self.content_length_hint()
482            .is_some_and(|content_length| content_length <= body_log_limit as u64)
483    }
484
485    /// Reads bounded preview bytes from a response body for status error messages.
486    ///
487    /// # Errors
488    /// Returns [`HttpErrorKind::Cancelled`](crate::HttpErrorKind::Cancelled)
489    /// when the supplied cancellation token fires while waiting for preview
490    /// bytes.
491    async fn read_error_body_preview(
492        mut response: reqwest::Response,
493        read_timeout: Duration,
494        cancellation_token: Option<CancellationToken>,
495        method: Method,
496        url: Url,
497        max_bytes: usize,
498    ) -> HttpResult<String> {
499        let limit = max_bytes.max(1);
500        let mut preview = Vec::new();
501        let mut truncated = false;
502
503        loop {
504            let next = if let Some(token) = cancellation_token.as_ref() {
505                tokio::select! {
506                    _ = token.cancelled() => {
507                        return Err(HttpError::cancelled(
508                            "Request cancelled while reading status error response body preview",
509                        )
510                        .with_method(&method)
511                        .with_url(&url));
512                    }
513                    item = tokio::time::timeout(read_timeout, response.chunk()) => item,
514                }
515            } else {
516                tokio::time::timeout(read_timeout, response.chunk()).await
517            };
518            match next {
519                Ok(Ok(Some(chunk))) => {
520                    if preview.len() >= limit {
521                        truncated = true;
522                        break;
523                    }
524                    let remaining = limit - preview.len();
525                    if chunk.len() > remaining {
526                        preview.extend_from_slice(&chunk[..remaining]);
527                        truncated = true;
528                        break;
529                    }
530                    preview.extend_from_slice(&chunk);
531                }
532                Ok(Ok(None)) => break,
533                Ok(Err(error)) => {
534                    return Ok(format!(
535                        "<error body unavailable: failed to read response body: {}>",
536                        error
537                    ));
538                }
539                Err(_) => {
540                    return Ok(format!(
541                        "<error body unavailable: read timeout after {:?}>",
542                        read_timeout
543                    ));
544                }
545            }
546        }
547        Ok(Self::render_error_body_preview(&preview, truncated))
548    }
549
550    /// Returns a cancellation error with response read context when cancelled.
551    ///
552    /// # Parameters
553    /// - `message`: Cancellation message to include in the error.
554    ///
555    /// # Returns
556    /// `Some(HttpError)` when this response has a cancelled token; otherwise
557    /// `None`.
558    fn cancelled_error_if_needed(&self, message: &str) -> Option<HttpError> {
559        if self
560            .runtime
561            .cancellation_token
562            .as_ref()
563            .is_some_and(CancellationToken::is_cancelled)
564        {
565            Some(
566                HttpError::cancelled(message.to_string())
567                    .with_method(&self.meta.method)
568                    .with_url(&self.runtime.request_url),
569            )
570        } else {
571            None
572        }
573    }
574
575    /// Returns `Content-Length` parsed from response headers when present and valid.
576    fn content_length_hint(&self) -> Option<u64> {
577        self.meta
578            .headers
579            .get(CONTENT_LENGTH)
580            .and_then(|value| value.to_str().ok())
581            .and_then(|value| value.parse::<u64>().ok())
582    }
583
584    /// Returns whether response content-type is SSE (`text/event-stream`).
585    fn is_sse_response(&self) -> bool {
586        self.meta
587            .headers
588            .get(CONTENT_TYPE)
589            .and_then(|value| value.to_str().ok())
590            .is_some_and(|content_type| {
591                content_type
592                    .to_ascii_lowercase()
593                    .starts_with("text/event-stream")
594            })
595    }
596
597    fn render_error_body_preview(bytes: &[u8], truncated: bool) -> String {
598        if bytes.is_empty() {
599            return "<empty>".to_string();
600        }
601        let suffix = if truncated { "...<truncated>" } else { "" };
602        match std::str::from_utf8(bytes) {
603            Ok(text) => format!("{text}{suffix}"),
604            Err(_) => format!("<binary {} bytes>{suffix}", bytes.len()),
605        }
606    }
607}