qubit_http/http_stream_response.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026.
4 * Haixing Hu, Qubit Co. Ltd.
5 *
6 * All rights reserved.
7 *
8 ******************************************************************************/
9//! Streaming HTTP response wrapper.
10
11use http::{HeaderMap, StatusCode};
12use serde::de::DeserializeOwned;
13use url::Url;
14
15use crate::{
16 constants::{DEFAULT_SSE_MAX_FRAME_BYTES, DEFAULT_SSE_MAX_LINE_BYTES},
17 sse::{DoneMarkerPolicy, SseChunkStream, SseEventStream, SseJsonMode},
18 HttpByteStream,
19};
20
21/// HTTP response metadata plus a lazy body stream (from [`crate::HttpClient::execute_stream`]).
22pub struct HttpStreamResponse {
23 /// HTTP status code of the first response line.
24 pub status: StatusCode,
25 /// Response headers available before consuming the body.
26 pub headers: HeaderMap,
27 /// Effective URL after redirects.
28 pub url: Url,
29 /// Body as an async stream of [`bytes::Bytes`] chunks.
30 stream: HttpByteStream,
31 /// Default JSON decoding mode used by [`HttpStreamResponse::decode_json_chunks`].
32 sse_json_mode: SseJsonMode,
33 /// Default maximum bytes allowed for one SSE line.
34 sse_max_line_bytes: usize,
35 /// Default maximum bytes allowed for one SSE frame.
36 sse_max_frame_bytes: usize,
37}
38
39impl HttpStreamResponse {
40 /// Wraps status, headers, URL, and the byte stream.
41 ///
42 /// # Parameters
43 /// - `status`: HTTP status.
44 /// - `headers`: Header map.
45 /// - `url`: Final URL.
46 /// - `stream`: Pinned body stream.
47 ///
48 /// # Returns
49 /// New [`HttpStreamResponse`].
50 pub fn new(status: StatusCode, headers: HeaderMap, url: Url, stream: HttpByteStream) -> Self {
51 Self::new_with_sse_options(
52 status,
53 headers,
54 url,
55 stream,
56 SseJsonMode::Lenient,
57 DEFAULT_SSE_MAX_LINE_BYTES,
58 DEFAULT_SSE_MAX_FRAME_BYTES,
59 )
60 }
61
62 /// Wraps status, headers, URL, and byte stream with SSE decode defaults.
63 ///
64 /// # Parameters
65 /// - `status`: HTTP status.
66 /// - `headers`: Header map.
67 /// - `url`: Final URL.
68 /// - `stream`: Pinned body stream.
69 /// - `sse_json_mode`: Default JSON strictness used by `decode_json_chunks`.
70 /// - `sse_max_line_bytes`: Default max bytes for one SSE line.
71 /// - `sse_max_frame_bytes`: Default max bytes for one SSE frame.
72 ///
73 /// # Returns
74 /// New [`HttpStreamResponse`].
75 pub(crate) fn new_with_sse_options(
76 status: StatusCode,
77 headers: HeaderMap,
78 url: Url,
79 stream: HttpByteStream,
80 sse_json_mode: SseJsonMode,
81 sse_max_line_bytes: usize,
82 sse_max_frame_bytes: usize,
83 ) -> Self {
84 Self {
85 status,
86 headers,
87 url,
88 stream,
89 sse_json_mode,
90 sse_max_line_bytes: sse_max_line_bytes.max(1),
91 sse_max_frame_bytes: sse_max_frame_bytes.max(1),
92 }
93 }
94
95 /// Same semantics as [`crate::HttpResponse::is_success`].
96 ///
97 /// # Returns
98 /// `true` when status is 2xx.
99 pub fn is_success(&self) -> bool {
100 self.status.is_success()
101 }
102
103 /// Destructures `self`, yielding the body stream for manual polling.
104 ///
105 /// # Returns
106 /// The inner [`HttpByteStream`].
107 pub fn into_stream(self) -> HttpByteStream {
108 self.stream
109 }
110
111 /// Decodes current stream body as SSE events (`UTF-8 lines -> SSE frames`).
112 ///
113 /// # Returns
114 /// Stream yielding parsed SSE events.
115 ///
116 /// # Errors
117 /// Each emitted item may contain:
118 /// - transport/read errors forwarded from the underlying HTTP stream;
119 /// - [`crate::HttpError::sse_protocol`] when SSE line UTF-8 decoding fails.
120 pub fn decode_events(self) -> SseEventStream {
121 let max_line_bytes = self.sse_max_line_bytes;
122 let max_frame_bytes = self.sse_max_frame_bytes;
123 self.decode_events_with_limits(max_line_bytes, max_frame_bytes)
124 }
125
126 /// Decodes current stream body as SSE events with explicit line/frame size limits.
127 ///
128 /// # Parameters
129 /// - `max_line_bytes`: Maximum allowed bytes for one SSE line.
130 /// - `max_frame_bytes`: Maximum allowed bytes for one SSE frame.
131 ///
132 /// # Returns
133 /// Stream yielding parsed SSE events.
134 ///
135 /// # Errors
136 /// Each emitted item may contain transport/read/protocol errors and limit violations.
137 pub fn decode_events_with_limits(
138 self,
139 max_line_bytes: usize,
140 max_frame_bytes: usize,
141 ) -> SseEventStream {
142 crate::sse::decode_events_from_response_with_limits(self, max_line_bytes, max_frame_bytes)
143 }
144
145 /// Decodes SSE `data:` payloads as JSON chunks with response defaults.
146 ///
147 /// # Parameters
148 /// - `done_policy`: Done marker policy (for example `[DONE]`).
149 ///
150 /// # Type parameters
151 /// - `T`: Target chunk type deserialized from each `data:` payload.
152 ///
153 /// # Returns
154 /// Stream yielding [`crate::sse::SseChunk::Data`] and optional
155 /// [`crate::sse::SseChunk::Done`].
156 ///
157 /// # Errors
158 /// The stream may emit transport/protocol errors. Malformed JSON behavior is controlled by
159 /// the response default JSON mode (configured by [`crate::HttpClientOptions::sse_json_mode`]).
160 pub fn decode_json_chunks<T>(self, done_policy: DoneMarkerPolicy) -> SseChunkStream<T>
161 where
162 T: DeserializeOwned + Send + 'static,
163 {
164 let mode = self.sse_json_mode;
165 let max_line_bytes = self.sse_max_line_bytes;
166 let max_frame_bytes = self.sse_max_frame_bytes;
167 self.decode_json_chunks_with_mode_and_limits(
168 done_policy,
169 mode,
170 max_line_bytes,
171 max_frame_bytes,
172 )
173 }
174
175 /// Decodes SSE `data:` payloads as JSON chunks with configurable strictness.
176 ///
177 /// # Parameters
178 /// - `done_policy`: Done marker policy (for example `[DONE]`).
179 /// - `mode`: JSON decoding strictness for malformed payloads.
180 ///
181 /// # Type parameters
182 /// - `T`: Target chunk type deserialized from each `data:` payload.
183 ///
184 /// # Returns
185 /// Stream yielding [`crate::sse::SseChunk::Data`] and optional
186 /// [`crate::sse::SseChunk::Done`].
187 ///
188 /// # Errors
189 /// - transport/read errors from underlying stream;
190 /// - protocol errors from SSE parsing;
191 /// - in strict mode, [`crate::HttpError::sse_decode`] on malformed JSON payload.
192 pub fn decode_json_chunks_with_mode<T>(
193 self,
194 done_policy: DoneMarkerPolicy,
195 mode: SseJsonMode,
196 ) -> SseChunkStream<T>
197 where
198 T: DeserializeOwned + Send + 'static,
199 {
200 let max_line_bytes = self.sse_max_line_bytes;
201 let max_frame_bytes = self.sse_max_frame_bytes;
202 self.decode_json_chunks_with_mode_and_limits(
203 done_policy,
204 mode,
205 max_line_bytes,
206 max_frame_bytes,
207 )
208 }
209
210 /// Decodes SSE `data:` payloads as JSON chunks with configurable strictness and limits.
211 ///
212 /// # Parameters
213 /// - `done_policy`: Done marker policy (for example `[DONE]`).
214 /// - `mode`: JSON decoding strictness for malformed payloads.
215 /// - `max_line_bytes`: Maximum allowed bytes for one SSE line.
216 /// - `max_frame_bytes`: Maximum allowed bytes for one SSE frame.
217 ///
218 /// # Type parameters
219 /// - `T`: Target chunk type deserialized from each `data:` payload.
220 ///
221 /// # Returns
222 /// Stream yielding [`crate::sse::SseChunk::Data`] and optional
223 /// [`crate::sse::SseChunk::Done`].
224 pub fn decode_json_chunks_with_mode_and_limits<T>(
225 self,
226 done_policy: DoneMarkerPolicy,
227 mode: SseJsonMode,
228 max_line_bytes: usize,
229 max_frame_bytes: usize,
230 ) -> SseChunkStream<T>
231 where
232 T: DeserializeOwned + Send + 'static,
233 {
234 crate::sse::decode_json_chunks_from_response_with_limits(
235 self,
236 done_policy,
237 mode,
238 max_line_bytes,
239 max_frame_bytes,
240 )
241 }
242}
243
244impl std::fmt::Debug for HttpStreamResponse {
245 /// Debug output includes status, headers, and URL; omits the stream body.
246 ///
247 /// # Parameters
248 /// - `f`: Formatter.
249 ///
250 /// # Returns
251 /// [`std::fmt::Result`].
252 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
253 f.debug_struct("HttpStreamResponse")
254 .field("status", &self.status)
255 .field("headers", &self.headers)
256 .field("url", &self.url)
257 .field("sse_json_mode", &self.sse_json_mode)
258 .field("sse_max_line_bytes", &self.sse_max_line_bytes)
259 .field("sse_max_frame_bytes", &self.sse_max_frame_bytes)
260 .finish_non_exhaustive()
261 }
262}