1use std::time::Duration;
13
14use async_stream::stream;
15use bytes::Bytes;
16use futures_util::{stream as futures_stream, StreamExt};
17use http::header::{CONTENT_LENGTH, CONTENT_TYPE};
18use http::{HeaderMap, Method, StatusCode};
19use serde::de::DeserializeOwned;
20use tokio_util::sync::CancellationToken;
21use url::Url;
22
23use crate::error::{backend_error_mapper::map_reqwest_error, ReqwestErrorPhase};
24use crate::sse::{DoneMarkerPolicy, SseChunkStream, SseEventStream, SseJsonMode};
25use crate::{HttpByteStream, HttpError, HttpErrorKind, HttpResult};
26
27use super::{HttpResponseMeta, HttpResponseOptions};
28
29#[derive(Debug, Clone)]
31struct HttpResponseRuntime {
32 read_timeout: Duration,
34 cancellation_token: Option<CancellationToken>,
36 request_url: Url,
38}
39
40impl HttpResponseRuntime {
41 fn new(
42 read_timeout: Duration,
43 cancellation_token: Option<CancellationToken>,
44 request_url: Url,
45 ) -> Self {
46 Self {
47 read_timeout,
48 cancellation_token,
49 request_url,
50 }
51 }
52}
53
54#[derive(Debug)]
56pub struct HttpResponse {
57 pub meta: HttpResponseMeta,
59 backend: Option<reqwest::Response>,
61 buffered_body: Option<Bytes>,
63 runtime: HttpResponseRuntime,
65 options: HttpResponseOptions,
67}
68
69impl HttpResponse {
70 pub fn new(
72 status: StatusCode,
73 headers: HeaderMap,
74 body: Bytes,
75 url: Url,
76 method: Method,
77 ) -> Self {
78 Self {
79 meta: HttpResponseMeta::new(status, headers, url.clone(), method),
80 backend: None,
81 buffered_body: Some(body),
82 runtime: HttpResponseRuntime::new(Duration::from_secs(30), None, url),
83 options: HttpResponseOptions::default(),
84 }
85 }
86
87 pub(crate) fn from_backend(
89 meta: HttpResponseMeta,
90 backend: reqwest::Response,
91 read_timeout: Duration,
92 cancellation_token: Option<CancellationToken>,
93 request_url: Url,
94 options: HttpResponseOptions,
95 ) -> Self {
96 Self {
97 meta,
98 backend: Some(backend),
99 buffered_body: None,
100 runtime: HttpResponseRuntime::new(read_timeout, cancellation_token, request_url),
101 options,
102 }
103 }
104
105 #[inline]
107 pub fn meta(&self) -> &HttpResponseMeta {
108 &self.meta
109 }
110
111 #[inline]
113 pub fn status(&self) -> StatusCode {
114 self.meta.status
115 }
116
117 #[inline]
119 pub fn headers(&self) -> &HeaderMap {
120 &self.meta.headers
121 }
122
123 #[inline]
125 pub fn url(&self) -> &Url {
126 &self.meta.url
127 }
128
129 #[inline]
131 pub fn request_url(&self) -> &Url {
132 &self.runtime.request_url
133 }
134
135 #[inline]
137 pub fn is_success(&self) -> bool {
138 self.status().is_success()
139 }
140
141 #[inline]
143 pub fn retry_after_hint(&self) -> Option<Duration> {
144 self.meta.retry_after_hint()
145 }
146
147 pub(crate) async fn into_success_or_status_error(
150 self,
151 message_prefix: &str,
152 ) -> HttpResult<Self> {
153 let status = self.status();
154 if status.is_success() {
155 return Ok(self);
156 }
157 let retry_after = self.retry_after_hint();
158 let method = self.meta.method.clone();
159 let url = self.request_url().clone();
160 let error_preview_limit = self.options.error_response_preview_limit;
161 let body_preview = self.into_error_body_preview(error_preview_limit).await?;
162 let message = format!(
163 "{} with status {} for {} {}; response body preview: {}",
164 message_prefix, status, method, url, body_preview
165 );
166 let mut mapped = HttpError::status(status, message)
167 .with_method(&method)
168 .with_url(&url)
169 .with_response_body_preview(body_preview);
170 if let Some(retry_after) = retry_after {
171 mapped = mapped.with_retry_after(retry_after);
172 }
173 Err(mapped)
174 }
175
176 pub(crate) async fn into_error_body_preview(mut self, max_bytes: usize) -> HttpResult<String> {
183 let limit = max_bytes.max(1);
184 if let Some(error) = self.cancelled_error_if_needed(
185 "Request cancelled while reading status error response body preview",
186 ) {
187 return Err(error);
188 }
189 if let Some(body) = self.buffered_body.take() {
190 let end = body.len().min(limit);
191 return Ok(Self::render_error_body_preview(
192 &body[..end],
193 body.len() > limit,
194 ));
195 }
196 let Some(backend) = self.backend.take() else {
197 return Ok("<empty>".to_string());
198 };
199 Self::read_error_body_preview(
200 backend,
201 self.runtime.read_timeout,
202 self.runtime.cancellation_token.clone(),
203 self.meta.method.clone(),
204 self.runtime.request_url.clone(),
205 limit,
206 )
207 .await
208 }
209
210 pub async fn bytes(&mut self) -> HttpResult<Bytes> {
212 if let Some(body) = &self.buffered_body {
213 return Ok(body.clone());
214 }
215 let Some(mut backend) = self.backend.take() else {
216 self.buffered_body = Some(Bytes::new());
217 return Ok(Bytes::new());
218 };
219
220 let method = self.meta.method.clone();
221 let url = self.runtime.request_url.clone();
222 let read_timeout = self.runtime.read_timeout;
223 let cancellation_token = self.runtime.cancellation_token.clone();
224 let mut body = bytes::BytesMut::new();
225
226 loop {
227 let next = if let Some(token) = &cancellation_token {
228 tokio::select! {
229 _ = token.cancelled() => {
230 return Err(HttpError::cancelled("Request cancelled while reading response body")
231 .with_method(&method)
232 .with_url(&url));
233 }
234 item = tokio::time::timeout(read_timeout, backend.chunk()) => item,
235 }
236 } else {
237 tokio::time::timeout(read_timeout, backend.chunk()).await
238 };
239
240 match next {
241 Ok(Ok(Some(chunk))) => body.extend_from_slice(&chunk),
242 Ok(Ok(None)) => {
243 let body = body.freeze();
244 self.buffered_body = Some(body.clone());
245 return Ok(body);
246 }
247 Ok(Err(error)) => {
248 return Err(map_reqwest_error(
249 error,
250 HttpErrorKind::Decode,
251 ReqwestErrorPhase::Read,
252 Some(method),
253 Some(url),
254 ));
255 }
256 Err(_) => {
257 return Err(HttpError::read_timeout(format!(
258 "Read timeout after {:?} while reading response body",
259 read_timeout
260 ))
261 .with_method(&self.meta.method)
262 .with_url(&self.runtime.request_url));
263 }
264 }
265 }
266 }
267
268 pub fn stream(&mut self) -> HttpResult<HttpByteStream> {
270 if let Some(body) = self.buffered_body.as_ref() {
271 let bytes = body.clone();
272 return Ok(Box::pin(futures_stream::once(async move { Ok(bytes) })));
273 }
274 if let Some(error) = self
275 .cancelled_error_if_needed("Streaming response cancelled before reading response body")
276 {
277 return Err(error);
278 }
279 let Some(backend) = self.backend.take() else {
280 return Ok(Box::pin(futures_stream::empty()));
281 };
282
283 let method = self.meta.method.clone();
284 let url = self.runtime.request_url.clone();
285 let read_timeout = self.runtime.read_timeout;
286 let cancellation_token = self.runtime.cancellation_token.clone();
287 let mut stream = backend.bytes_stream();
288 let wrapped = stream! {
289 loop {
290 let next = if let Some(token) = &cancellation_token {
291 tokio::select! {
292 _ = token.cancelled() => {
293 yield Err(HttpError::cancelled("Streaming response cancelled while reading body")
294 .with_method(&method)
295 .with_url(&url));
296 break;
297 }
298 item = tokio::time::timeout(read_timeout, stream.next()) => item,
299 }
300 } else {
301 tokio::time::timeout(read_timeout, stream.next()).await
302 };
303 match next {
304 Ok(Some(Ok(bytes))) => yield Ok(bytes),
305 Ok(Some(Err(error))) => {
306 let mapped = map_reqwest_error(
307 error,
308 HttpErrorKind::Transport,
309 ReqwestErrorPhase::Read,
310 Some(method.clone()),
311 Some(url.clone()),
312 );
313 yield Err(mapped);
314 break;
315 }
316 Ok(None) => break,
317 Err(_) => {
318 let error = HttpError::read_timeout(format!(
319 "Read timeout after {:?} while streaming response",
320 read_timeout
321 ))
322 .with_method(&method)
323 .with_url(&url);
324 yield Err(error);
325 break;
326 }
327 }
328 }
329 };
330 Ok(Box::pin(wrapped))
331 }
332
333 pub async fn text(&mut self) -> HttpResult<String> {
335 let body = self.bytes().await?;
336 String::from_utf8(body.to_vec()).map_err(|error| {
337 HttpError::decode(format!(
338 "Failed to decode response body as UTF-8: {}",
339 error
340 ))
341 .with_status(self.meta.status)
342 .with_url(&self.meta.url)
343 })
344 }
345
346 pub async fn json<T>(&mut self) -> HttpResult<T>
348 where
349 T: DeserializeOwned,
350 {
351 let body = self.bytes().await?;
352 serde_json::from_slice(&body).map_err(|error| {
353 HttpError::decode(format!("Failed to decode response JSON: {}", error))
354 .with_status(self.meta.status)
355 .with_url(&self.meta.url)
356 })
357 }
358
359 #[inline]
365 pub fn sse_max_line_bytes(mut self, max_line_bytes: usize) -> Self {
366 self.options.sse_max_line_bytes = max_line_bytes.max(1);
367 self
368 }
369
370 #[inline]
374 pub fn sse_max_frame_bytes(mut self, max_frame_bytes: usize) -> Self {
375 self.options.sse_max_frame_bytes = max_frame_bytes.max(1);
376 self
377 }
378
379 #[inline]
381 pub fn sse_json_mode(mut self, mode: SseJsonMode) -> Self {
382 self.options.sse_json_mode = mode;
383 self
384 }
385
386 #[inline]
388 pub fn sse_done_marker_policy(mut self, policy: DoneMarkerPolicy) -> Self {
389 self.options.sse_done_marker_policy = policy;
390 self
391 }
392
393 pub fn sse_events(mut self) -> SseEventStream {
397 let max_line_bytes = self.options.sse_max_line_bytes;
398 let max_frame_bytes = self.options.sse_max_frame_bytes;
399 match self.stream() {
400 Ok(stream) => crate::sse::decode_events_from_stream_with_limits(
401 stream,
402 max_line_bytes,
403 max_frame_bytes,
404 ),
405 Err(error) => Box::pin(futures_stream::once(async move { Err(error) })),
406 }
407 }
408
409 pub fn sse_chunks<T>(mut self) -> SseChunkStream<T>
413 where
414 T: DeserializeOwned + Send + 'static,
415 {
416 let done_policy = self.options.sse_done_marker_policy.clone();
417 let mode = self.options.sse_json_mode;
418 let max_line_bytes = self.options.sse_max_line_bytes;
419 let max_frame_bytes = self.options.sse_max_frame_bytes;
420 match self.stream() {
421 Ok(stream) => crate::sse::decode_json_chunks_from_stream_with_limits(
422 stream,
423 done_policy,
424 mode,
425 max_line_bytes,
426 max_frame_bytes,
427 ),
428 Err(error) => Box::pin(futures_stream::once(async move { Err(error) })),
429 }
430 }
431
432 pub(crate) fn buffered_body_for_logging(&self) -> Option<&Bytes> {
437 self.buffered_body.as_ref()
438 }
439
440 pub(crate) fn can_buffer_body_for_logging(&self, body_log_limit: usize) -> bool {
449 if self.backend.is_none() {
450 return false;
451 }
452 if self.is_sse_response() {
453 return false;
454 }
455 self.content_length_hint()
456 .is_some_and(|content_length| content_length <= body_log_limit as u64)
457 }
458
459 async fn read_error_body_preview(
466 mut response: reqwest::Response,
467 read_timeout: Duration,
468 cancellation_token: Option<CancellationToken>,
469 method: Method,
470 url: Url,
471 max_bytes: usize,
472 ) -> HttpResult<String> {
473 let limit = max_bytes.max(1);
474 let mut preview = Vec::new();
475 let mut truncated = false;
476
477 loop {
478 let next = if let Some(token) = cancellation_token.as_ref() {
479 tokio::select! {
480 _ = token.cancelled() => {
481 return Err(HttpError::cancelled(
482 "Request cancelled while reading status error response body preview",
483 )
484 .with_method(&method)
485 .with_url(&url));
486 }
487 item = tokio::time::timeout(read_timeout, response.chunk()) => item,
488 }
489 } else {
490 tokio::time::timeout(read_timeout, response.chunk()).await
491 };
492 match next {
493 Ok(Ok(Some(chunk))) => {
494 if preview.len() >= limit {
495 truncated = true;
496 break;
497 }
498 let remaining = limit - preview.len();
499 if chunk.len() > remaining {
500 preview.extend_from_slice(&chunk[..remaining]);
501 truncated = true;
502 break;
503 }
504 preview.extend_from_slice(&chunk);
505 }
506 Ok(Ok(None)) => break,
507 Ok(Err(error)) => {
508 return Ok(format!(
509 "<error body unavailable: failed to read response body: {}>",
510 error
511 ));
512 }
513 Err(_) => {
514 return Ok(format!(
515 "<error body unavailable: read timeout after {:?}>",
516 read_timeout
517 ));
518 }
519 }
520 }
521 Ok(Self::render_error_body_preview(&preview, truncated))
522 }
523
524 fn cancelled_error_if_needed(&self, message: &str) -> Option<HttpError> {
533 if self
534 .runtime
535 .cancellation_token
536 .as_ref()
537 .is_some_and(CancellationToken::is_cancelled)
538 {
539 Some(
540 HttpError::cancelled(message.to_string())
541 .with_method(&self.meta.method)
542 .with_url(&self.runtime.request_url),
543 )
544 } else {
545 None
546 }
547 }
548
549 fn content_length_hint(&self) -> Option<u64> {
551 self.meta
552 .headers
553 .get(CONTENT_LENGTH)
554 .and_then(|value| value.to_str().ok())
555 .and_then(|value| value.parse::<u64>().ok())
556 }
557
558 fn is_sse_response(&self) -> bool {
560 self.meta
561 .headers
562 .get(CONTENT_TYPE)
563 .and_then(|value| value.to_str().ok())
564 .is_some_and(|content_type| {
565 content_type
566 .to_ascii_lowercase()
567 .starts_with("text/event-stream")
568 })
569 }
570
571 fn render_error_body_preview(bytes: &[u8], truncated: bool) -> String {
572 if bytes.is_empty() {
573 return "<empty>".to_string();
574 }
575 let suffix = if truncated { "...<truncated>" } else { "" };
576 match std::str::from_utf8(bytes) {
577 Ok(text) => format!("{text}{suffix}"),
578 Err(_) => format!("<binary {} bytes>{suffix}", bytes.len()),
579 }
580 }
581}