1use std::time::Duration;
12
13use async_stream::stream;
14use bytes::Bytes;
15use futures_util::{stream as futures_stream, StreamExt};
16use http::header::{CONTENT_LENGTH, CONTENT_TYPE};
17use http::{HeaderMap, Method, StatusCode};
18use serde::de::DeserializeOwned;
19use tokio_util::sync::CancellationToken;
20use url::Url;
21
22use crate::error::{backend_error_mapper::map_reqwest_error, ReqwestErrorPhase};
23use crate::sse::{DoneMarkerPolicy, SseChunkStream, SseEventStream, SseJsonMode};
24use crate::{HttpByteStream, HttpError, HttpErrorKind, HttpResult};
25
26use super::{HttpResponseMeta, HttpResponseOptions};
27
28#[derive(Debug, Clone)]
30struct HttpResponseRuntime {
31 read_timeout: Duration,
33 cancellation_token: Option<CancellationToken>,
35 request_url: Url,
37}
38
39impl HttpResponseRuntime {
40 fn new(
41 read_timeout: Duration,
42 cancellation_token: Option<CancellationToken>,
43 request_url: Url,
44 ) -> Self {
45 Self {
46 read_timeout,
47 cancellation_token,
48 request_url,
49 }
50 }
51}
52
53#[derive(Debug)]
55pub struct HttpResponse {
56 pub meta: HttpResponseMeta,
58 backend: Option<reqwest::Response>,
60 buffered_body: Option<Bytes>,
62 runtime: HttpResponseRuntime,
64 options: HttpResponseOptions,
66}
67
68impl HttpResponse {
69 pub fn new(
71 status: StatusCode,
72 headers: HeaderMap,
73 body: Bytes,
74 url: Url,
75 method: Method,
76 ) -> Self {
77 Self {
78 meta: HttpResponseMeta::new(status, headers, url.clone(), method),
79 backend: None,
80 buffered_body: Some(body),
81 runtime: HttpResponseRuntime::new(Duration::from_secs(30), None, url),
82 options: HttpResponseOptions::default(),
83 }
84 }
85
86 pub(crate) fn from_backend(
88 meta: HttpResponseMeta,
89 backend: reqwest::Response,
90 read_timeout: Duration,
91 cancellation_token: Option<CancellationToken>,
92 request_url: Url,
93 options: HttpResponseOptions,
94 ) -> Self {
95 Self {
96 meta,
97 backend: Some(backend),
98 buffered_body: None,
99 runtime: HttpResponseRuntime::new(read_timeout, cancellation_token, request_url),
100 options,
101 }
102 }
103
104 #[inline]
106 pub fn meta(&self) -> &HttpResponseMeta {
107 &self.meta
108 }
109
110 #[inline]
112 pub fn status(&self) -> StatusCode {
113 self.meta.status
114 }
115
116 #[inline]
118 pub fn headers(&self) -> &HeaderMap {
119 &self.meta.headers
120 }
121
122 #[inline]
124 pub fn url(&self) -> &Url {
125 &self.meta.url
126 }
127
128 #[inline]
130 pub fn request_url(&self) -> &Url {
131 &self.runtime.request_url
132 }
133
134 #[inline]
136 pub fn is_success(&self) -> bool {
137 self.status().is_success()
138 }
139
140 #[inline]
142 pub fn retry_after_hint(&self) -> Option<Duration> {
143 self.meta.retry_after_hint()
144 }
145
146 pub(crate) async fn into_success_or_status_error(
149 self,
150 message_prefix: &str,
151 ) -> HttpResult<Self> {
152 let status = self.status();
153 if status.is_success() {
154 return Ok(self);
155 }
156 let retry_after = self.retry_after_hint();
157 let method = self.meta.method.clone();
158 let url = self.request_url().clone();
159 let error_preview_limit = self.options.error_response_preview_limit;
160 let body_preview = self.into_error_body_preview(error_preview_limit).await?;
161 let message = format!(
162 "{} with status {} for {} {}; response body preview: {}",
163 message_prefix, status, method, url, body_preview
164 );
165 let mut mapped = HttpError::status(status, message)
166 .with_method(&method)
167 .with_url(&url)
168 .with_response_body_preview(body_preview);
169 if let Some(retry_after) = retry_after {
170 mapped = mapped.with_retry_after(retry_after);
171 }
172 Err(mapped)
173 }
174
175 pub(crate) async fn into_error_body_preview(mut self, max_bytes: usize) -> HttpResult<String> {
182 let limit = max_bytes.max(1);
183 if let Some(error) = self.cancelled_error_if_needed(
184 "Request cancelled while reading status error response body preview",
185 ) {
186 return Err(error);
187 }
188 if let Some(body) = self.buffered_body.take() {
189 let end = body.len().min(limit);
190 return Ok(Self::render_error_body_preview(
191 &body[..end],
192 body.len() > limit,
193 ));
194 }
195 let Some(backend) = self.backend.take() else {
196 return Ok("<empty>".to_string());
197 };
198 Self::read_error_body_preview(
199 backend,
200 self.runtime.read_timeout,
201 self.runtime.cancellation_token.clone(),
202 self.meta.method.clone(),
203 self.runtime.request_url.clone(),
204 limit,
205 )
206 .await
207 }
208
209 pub async fn bytes(&mut self) -> HttpResult<Bytes> {
211 if let Some(body) = &self.buffered_body {
212 return Ok(body.clone());
213 }
214 let Some(mut backend) = self.backend.take() else {
215 self.buffered_body = Some(Bytes::new());
216 return Ok(Bytes::new());
217 };
218
219 let method = self.meta.method.clone();
220 let url = self.runtime.request_url.clone();
221 let read_timeout = self.runtime.read_timeout;
222 let cancellation_token = self.runtime.cancellation_token.clone();
223 let mut body = bytes::BytesMut::new();
224
225 loop {
226 let next = if let Some(token) = &cancellation_token {
227 tokio::select! {
228 _ = token.cancelled() => {
229 return Err(HttpError::cancelled("Request cancelled while reading response body")
230 .with_method(&method)
231 .with_url(&url));
232 }
233 item = tokio::time::timeout(read_timeout, backend.chunk()) => item,
234 }
235 } else {
236 tokio::time::timeout(read_timeout, backend.chunk()).await
237 };
238
239 match next {
240 Ok(Ok(Some(chunk))) => body.extend_from_slice(&chunk),
241 Ok(Ok(None)) => {
242 let body = body.freeze();
243 self.buffered_body = Some(body.clone());
244 return Ok(body);
245 }
246 Ok(Err(error)) => {
247 return Err(map_reqwest_error(
248 error,
249 HttpErrorKind::Decode,
250 Some(ReqwestErrorPhase::Read),
251 Some(method),
252 Some(url),
253 ));
254 }
255 Err(_) => {
256 return Err(HttpError::read_timeout(format!(
257 "Read timeout after {:?} while reading response body",
258 read_timeout
259 ))
260 .with_method(&self.meta.method)
261 .with_url(&self.runtime.request_url));
262 }
263 }
264 }
265 }
266
267 pub fn stream(&mut self) -> HttpResult<HttpByteStream> {
269 if let Some(body) = self.buffered_body.as_ref() {
270 let bytes = body.clone();
271 return Ok(Box::pin(futures_stream::once(async move { Ok(bytes) })));
272 }
273 if let Some(error) = self
274 .cancelled_error_if_needed("Streaming response cancelled before reading response body")
275 {
276 return Err(error);
277 }
278 let Some(backend) = self.backend.take() else {
279 return Ok(Box::pin(futures_stream::empty()));
280 };
281
282 let method = self.meta.method.clone();
283 let url = self.runtime.request_url.clone();
284 let read_timeout = self.runtime.read_timeout;
285 let cancellation_token = self.runtime.cancellation_token.clone();
286 let mut stream = backend.bytes_stream();
287 let wrapped = stream! {
288 loop {
289 let next = if let Some(token) = &cancellation_token {
290 tokio::select! {
291 _ = token.cancelled() => {
292 yield Err(HttpError::cancelled("Streaming response cancelled while reading body")
293 .with_method(&method)
294 .with_url(&url));
295 break;
296 }
297 item = tokio::time::timeout(read_timeout, stream.next()) => item,
298 }
299 } else {
300 tokio::time::timeout(read_timeout, stream.next()).await
301 };
302 match next {
303 Ok(Some(Ok(bytes))) => yield Ok(bytes),
304 Ok(Some(Err(error))) => {
305 let mapped = map_reqwest_error(
306 error,
307 HttpErrorKind::Transport,
308 Some(ReqwestErrorPhase::Read),
309 Some(method.clone()),
310 Some(url.clone()),
311 );
312 yield Err(mapped);
313 break;
314 }
315 Ok(None) => break,
316 Err(_) => {
317 let error = HttpError::read_timeout(format!(
318 "Read timeout after {:?} while streaming response",
319 read_timeout
320 ))
321 .with_method(&method)
322 .with_url(&url);
323 yield Err(error);
324 break;
325 }
326 }
327 }
328 };
329 Ok(Box::pin(wrapped))
330 }
331
332 pub async fn text(&mut self) -> HttpResult<String> {
334 let body = self.bytes().await?;
335 String::from_utf8(body.to_vec()).map_err(|error| {
336 HttpError::decode(format!(
337 "Failed to decode response body as UTF-8: {}",
338 error
339 ))
340 .with_status(self.meta.status)
341 .with_url(&self.meta.url)
342 })
343 }
344
345 pub async fn json<T>(&mut self) -> HttpResult<T>
347 where
348 T: DeserializeOwned,
349 {
350 let body = self.bytes().await?;
351 serde_json::from_slice(&body).map_err(|error| {
352 HttpError::decode(format!("Failed to decode response JSON: {}", error))
353 .with_status(self.meta.status)
354 .with_url(&self.meta.url)
355 })
356 }
357
358 #[inline]
364 pub fn sse_max_line_bytes(mut self, max_line_bytes: usize) -> Self {
365 self.options.sse_max_line_bytes = max_line_bytes.max(1);
366 self
367 }
368
369 #[inline]
373 pub fn sse_max_frame_bytes(mut self, max_frame_bytes: usize) -> Self {
374 self.options.sse_max_frame_bytes = max_frame_bytes.max(1);
375 self
376 }
377
378 #[inline]
380 pub fn sse_json_mode(mut self, mode: SseJsonMode) -> Self {
381 self.options.sse_json_mode = mode;
382 self
383 }
384
385 #[inline]
387 pub fn sse_done_marker_policy(mut self, policy: DoneMarkerPolicy) -> Self {
388 self.options.sse_done_marker_policy = policy;
389 self
390 }
391
392 pub fn sse_events(mut self) -> SseEventStream {
396 let max_line_bytes = self.options.sse_max_line_bytes;
397 let max_frame_bytes = self.options.sse_max_frame_bytes;
398 match self.stream() {
399 Ok(stream) => crate::sse::decode_events_from_stream_with_limits(
400 stream,
401 max_line_bytes,
402 max_frame_bytes,
403 ),
404 Err(error) => Box::pin(futures_stream::once(async move { Err(error) })),
405 }
406 }
407
408 pub fn sse_chunks<T>(mut self) -> SseChunkStream<T>
412 where
413 T: DeserializeOwned + Send + 'static,
414 {
415 let done_policy = self.options.sse_done_marker_policy.clone();
416 let mode = self.options.sse_json_mode;
417 let max_line_bytes = self.options.sse_max_line_bytes;
418 let max_frame_bytes = self.options.sse_max_frame_bytes;
419 match self.stream() {
420 Ok(stream) => crate::sse::decode_json_chunks_from_stream_with_limits(
421 stream,
422 done_policy,
423 mode,
424 max_line_bytes,
425 max_frame_bytes,
426 ),
427 Err(error) => Box::pin(futures_stream::once(async move { Err(error) })),
428 }
429 }
430
431 pub(crate) fn buffered_body_for_logging(&self) -> Option<&Bytes> {
436 self.buffered_body.as_ref()
437 }
438
439 pub(crate) fn can_buffer_body_for_logging(&self, body_log_limit: usize) -> bool {
448 if self.backend.is_none() {
449 return false;
450 }
451 if self.is_sse_response() {
452 return false;
453 }
454 self.content_length_hint()
455 .is_some_and(|content_length| content_length <= body_log_limit as u64)
456 }
457
458 async fn read_error_body_preview(
465 mut response: reqwest::Response,
466 read_timeout: Duration,
467 cancellation_token: Option<CancellationToken>,
468 method: Method,
469 url: Url,
470 max_bytes: usize,
471 ) -> HttpResult<String> {
472 let limit = max_bytes.max(1);
473 let mut preview = Vec::new();
474 let mut truncated = false;
475
476 loop {
477 let next = if let Some(token) = cancellation_token.as_ref() {
478 tokio::select! {
479 _ = token.cancelled() => {
480 return Err(HttpError::cancelled(
481 "Request cancelled while reading status error response body preview",
482 )
483 .with_method(&method)
484 .with_url(&url));
485 }
486 item = tokio::time::timeout(read_timeout, response.chunk()) => item,
487 }
488 } else {
489 tokio::time::timeout(read_timeout, response.chunk()).await
490 };
491 match next {
492 Ok(Ok(Some(chunk))) => {
493 if preview.len() >= limit {
494 truncated = true;
495 break;
496 }
497 let remaining = limit - preview.len();
498 if chunk.len() > remaining {
499 preview.extend_from_slice(&chunk[..remaining]);
500 truncated = true;
501 break;
502 }
503 preview.extend_from_slice(&chunk);
504 }
505 Ok(Ok(None)) => break,
506 Ok(Err(error)) => {
507 return Ok(format!(
508 "<error body unavailable: failed to read response body: {}>",
509 error
510 ));
511 }
512 Err(_) => {
513 return Ok(format!(
514 "<error body unavailable: read timeout after {:?}>",
515 read_timeout
516 ));
517 }
518 }
519 }
520 Ok(Self::render_error_body_preview(&preview, truncated))
521 }
522
523 fn cancelled_error_if_needed(&self, message: &str) -> Option<HttpError> {
532 if self
533 .runtime
534 .cancellation_token
535 .as_ref()
536 .is_some_and(CancellationToken::is_cancelled)
537 {
538 Some(
539 HttpError::cancelled(message.to_string())
540 .with_method(&self.meta.method)
541 .with_url(&self.runtime.request_url),
542 )
543 } else {
544 None
545 }
546 }
547
548 fn content_length_hint(&self) -> Option<u64> {
550 self.meta
551 .headers
552 .get(CONTENT_LENGTH)
553 .and_then(|value| value.to_str().ok())
554 .and_then(|value| value.parse::<u64>().ok())
555 }
556
557 fn is_sse_response(&self) -> bool {
559 self.meta
560 .headers
561 .get(CONTENT_TYPE)
562 .and_then(|value| value.to_str().ok())
563 .is_some_and(|content_type| {
564 content_type
565 .to_ascii_lowercase()
566 .starts_with("text/event-stream")
567 })
568 }
569
570 fn render_error_body_preview(bytes: &[u8], truncated: bool) -> String {
571 if bytes.is_empty() {
572 return "<empty>".to_string();
573 }
574 let suffix = if truncated { "...<truncated>" } else { "" };
575 match std::str::from_utf8(bytes) {
576 Ok(text) => format!("{text}{suffix}"),
577 Err(_) => format!("<binary {} bytes>{suffix}", bytes.len()),
578 }
579 }
580}
581
582#[cfg(coverage)]
587#[doc(hidden)]
588pub(crate) async fn coverage_exercise_response_preview_paths() -> Vec<String> {
589 let url = Url::parse("https://example.com/coverage").expect("coverage URL should parse");
590 let buffered = HttpResponse::new(
591 StatusCode::BAD_GATEWAY,
592 HeaderMap::new(),
593 Bytes::from_static(b"abcdef"),
594 url.clone(),
595 Method::GET,
596 )
597 .into_error_body_preview(3)
598 .await
599 .expect("buffered preview should render");
600
601 let empty = HttpResponse {
602 meta: HttpResponseMeta::new(
603 StatusCode::BAD_GATEWAY,
604 HeaderMap::new(),
605 url.clone(),
606 Method::GET,
607 ),
608 backend: None,
609 buffered_body: None,
610 runtime: HttpResponseRuntime::new(Duration::from_secs(30), None, url.clone()),
611 options: HttpResponseOptions::default(),
612 }
613 .into_error_body_preview(3)
614 .await
615 .expect("empty preview should render");
616
617 let token = CancellationToken::new();
618 token.cancel();
619 let cancelled = HttpResponse {
620 meta: HttpResponseMeta::new(
621 StatusCode::BAD_GATEWAY,
622 HeaderMap::new(),
623 url.clone(),
624 Method::GET,
625 ),
626 backend: None,
627 buffered_body: None,
628 runtime: HttpResponseRuntime::new(Duration::from_secs(30), Some(token), url),
629 options: HttpResponseOptions::default(),
630 }
631 .into_error_body_preview(3)
632 .await
633 .expect_err("cancelled preview should fail");
634
635 vec![buffered, empty, format!("{:?}", cancelled.kind)]
636}