use std::time::Duration;
use async_stream::stream;
use bytes::Bytes;
use futures_util::{stream as futures_stream, StreamExt};
use http::header::{CONTENT_LENGTH, CONTENT_TYPE};
use http::{HeaderMap, Method, StatusCode};
use serde::de::DeserializeOwned;
use tokio_util::sync::CancellationToken;
use url::Url;
use crate::error::{backend_error_mapper::map_reqwest_error, ReqwestErrorPhase};
use crate::sse::{DoneMarkerPolicy, SseChunkStream, SseEventStream, SseJsonMode};
use crate::{HttpByteStream, HttpError, HttpErrorKind, HttpResult};
use super::{HttpResponseMeta, HttpResponseOptions};
#[derive(Debug, Clone)]
struct HttpResponseRuntime {
read_timeout: Duration,
cancellation_token: Option<CancellationToken>,
request_url: Url,
}
impl HttpResponseRuntime {
fn new(
read_timeout: Duration,
cancellation_token: Option<CancellationToken>,
request_url: Url,
) -> Self {
Self {
read_timeout,
cancellation_token,
request_url,
}
}
}
#[derive(Debug)]
pub struct HttpResponse {
pub meta: HttpResponseMeta,
backend: Option<reqwest::Response>,
buffered_body: Option<Bytes>,
runtime: HttpResponseRuntime,
options: HttpResponseOptions,
}
impl HttpResponse {
pub fn new(
status: StatusCode,
headers: HeaderMap,
body: Bytes,
url: Url,
method: Method,
) -> Self {
Self {
meta: HttpResponseMeta::new(status, headers, url.clone(), method),
backend: None,
buffered_body: Some(body),
runtime: HttpResponseRuntime::new(Duration::from_secs(30), None, url),
options: HttpResponseOptions::default(),
}
}
pub(crate) fn from_backend(
meta: HttpResponseMeta,
backend: reqwest::Response,
read_timeout: Duration,
cancellation_token: Option<CancellationToken>,
request_url: Url,
options: HttpResponseOptions,
) -> Self {
Self {
meta,
backend: Some(backend),
buffered_body: None,
runtime: HttpResponseRuntime::new(read_timeout, cancellation_token, request_url),
options,
}
}
#[inline]
pub fn meta(&self) -> &HttpResponseMeta {
&self.meta
}
#[inline]
pub fn status(&self) -> StatusCode {
self.meta.status
}
#[inline]
pub fn headers(&self) -> &HeaderMap {
&self.meta.headers
}
#[inline]
pub fn url(&self) -> &Url {
&self.meta.url
}
#[inline]
pub fn request_url(&self) -> &Url {
&self.runtime.request_url
}
#[inline]
pub fn is_success(&self) -> bool {
self.status().is_success()
}
#[inline]
pub fn retry_after_hint(&self) -> Option<Duration> {
self.meta.retry_after_hint()
}
pub(crate) async fn into_success_or_status_error(
self,
message_prefix: &str,
) -> HttpResult<Self> {
let status = self.status();
if status.is_success() {
return Ok(self);
}
let retry_after = self.retry_after_hint();
let method = self.meta.method.clone();
let url = self.request_url().clone();
let error_preview_limit = self.options.error_response_preview_limit;
let body_preview = self.into_error_body_preview(error_preview_limit).await?;
let message = format!(
"{} with status {} for {} {}; response body preview: {}",
message_prefix, status, method, url, body_preview
);
let mut mapped = HttpError::status(status, message)
.with_method(&method)
.with_url(&url)
.with_response_body_preview(body_preview);
if let Some(retry_after) = retry_after {
mapped = mapped.with_retry_after(retry_after);
}
Err(mapped)
}
pub(crate) async fn into_error_body_preview(mut self, max_bytes: usize) -> HttpResult<String> {
let limit = max_bytes.max(1);
if let Some(error) = self.cancelled_error_if_needed(
"Request cancelled while reading status error response body preview",
) {
return Err(error);
}
if let Some(body) = self.buffered_body.take() {
let end = body.len().min(limit);
return Ok(Self::render_error_body_preview(
&body[..end],
body.len() > limit,
));
}
let Some(backend) = self.backend.take() else {
return Ok("<empty>".to_string());
};
Self::read_error_body_preview(
backend,
self.runtime.read_timeout,
self.runtime.cancellation_token.clone(),
self.meta.method.clone(),
self.runtime.request_url.clone(),
limit,
)
.await
}
pub async fn bytes(&mut self) -> HttpResult<Bytes> {
if let Some(body) = &self.buffered_body {
return Ok(body.clone());
}
let Some(mut backend) = self.backend.take() else {
self.buffered_body = Some(Bytes::new());
return Ok(Bytes::new());
};
let method = self.meta.method.clone();
let url = self.runtime.request_url.clone();
let read_timeout = self.runtime.read_timeout;
let cancellation_token = self.runtime.cancellation_token.clone();
let mut body = bytes::BytesMut::new();
loop {
let next = if let Some(token) = &cancellation_token {
tokio::select! {
_ = token.cancelled() => {
return Err(HttpError::cancelled("Request cancelled while reading response body")
.with_method(&method)
.with_url(&url));
}
item = tokio::time::timeout(read_timeout, backend.chunk()) => item,
}
} else {
tokio::time::timeout(read_timeout, backend.chunk()).await
};
match next {
Ok(Ok(Some(chunk))) => body.extend_from_slice(&chunk),
Ok(Ok(None)) => {
let body = body.freeze();
self.buffered_body = Some(body.clone());
return Ok(body);
}
Ok(Err(error)) => {
return Err(map_reqwest_error(
error,
HttpErrorKind::Decode,
Some(ReqwestErrorPhase::Read),
Some(method),
Some(url),
));
}
Err(_) => {
return Err(HttpError::read_timeout(format!(
"Read timeout after {:?} while reading response body",
read_timeout
))
.with_method(&self.meta.method)
.with_url(&self.runtime.request_url));
}
}
}
}
pub fn stream(&mut self) -> HttpResult<HttpByteStream> {
if let Some(body) = self.buffered_body.as_ref() {
let bytes = body.clone();
return Ok(Box::pin(futures_stream::once(async move { Ok(bytes) })));
}
if let Some(error) = self
.cancelled_error_if_needed("Streaming response cancelled before reading response body")
{
return Err(error);
}
let Some(backend) = self.backend.take() else {
return Ok(Box::pin(futures_stream::empty()));
};
let method = self.meta.method.clone();
let url = self.runtime.request_url.clone();
let read_timeout = self.runtime.read_timeout;
let cancellation_token = self.runtime.cancellation_token.clone();
let mut stream = backend.bytes_stream();
let wrapped = stream! {
loop {
let next = if let Some(token) = &cancellation_token {
tokio::select! {
_ = token.cancelled() => {
yield Err(HttpError::cancelled("Streaming response cancelled while reading body")
.with_method(&method)
.with_url(&url));
break;
}
item = tokio::time::timeout(read_timeout, stream.next()) => item,
}
} else {
tokio::time::timeout(read_timeout, stream.next()).await
};
match next {
Ok(Some(Ok(bytes))) => yield Ok(bytes),
Ok(Some(Err(error))) => {
let mapped = map_reqwest_error(
error,
HttpErrorKind::Transport,
Some(ReqwestErrorPhase::Read),
Some(method.clone()),
Some(url.clone()),
);
yield Err(mapped);
break;
}
Ok(None) => break,
Err(_) => {
let error = HttpError::read_timeout(format!(
"Read timeout after {:?} while streaming response",
read_timeout
))
.with_method(&method)
.with_url(&url);
yield Err(error);
break;
}
}
}
};
Ok(Box::pin(wrapped))
}
pub async fn text(&mut self) -> HttpResult<String> {
let body = self.bytes().await?;
String::from_utf8(body.to_vec()).map_err(|error| {
HttpError::decode(format!(
"Failed to decode response body as UTF-8: {}",
error
))
.with_status(self.meta.status)
.with_url(&self.meta.url)
})
}
pub async fn json<T>(&mut self) -> HttpResult<T>
where
T: DeserializeOwned,
{
let body = self.bytes().await?;
serde_json::from_slice(&body).map_err(|error| {
HttpError::decode(format!("Failed to decode response JSON: {}", error))
.with_status(self.meta.status)
.with_url(&self.meta.url)
})
}
#[inline]
pub fn sse_max_line_bytes(mut self, max_line_bytes: usize) -> Self {
self.options.sse_max_line_bytes = max_line_bytes.max(1);
self
}
#[inline]
pub fn sse_max_frame_bytes(mut self, max_frame_bytes: usize) -> Self {
self.options.sse_max_frame_bytes = max_frame_bytes.max(1);
self
}
#[inline]
pub fn sse_json_mode(mut self, mode: SseJsonMode) -> Self {
self.options.sse_json_mode = mode;
self
}
#[inline]
pub fn sse_done_marker_policy(mut self, policy: DoneMarkerPolicy) -> Self {
self.options.sse_done_marker_policy = policy;
self
}
pub fn sse_events(mut self) -> SseEventStream {
let max_line_bytes = self.options.sse_max_line_bytes;
let max_frame_bytes = self.options.sse_max_frame_bytes;
match self.stream() {
Ok(stream) => crate::sse::decode_events_from_stream_with_limits(
stream,
max_line_bytes,
max_frame_bytes,
),
Err(error) => Box::pin(futures_stream::once(async move { Err(error) })),
}
}
pub fn sse_chunks<T>(mut self) -> SseChunkStream<T>
where
T: DeserializeOwned + Send + 'static,
{
let done_policy = self.options.sse_done_marker_policy.clone();
let mode = self.options.sse_json_mode;
let max_line_bytes = self.options.sse_max_line_bytes;
let max_frame_bytes = self.options.sse_max_frame_bytes;
match self.stream() {
Ok(stream) => crate::sse::decode_json_chunks_from_stream_with_limits(
stream,
done_policy,
mode,
max_line_bytes,
max_frame_bytes,
),
Err(error) => Box::pin(futures_stream::once(async move { Err(error) })),
}
}
pub(crate) fn buffered_body_for_logging(&self) -> Option<&Bytes> {
self.buffered_body.as_ref()
}
pub(crate) fn can_buffer_body_for_logging(&self, body_log_limit: usize) -> bool {
if self.backend.is_none() {
return false;
}
if self.is_sse_response() {
return false;
}
self.content_length_hint()
.is_some_and(|content_length| content_length <= body_log_limit as u64)
}
async fn read_error_body_preview(
mut response: reqwest::Response,
read_timeout: Duration,
cancellation_token: Option<CancellationToken>,
method: Method,
url: Url,
max_bytes: usize,
) -> HttpResult<String> {
let limit = max_bytes.max(1);
let mut preview = Vec::new();
let mut truncated = false;
loop {
let next = if let Some(token) = cancellation_token.as_ref() {
tokio::select! {
_ = token.cancelled() => {
return Err(HttpError::cancelled(
"Request cancelled while reading status error response body preview",
)
.with_method(&method)
.with_url(&url));
}
item = tokio::time::timeout(read_timeout, response.chunk()) => item,
}
} else {
tokio::time::timeout(read_timeout, response.chunk()).await
};
match next {
Ok(Ok(Some(chunk))) => {
if preview.len() >= limit {
truncated = true;
break;
}
let remaining = limit - preview.len();
if chunk.len() > remaining {
preview.extend_from_slice(&chunk[..remaining]);
truncated = true;
break;
}
preview.extend_from_slice(&chunk);
}
Ok(Ok(None)) => break,
Ok(Err(error)) => {
return Ok(format!(
"<error body unavailable: failed to read response body: {}>",
error
));
}
Err(_) => {
return Ok(format!(
"<error body unavailable: read timeout after {:?}>",
read_timeout
));
}
}
}
Ok(Self::render_error_body_preview(&preview, truncated))
}
fn cancelled_error_if_needed(&self, message: &str) -> Option<HttpError> {
if self
.runtime
.cancellation_token
.as_ref()
.is_some_and(CancellationToken::is_cancelled)
{
Some(
HttpError::cancelled(message.to_string())
.with_method(&self.meta.method)
.with_url(&self.runtime.request_url),
)
} else {
None
}
}
fn content_length_hint(&self) -> Option<u64> {
self.meta
.headers
.get(CONTENT_LENGTH)
.and_then(|value| value.to_str().ok())
.and_then(|value| value.parse::<u64>().ok())
}
fn is_sse_response(&self) -> bool {
self.meta
.headers
.get(CONTENT_TYPE)
.and_then(|value| value.to_str().ok())
.is_some_and(|content_type| {
content_type
.to_ascii_lowercase()
.starts_with("text/event-stream")
})
}
fn render_error_body_preview(bytes: &[u8], truncated: bool) -> String {
if bytes.is_empty() {
return "<empty>".to_string();
}
let suffix = if truncated { "...<truncated>" } else { "" };
match std::str::from_utf8(bytes) {
Ok(text) => format!("{text}{suffix}"),
Err(_) => format!("<binary {} bytes>{suffix}", bytes.len()),
}
}
}
#[cfg(coverage)]
#[doc(hidden)]
pub(crate) async fn coverage_exercise_response_preview_paths() -> Vec<String> {
let url = Url::parse("https://example.com/coverage").expect("coverage URL should parse");
let buffered = HttpResponse::new(
StatusCode::BAD_GATEWAY,
HeaderMap::new(),
Bytes::from_static(b"abcdef"),
url.clone(),
Method::GET,
)
.into_error_body_preview(3)
.await
.expect("buffered preview should render");
let empty = HttpResponse {
meta: HttpResponseMeta::new(
StatusCode::BAD_GATEWAY,
HeaderMap::new(),
url.clone(),
Method::GET,
),
backend: None,
buffered_body: None,
runtime: HttpResponseRuntime::new(Duration::from_secs(30), None, url.clone()),
options: HttpResponseOptions::default(),
}
.into_error_body_preview(3)
.await
.expect("empty preview should render");
let token = CancellationToken::new();
token.cancel();
let cancelled = HttpResponse {
meta: HttpResponseMeta::new(
StatusCode::BAD_GATEWAY,
HeaderMap::new(),
url.clone(),
Method::GET,
),
backend: None,
buffered_body: None,
runtime: HttpResponseRuntime::new(Duration::from_secs(30), Some(token), url),
options: HttpResponseOptions::default(),
}
.into_error_body_preview(3)
.await
.expect_err("cancelled preview should fail");
vec![buffered, empty, format!("{:?}", cancelled.kind)]
}