1use std::time::Duration;
13
14use async_stream::stream;
15use bytes::Bytes;
16use futures_util::{
17 stream as futures_stream,
18 StreamExt,
19};
20use http::header::{
21 CONTENT_LENGTH,
22 CONTENT_TYPE,
23};
24use http::{
25 HeaderMap,
26 Method,
27 StatusCode,
28};
29use serde::de::DeserializeOwned;
30use tokio_util::sync::CancellationToken;
31use url::Url;
32
33use crate::error::{
34 backend_error_mapper::map_reqwest_error,
35 ReqwestErrorPhase,
36};
37use crate::sse::{
38 DoneMarkerPolicy,
39 SseChunkStream,
40 SseEventStream,
41 SseJsonMode,
42};
43use crate::{
44 HttpByteStream,
45 HttpError,
46 HttpErrorKind,
47 HttpResult,
48};
49
50use super::{
51 HttpResponseMeta,
52 HttpResponseOptions,
53};
54
55#[derive(Debug, Clone)]
57struct HttpResponseRuntime {
58 read_timeout: Duration,
60 cancellation_token: Option<CancellationToken>,
62 request_url: Url,
64}
65
66impl HttpResponseRuntime {
67 fn new(
68 read_timeout: Duration,
69 cancellation_token: Option<CancellationToken>,
70 request_url: Url,
71 ) -> Self {
72 Self {
73 read_timeout,
74 cancellation_token,
75 request_url,
76 }
77 }
78}
79
80#[derive(Debug)]
82pub struct HttpResponse {
83 pub meta: HttpResponseMeta,
85 backend: Option<reqwest::Response>,
87 buffered_body: Option<Bytes>,
89 runtime: HttpResponseRuntime,
91 options: HttpResponseOptions,
93}
94
95impl HttpResponse {
96 pub fn new(
98 status: StatusCode,
99 headers: HeaderMap,
100 body: Bytes,
101 url: Url,
102 method: Method,
103 ) -> Self {
104 Self {
105 meta: HttpResponseMeta::new(status, headers, url.clone(), method),
106 backend: None,
107 buffered_body: Some(body),
108 runtime: HttpResponseRuntime::new(Duration::from_secs(30), None, url),
109 options: HttpResponseOptions::default(),
110 }
111 }
112
113 pub(crate) fn from_backend(
115 meta: HttpResponseMeta,
116 backend: reqwest::Response,
117 read_timeout: Duration,
118 cancellation_token: Option<CancellationToken>,
119 request_url: Url,
120 options: HttpResponseOptions,
121 ) -> Self {
122 Self {
123 meta,
124 backend: Some(backend),
125 buffered_body: None,
126 runtime: HttpResponseRuntime::new(read_timeout, cancellation_token, request_url),
127 options,
128 }
129 }
130
131 #[inline]
133 pub fn meta(&self) -> &HttpResponseMeta {
134 &self.meta
135 }
136
137 #[inline]
139 pub fn status(&self) -> StatusCode {
140 self.meta.status
141 }
142
143 #[inline]
145 pub fn headers(&self) -> &HeaderMap {
146 &self.meta.headers
147 }
148
149 #[inline]
151 pub fn url(&self) -> &Url {
152 &self.meta.url
153 }
154
155 #[inline]
157 pub fn request_url(&self) -> &Url {
158 &self.runtime.request_url
159 }
160
161 #[inline]
163 pub fn is_success(&self) -> bool {
164 self.status().is_success()
165 }
166
167 #[inline]
169 pub fn retry_after_hint(&self) -> Option<Duration> {
170 self.meta.retry_after_hint()
171 }
172
173 pub(crate) async fn into_success_or_status_error(
176 self,
177 message_prefix: &str,
178 ) -> HttpResult<Self> {
179 let status = self.status();
180 if status.is_success() {
181 return Ok(self);
182 }
183 let retry_after = self.retry_after_hint();
184 let method = self.meta.method.clone();
185 let url = self.request_url().clone();
186 let error_preview_limit = self.options.error_response_preview_limit;
187 let body_preview = self.into_error_body_preview(error_preview_limit).await?;
188 let message = format!(
189 "{} with status {} for {} {}; response body preview: {}",
190 message_prefix, status, method, url, body_preview
191 );
192 let mut mapped = HttpError::status(status, message)
193 .with_method(&method)
194 .with_url(&url)
195 .with_response_body_preview(body_preview);
196 if let Some(retry_after) = retry_after {
197 mapped = mapped.with_retry_after(retry_after);
198 }
199 Err(mapped)
200 }
201
202 pub(crate) async fn into_error_body_preview(mut self, max_bytes: usize) -> HttpResult<String> {
209 let limit = max_bytes.max(1);
210 if let Some(error) = self.cancelled_error_if_needed(
211 "Request cancelled while reading status error response body preview",
212 ) {
213 return Err(error);
214 }
215 if let Some(body) = self.buffered_body.take() {
216 let end = body.len().min(limit);
217 return Ok(Self::render_error_body_preview(
218 &body[..end],
219 body.len() > limit,
220 ));
221 }
222 let Some(backend) = self.backend.take() else {
223 return Ok("<empty>".to_string());
224 };
225 Self::read_error_body_preview(
226 backend,
227 self.runtime.read_timeout,
228 self.runtime.cancellation_token.clone(),
229 self.meta.method.clone(),
230 self.runtime.request_url.clone(),
231 limit,
232 )
233 .await
234 }
235
236 pub async fn bytes(&mut self) -> HttpResult<Bytes> {
238 if let Some(body) = &self.buffered_body {
239 return Ok(body.clone());
240 }
241 let Some(mut backend) = self.backend.take() else {
242 self.buffered_body = Some(Bytes::new());
243 return Ok(Bytes::new());
244 };
245
246 let method = self.meta.method.clone();
247 let url = self.runtime.request_url.clone();
248 let read_timeout = self.runtime.read_timeout;
249 let cancellation_token = self.runtime.cancellation_token.clone();
250 let mut body = bytes::BytesMut::new();
251
252 loop {
253 let next = if let Some(token) = &cancellation_token {
254 tokio::select! {
255 _ = token.cancelled() => {
256 return Err(HttpError::cancelled("Request cancelled while reading response body")
257 .with_method(&method)
258 .with_url(&url));
259 }
260 item = tokio::time::timeout(read_timeout, backend.chunk()) => item,
261 }
262 } else {
263 tokio::time::timeout(read_timeout, backend.chunk()).await
264 };
265
266 match next {
267 Ok(Ok(Some(chunk))) => body.extend_from_slice(&chunk),
268 Ok(Ok(None)) => {
269 let body = body.freeze();
270 self.buffered_body = Some(body.clone());
271 return Ok(body);
272 }
273 Ok(Err(error)) => {
274 return Err(map_reqwest_error(
275 error,
276 HttpErrorKind::Decode,
277 ReqwestErrorPhase::Read,
278 Some(method),
279 Some(url),
280 ));
281 }
282 Err(_) => {
283 return Err(HttpError::read_timeout(format!(
284 "Read timeout after {:?} while reading response body",
285 read_timeout
286 ))
287 .with_method(&self.meta.method)
288 .with_url(&self.runtime.request_url));
289 }
290 }
291 }
292 }
293
294 pub fn stream(&mut self) -> HttpResult<HttpByteStream> {
296 if let Some(body) = self.buffered_body.as_ref() {
297 let bytes = body.clone();
298 return Ok(Box::pin(futures_stream::once(async move { Ok(bytes) })));
299 }
300 if let Some(error) = self
301 .cancelled_error_if_needed("Streaming response cancelled before reading response body")
302 {
303 return Err(error);
304 }
305 let Some(backend) = self.backend.take() else {
306 return Ok(Box::pin(futures_stream::empty()));
307 };
308
309 let method = self.meta.method.clone();
310 let url = self.runtime.request_url.clone();
311 let read_timeout = self.runtime.read_timeout;
312 let cancellation_token = self.runtime.cancellation_token.clone();
313 let mut stream = backend.bytes_stream();
314 let wrapped = stream! {
315 loop {
316 let next = if let Some(token) = &cancellation_token {
317 tokio::select! {
318 _ = token.cancelled() => {
319 yield Err(HttpError::cancelled("Streaming response cancelled while reading body")
320 .with_method(&method)
321 .with_url(&url));
322 break;
323 }
324 item = tokio::time::timeout(read_timeout, stream.next()) => item,
325 }
326 } else {
327 tokio::time::timeout(read_timeout, stream.next()).await
328 };
329 match next {
330 Ok(Some(Ok(bytes))) => yield Ok(bytes),
331 Ok(Some(Err(error))) => {
332 let mapped = map_reqwest_error(
333 error,
334 HttpErrorKind::Transport,
335 ReqwestErrorPhase::Read,
336 Some(method.clone()),
337 Some(url.clone()),
338 );
339 yield Err(mapped);
340 break;
341 }
342 Ok(None) => break,
343 Err(_) => {
344 let error = HttpError::read_timeout(format!(
345 "Read timeout after {:?} while streaming response",
346 read_timeout
347 ))
348 .with_method(&method)
349 .with_url(&url);
350 yield Err(error);
351 break;
352 }
353 }
354 }
355 };
356 Ok(Box::pin(wrapped))
357 }
358
359 pub async fn text(&mut self) -> HttpResult<String> {
361 let body = self.bytes().await?;
362 String::from_utf8(body.to_vec()).map_err(|error| {
363 HttpError::decode(format!(
364 "Failed to decode response body as UTF-8: {}",
365 error
366 ))
367 .with_status(self.meta.status)
368 .with_url(&self.meta.url)
369 })
370 }
371
372 pub async fn json<T>(&mut self) -> HttpResult<T>
374 where
375 T: DeserializeOwned,
376 {
377 let body = self.bytes().await?;
378 serde_json::from_slice(&body).map_err(|error| {
379 HttpError::decode(format!("Failed to decode response JSON: {}", error))
380 .with_status(self.meta.status)
381 .with_url(&self.meta.url)
382 })
383 }
384
385 #[inline]
391 pub fn sse_max_line_bytes(mut self, max_line_bytes: usize) -> Self {
392 self.options.sse_max_line_bytes = max_line_bytes.max(1);
393 self
394 }
395
396 #[inline]
400 pub fn sse_max_frame_bytes(mut self, max_frame_bytes: usize) -> Self {
401 self.options.sse_max_frame_bytes = max_frame_bytes.max(1);
402 self
403 }
404
405 #[inline]
407 pub fn sse_json_mode(mut self, mode: SseJsonMode) -> Self {
408 self.options.sse_json_mode = mode;
409 self
410 }
411
412 #[inline]
414 pub fn sse_done_marker_policy(mut self, policy: DoneMarkerPolicy) -> Self {
415 self.options.sse_done_marker_policy = policy;
416 self
417 }
418
419 pub fn sse_events(mut self) -> SseEventStream {
423 let max_line_bytes = self.options.sse_max_line_bytes;
424 let max_frame_bytes = self.options.sse_max_frame_bytes;
425 match self.stream() {
426 Ok(stream) => crate::sse::decode_events_from_stream_with_limits(
427 stream,
428 max_line_bytes,
429 max_frame_bytes,
430 ),
431 Err(error) => Box::pin(futures_stream::once(async move { Err(error) })),
432 }
433 }
434
435 pub fn sse_chunks<T>(mut self) -> SseChunkStream<T>
439 where
440 T: DeserializeOwned + Send + 'static,
441 {
442 let done_policy = self.options.sse_done_marker_policy.clone();
443 let mode = self.options.sse_json_mode;
444 let max_line_bytes = self.options.sse_max_line_bytes;
445 let max_frame_bytes = self.options.sse_max_frame_bytes;
446 match self.stream() {
447 Ok(stream) => crate::sse::decode_json_chunks_from_stream_with_limits(
448 stream,
449 done_policy,
450 mode,
451 max_line_bytes,
452 max_frame_bytes,
453 ),
454 Err(error) => Box::pin(futures_stream::once(async move { Err(error) })),
455 }
456 }
457
458 pub(crate) fn buffered_body_for_logging(&self) -> Option<&Bytes> {
463 self.buffered_body.as_ref()
464 }
465
466 pub(crate) fn can_buffer_body_for_logging(&self, body_log_limit: usize) -> bool {
475 if self.backend.is_none() {
476 return false;
477 }
478 if self.is_sse_response() {
479 return false;
480 }
481 self.content_length_hint()
482 .is_some_and(|content_length| content_length <= body_log_limit as u64)
483 }
484
485 async fn read_error_body_preview(
492 mut response: reqwest::Response,
493 read_timeout: Duration,
494 cancellation_token: Option<CancellationToken>,
495 method: Method,
496 url: Url,
497 max_bytes: usize,
498 ) -> HttpResult<String> {
499 let limit = max_bytes.max(1);
500 let mut preview = Vec::new();
501 let mut truncated = false;
502
503 loop {
504 let next = if let Some(token) = cancellation_token.as_ref() {
505 tokio::select! {
506 _ = token.cancelled() => {
507 return Err(HttpError::cancelled(
508 "Request cancelled while reading status error response body preview",
509 )
510 .with_method(&method)
511 .with_url(&url));
512 }
513 item = tokio::time::timeout(read_timeout, response.chunk()) => item,
514 }
515 } else {
516 tokio::time::timeout(read_timeout, response.chunk()).await
517 };
518 match next {
519 Ok(Ok(Some(chunk))) => {
520 if preview.len() >= limit {
521 truncated = true;
522 break;
523 }
524 let remaining = limit - preview.len();
525 if chunk.len() > remaining {
526 preview.extend_from_slice(&chunk[..remaining]);
527 truncated = true;
528 break;
529 }
530 preview.extend_from_slice(&chunk);
531 }
532 Ok(Ok(None)) => break,
533 Ok(Err(error)) => {
534 return Ok(format!(
535 "<error body unavailable: failed to read response body: {}>",
536 error
537 ));
538 }
539 Err(_) => {
540 return Ok(format!(
541 "<error body unavailable: read timeout after {:?}>",
542 read_timeout
543 ));
544 }
545 }
546 }
547 Ok(Self::render_error_body_preview(&preview, truncated))
548 }
549
550 fn cancelled_error_if_needed(&self, message: &str) -> Option<HttpError> {
559 if self
560 .runtime
561 .cancellation_token
562 .as_ref()
563 .is_some_and(CancellationToken::is_cancelled)
564 {
565 Some(
566 HttpError::cancelled(message.to_string())
567 .with_method(&self.meta.method)
568 .with_url(&self.runtime.request_url),
569 )
570 } else {
571 None
572 }
573 }
574
575 fn content_length_hint(&self) -> Option<u64> {
577 self.meta
578 .headers
579 .get(CONTENT_LENGTH)
580 .and_then(|value| value.to_str().ok())
581 .and_then(|value| value.parse::<u64>().ok())
582 }
583
584 fn is_sse_response(&self) -> bool {
586 self.meta
587 .headers
588 .get(CONTENT_TYPE)
589 .and_then(|value| value.to_str().ok())
590 .is_some_and(|content_type| {
591 content_type
592 .to_ascii_lowercase()
593 .starts_with("text/event-stream")
594 })
595 }
596
597 fn render_error_body_preview(bytes: &[u8], truncated: bool) -> String {
598 if bytes.is_empty() {
599 return "<empty>".to_string();
600 }
601 let suffix = if truncated { "...<truncated>" } else { "" };
602 match std::str::from_utf8(bytes) {
603 Ok(text) => format!("{text}{suffix}"),
604 Err(_) => format!("<binary {} bytes>{suffix}", bytes.len()),
605 }
606 }
607}