Skip to main content

qubit_http/
http_stream_response.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026.
4 *    Haixing Hu, Qubit Co. Ltd.
5 *
6 *    All rights reserved.
7 *
8 ******************************************************************************/
9//! Streaming HTTP response wrapper.
10
11use http::{HeaderMap, StatusCode};
12use serde::de::DeserializeOwned;
13use url::Url;
14
15use crate::{
16    constants::{DEFAULT_SSE_MAX_FRAME_BYTES, DEFAULT_SSE_MAX_LINE_BYTES},
17    sse::{DoneMarkerPolicy, SseChunkStream, SseEventStream, SseJsonMode},
18    HttpByteStream,
19};
20
21/// HTTP response metadata plus a lazy body stream (from [`crate::HttpClient::execute_stream`]).
22pub struct HttpStreamResponse {
23    /// HTTP status code of the first response line.
24    pub status: StatusCode,
25    /// Response headers available before consuming the body.
26    pub headers: HeaderMap,
27    /// Effective URL after redirects.
28    pub url: Url,
29    /// Body as an async stream of [`bytes::Bytes`] chunks.
30    stream: HttpByteStream,
31    /// Default JSON decoding mode used by [`HttpStreamResponse::decode_json_chunks`].
32    sse_json_mode: SseJsonMode,
33    /// Default maximum bytes allowed for one SSE line.
34    sse_max_line_bytes: usize,
35    /// Default maximum bytes allowed for one SSE frame.
36    sse_max_frame_bytes: usize,
37}
38
39impl HttpStreamResponse {
40    /// Wraps status, headers, URL, and the byte stream.
41    ///
42    /// # Parameters
43    /// - `status`: HTTP status.
44    /// - `headers`: Header map.
45    /// - `url`: Final URL.
46    /// - `stream`: Pinned body stream.
47    ///
48    /// # Returns
49    /// New [`HttpStreamResponse`].
50    pub fn new(status: StatusCode, headers: HeaderMap, url: Url, stream: HttpByteStream) -> Self {
51        Self::new_with_sse_options(
52            status,
53            headers,
54            url,
55            stream,
56            SseJsonMode::Lenient,
57            DEFAULT_SSE_MAX_LINE_BYTES,
58            DEFAULT_SSE_MAX_FRAME_BYTES,
59        )
60    }
61
62    /// Wraps status, headers, URL, and byte stream with SSE decode defaults.
63    ///
64    /// # Parameters
65    /// - `status`: HTTP status.
66    /// - `headers`: Header map.
67    /// - `url`: Final URL.
68    /// - `stream`: Pinned body stream.
69    /// - `sse_json_mode`: Default JSON strictness used by `decode_json_chunks`.
70    /// - `sse_max_line_bytes`: Default max bytes for one SSE line.
71    /// - `sse_max_frame_bytes`: Default max bytes for one SSE frame.
72    ///
73    /// # Returns
74    /// New [`HttpStreamResponse`].
75    pub(crate) fn new_with_sse_options(
76        status: StatusCode,
77        headers: HeaderMap,
78        url: Url,
79        stream: HttpByteStream,
80        sse_json_mode: SseJsonMode,
81        sse_max_line_bytes: usize,
82        sse_max_frame_bytes: usize,
83    ) -> Self {
84        Self {
85            status,
86            headers,
87            url,
88            stream,
89            sse_json_mode,
90            sse_max_line_bytes: sse_max_line_bytes.max(1),
91            sse_max_frame_bytes: sse_max_frame_bytes.max(1),
92        }
93    }
94
95    /// Same semantics as [`crate::HttpResponse::is_success`].
96    ///
97    /// # Returns
98    /// `true` when status is 2xx.
99    pub fn is_success(&self) -> bool {
100        self.status.is_success()
101    }
102
103    /// Destructures `self`, yielding the body stream for manual polling.
104    ///
105    /// # Returns
106    /// The inner [`HttpByteStream`].
107    pub fn into_stream(self) -> HttpByteStream {
108        self.stream
109    }
110
111    /// Decodes current stream body as SSE events (`UTF-8 lines -> SSE frames`).
112    ///
113    /// # Returns
114    /// Stream yielding parsed SSE events.
115    ///
116    /// # Errors
117    /// Each emitted item may contain:
118    /// - transport/read errors forwarded from the underlying HTTP stream;
119    /// - [`crate::HttpError::sse_protocol`] when SSE line UTF-8 decoding fails.
120    pub fn decode_events(self) -> SseEventStream {
121        let max_line_bytes = self.sse_max_line_bytes;
122        let max_frame_bytes = self.sse_max_frame_bytes;
123        self.decode_events_with_limits(max_line_bytes, max_frame_bytes)
124    }
125
126    /// Decodes current stream body as SSE events with explicit line/frame size limits.
127    ///
128    /// # Parameters
129    /// - `max_line_bytes`: Maximum allowed bytes for one SSE line.
130    /// - `max_frame_bytes`: Maximum allowed bytes for one SSE frame.
131    ///
132    /// # Returns
133    /// Stream yielding parsed SSE events.
134    ///
135    /// # Errors
136    /// Each emitted item may contain transport/read/protocol errors and limit violations.
137    pub fn decode_events_with_limits(
138        self,
139        max_line_bytes: usize,
140        max_frame_bytes: usize,
141    ) -> SseEventStream {
142        crate::sse::decode_events_from_response_with_limits(self, max_line_bytes, max_frame_bytes)
143    }
144
145    /// Decodes SSE `data:` payloads as JSON chunks with response defaults.
146    ///
147    /// # Parameters
148    /// - `done_policy`: Done marker policy (for example `[DONE]`).
149    ///
150    /// # Type parameters
151    /// - `T`: Target chunk type deserialized from each `data:` payload.
152    ///
153    /// # Returns
154    /// Stream yielding [`crate::sse::SseChunk::Data`] and optional
155    /// [`crate::sse::SseChunk::Done`].
156    ///
157    /// # Errors
158    /// The stream may emit transport/protocol errors. Malformed JSON behavior is controlled by
159    /// the response default JSON mode (configured by [`crate::HttpClientOptions::sse_json_mode`]).
160    pub fn decode_json_chunks<T>(self, done_policy: DoneMarkerPolicy) -> SseChunkStream<T>
161    where
162        T: DeserializeOwned + Send + 'static,
163    {
164        let mode = self.sse_json_mode;
165        let max_line_bytes = self.sse_max_line_bytes;
166        let max_frame_bytes = self.sse_max_frame_bytes;
167        self.decode_json_chunks_with_mode_and_limits(
168            done_policy,
169            mode,
170            max_line_bytes,
171            max_frame_bytes,
172        )
173    }
174
175    /// Decodes SSE `data:` payloads as JSON chunks with configurable strictness.
176    ///
177    /// # Parameters
178    /// - `done_policy`: Done marker policy (for example `[DONE]`).
179    /// - `mode`: JSON decoding strictness for malformed payloads.
180    ///
181    /// # Type parameters
182    /// - `T`: Target chunk type deserialized from each `data:` payload.
183    ///
184    /// # Returns
185    /// Stream yielding [`crate::sse::SseChunk::Data`] and optional
186    /// [`crate::sse::SseChunk::Done`].
187    ///
188    /// # Errors
189    /// - transport/read errors from underlying stream;
190    /// - protocol errors from SSE parsing;
191    /// - in strict mode, [`crate::HttpError::sse_decode`] on malformed JSON payload.
192    pub fn decode_json_chunks_with_mode<T>(
193        self,
194        done_policy: DoneMarkerPolicy,
195        mode: SseJsonMode,
196    ) -> SseChunkStream<T>
197    where
198        T: DeserializeOwned + Send + 'static,
199    {
200        let max_line_bytes = self.sse_max_line_bytes;
201        let max_frame_bytes = self.sse_max_frame_bytes;
202        self.decode_json_chunks_with_mode_and_limits(
203            done_policy,
204            mode,
205            max_line_bytes,
206            max_frame_bytes,
207        )
208    }
209
210    /// Decodes SSE `data:` payloads as JSON chunks with configurable strictness and limits.
211    ///
212    /// # Parameters
213    /// - `done_policy`: Done marker policy (for example `[DONE]`).
214    /// - `mode`: JSON decoding strictness for malformed payloads.
215    /// - `max_line_bytes`: Maximum allowed bytes for one SSE line.
216    /// - `max_frame_bytes`: Maximum allowed bytes for one SSE frame.
217    ///
218    /// # Type parameters
219    /// - `T`: Target chunk type deserialized from each `data:` payload.
220    ///
221    /// # Returns
222    /// Stream yielding [`crate::sse::SseChunk::Data`] and optional
223    /// [`crate::sse::SseChunk::Done`].
224    pub fn decode_json_chunks_with_mode_and_limits<T>(
225        self,
226        done_policy: DoneMarkerPolicy,
227        mode: SseJsonMode,
228        max_line_bytes: usize,
229        max_frame_bytes: usize,
230    ) -> SseChunkStream<T>
231    where
232        T: DeserializeOwned + Send + 'static,
233    {
234        crate::sse::decode_json_chunks_from_response_with_limits(
235            self,
236            done_policy,
237            mode,
238            max_line_bytes,
239            max_frame_bytes,
240        )
241    }
242}
243
244impl std::fmt::Debug for HttpStreamResponse {
245    /// Debug output includes status, headers, and URL; omits the stream body.
246    ///
247    /// # Parameters
248    /// - `f`: Formatter.
249    ///
250    /// # Returns
251    /// [`std::fmt::Result`].
252    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
253        f.debug_struct("HttpStreamResponse")
254            .field("status", &self.status)
255            .field("headers", &self.headers)
256            .field("url", &self.url)
257            .field("sse_json_mode", &self.sse_json_mode)
258            .field("sse_max_line_bytes", &self.sse_max_line_bytes)
259            .field("sse_max_frame_bytes", &self.sse_max_frame_bytes)
260            .finish_non_exhaustive()
261    }
262}