use std::{pin::Pin, time::Duration};
use bytes::Bytes;
use futures_core::Stream;
use futures_util::TryStreamExt as _;
use http::header::{CONTENT_LENGTH, USER_AGENT};
use http::{HeaderMap, HeaderValue, Method, StatusCode};
use reqx::{
Client as HttpClient, Response as ReqxResponse, advanced::TlsRootStore, prelude::RedirectPolicy,
};
use url::Url;
use crate::{
error::{Error, Result},
transport::{
MAX_BUFFERED_RESPONSE_BODY_BYTES, RequestAttemptState, RequestTimer, RetryConfig,
ServiceErrorAction, TransportRequestBody, content_length_header_value,
content_length_header_value_from_len, default_tls_backend, ensure_method_accepts_body,
map_reqx_error, prepare_user_agent, record_service_retry, reqx_backoff_source,
reqx_retry_policy, response_error_from_body, service_error_action,
validate_request_timeout,
},
};
/// Error yielded by request-body streams after type erasure.
///
/// Holds the original stream error behind a boxed trait object so that streams
/// with different error types can all be exposed as `AsyncByteStream`.
#[derive(Debug)]
pub(crate) struct AsyncBodyError {
// The underlying error, boxed by `boxed_byte_stream`.
source: Box<dyn std::error::Error + Send + Sync + 'static>,
}
impl std::fmt::Display for AsyncBodyError {
// Delegates to the wrapped error's own formatting; nothing is added.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.source.fmt(f)
}
}
impl std::error::Error for AsyncBodyError {
// Reports the wrapped error as this error's source.
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(self.source.as_ref())
}
}
/// Boxed, pinned, `Send` stream of body chunks whose failures are reported as
/// `AsyncBodyError`.
type AsyncByteStream =
    Pin<Box<dyn Stream<Item = std::result::Result<Bytes, AsyncBodyError>> + Send + 'static>>;

/// Type-erases `stream` into an `AsyncByteStream`.
///
/// Every error the stream yields is boxed and wrapped in `AsyncBodyError`;
/// successful chunks pass through unchanged.
pub(crate) fn boxed_byte_stream<S, E>(stream: S) -> AsyncByteStream
where
    S: Stream<Item = std::result::Result<Bytes, E>> + Send + 'static,
    E: std::error::Error + Send + Sync + 'static,
{
    let erased = stream.map_err(|cause| {
        let source: Box<dyn std::error::Error + Send + Sync + 'static> = Box::new(cause);
        AsyncBodyError { source }
    });
    Box::pin(erased)
}
/// Request payload accepted by the async transport.
pub(crate) enum AsyncBody {
    /// No payload at all.
    Empty,
    /// Payload held fully in memory.
    Bytes(Bytes),
    /// Payload produced chunk by chunk.
    Stream {
        /// Source of the chunks.
        stream: AsyncByteStream,
        /// Total size in bytes when it is known up front.
        content_length: Option<u64>,
    },
}

impl TransportRequestBody for AsyncBody {
    /// True only for `Empty`; a `Bytes` payload counts as a body even when it
    /// holds zero bytes.
    fn is_empty(&self) -> bool {
        match self {
            Self::Empty => true,
            Self::Bytes(_) | Self::Stream { .. } => false,
        }
    }

    /// In-memory payloads can be sent again; a stream cannot.
    fn is_replayable(&self) -> bool {
        match self {
            Self::Stream { .. } => false,
            Self::Empty | Self::Bytes(_) => true,
        }
    }

    /// Produces a copy for another attempt, or `None` for a stream.
    fn clone_for_retry(&self) -> Option<Self> {
        let copy = match self {
            Self::Stream { .. } => return None,
            Self::Empty => Self::Empty,
            Self::Bytes(payload) => Self::Bytes(payload.clone()),
        };
        Some(copy)
    }
}
/// Async HTTP transport: an HTTP client together with the retry settings and
/// `User-Agent` header value applied to the requests it builds.
pub(crate) struct AsyncTransport {
// Underlying HTTP client, built once in `new`.
client: HttpClient,
// Validated retry configuration consulted by `send` and `send_stream`.
retry: RetryConfig,
// Value attached to every request as the `User-Agent` header.
user_agent: HeaderValue,
}
/// Fully buffered HTTP response: status code, headers and body bytes.
#[derive(Clone, Debug)]
pub(crate) struct AsyncResponse {
    status: StatusCode,
    headers: HeaderMap,
    body: Bytes,
}

impl AsyncResponse {
    /// Copies status, headers and body out of a buffered `reqx` response.
    pub(crate) fn from_reqx(resp: ReqxResponse) -> Self {
        let status = resp.status();
        let headers = resp.headers().clone();
        let body = resp.body().clone();
        Self {
            status,
            headers,
            body,
        }
    }

    /// HTTP status code of the response.
    pub(crate) fn status(&self) -> StatusCode {
        self.status
    }

    /// Response headers.
    pub(crate) fn headers(&self) -> &HeaderMap {
        &self.headers
    }

    /// Buffered response body.
    pub(crate) fn body(&self) -> &Bytes {
        &self.body
    }

    /// Splits the response into `(status, headers, body)`.
    pub(crate) fn into_parts(self) -> (StatusCode, HeaderMap, Bytes) {
        let Self {
            status,
            headers,
            body,
        } = self;
        (status, headers, body)
    }

    /// Decodes the body via `decode_utf8_response_body`, consuming the response.
    pub(crate) fn text(self) -> Result<String> {
        let raw: &[u8] = self.body.as_ref();
        crate::util::text::decode_utf8_response_body(raw)
    }
}
impl AsyncTransport {
/// Builds a transport from retry settings, an optional user agent, an optional
/// per-request timeout and the TLS root store to use.
///
/// Redirects are disabled and buffered response bodies are capped at
/// `MAX_BUFFERED_RESPONSE_BODY_BYTES`.
///
/// # Errors
/// Propagates failures from retry validation, timeout validation and
/// user-agent preparation; HTTP client build failures are mapped through
/// `map_reqx_error`.
pub(crate) fn new(
    retry: RetryConfig,
    user_agent: Option<String>,
    timeout: Option<Duration>,
    tls_root_store: TlsRootStore,
) -> Result<Self> {
    let retry = retry.validate()?;
    validate_request_timeout(timeout)?;
    let (user_agent_text, user_agent) = prepare_user_agent(user_agent)?;

    // NOTE(review): the base URL looks like a placeholder, since `build_request`
    // always passes an absolute URL — confirm against reqx.
    let builder = HttpClient::builder("http://localhost");
    let builder = builder.client_name(user_agent_text);
    let builder = builder.retry_policy(reqx_retry_policy(retry));
    let builder = builder.backoff_source(reqx_backoff_source(retry));
    let builder = builder.redirect_policy(RedirectPolicy::none());
    let builder = builder.max_response_body_bytes(MAX_BUFFERED_RESPONSE_BODY_BYTES);
    let builder = builder.tls_backend(default_tls_backend());
    let builder = builder.tls_root_store(tls_root_store);

    #[cfg(feature = "metrics")]
    let builder = builder
        .observer(crate::transport::TransportMetricsObserver)
        .interceptor(crate::transport::TransportMetricsInterceptor);

    let builder = match timeout {
        Some(limit) => builder.request_timeout(limit),
        None => builder,
    };

    match builder.build() {
        Ok(client) => Ok(Self {
            client,
            retry,
            user_agent,
        }),
        Err(cause) => Err(map_reqx_error("failed to build HTTP client", cause)),
    }
}
/// Sends a request and returns the fully buffered response.
///
/// Each attempt rebuilds the request from clones of `url` and `headers` plus
/// the body handed out by `RequestAttemptState`. After every response,
/// `service_error_action` decides whether to sleep and try again, fail with an
/// error, or hand the response back to the caller.
///
/// # Errors
/// Fails when the body for the next attempt cannot be obtained, the request
/// cannot be built, the HTTP exchange fails, the service-error check yields
/// `ReturnErr`, or every attempt ended in a retry decision.
pub(crate) async fn send(
    &self,
    method: Method,
    url: Url,
    headers: HeaderMap,
    body: AsyncBody,
) -> Result<AsyncResponse> {
    let mut attempts = RequestAttemptState::new(self.retry, body);
    let max_attempts = attempts.max_attempts();
    let timer = RequestTimer::start();

    for attempt in 1..=max_attempts {
        let payload = attempts.next_body()?;
        let request = self.build_request(&method, url.clone(), headers.clone(), payload)?;
        let response = request
            .send_response()
            .await
            .map_err(|cause| map_reqx_error("request failed", cause))?;

        let decision = service_error_action(
            self.retry,
            attempt,
            max_attempts,
            &method,
            response.status(),
            response.headers(),
            response.body(),
        );
        match decision {
            Some(ServiceErrorAction::RetryAfter(delay)) => {
                record_service_retry(&method);
                tokio::time::sleep(delay).await;
            }
            Some(ServiceErrorAction::ReturnErr(err)) => {
                timer.finish_service_error(&method);
                return Err(err);
            }
            None => {
                timer.finish(&method);
                return Ok(AsyncResponse::from_reqx(response));
            }
        }
    }

    // Reached only when no attempt produced a final outcome.
    Err(Error::transport("request failed after retries", None))
}
/// Sends a request and returns the response as a stream, without buffering
/// the body.
///
/// The service-error check runs on status and headers only (an empty byte
/// string stands in for the body). Only a `RetryAfter` decision is acted on;
/// any other outcome hands the response stream to the caller.
///
/// # Errors
/// Fails when the body for the next attempt cannot be obtained, the request
/// cannot be built, the HTTP exchange fails, or every attempt ended in a retry
/// decision.
pub(crate) async fn send_stream(
    &self,
    method: Method,
    url: Url,
    headers: HeaderMap,
    body: AsyncBody,
) -> Result<reqx::ResponseStream> {
    let mut attempts = RequestAttemptState::new(self.retry, body);
    let max_attempts = attempts.max_attempts();
    let timer = RequestTimer::start();

    for attempt in 1..=max_attempts {
        let payload = attempts.next_body()?;
        let request = self.build_request(&method, url.clone(), headers.clone(), payload)?;
        let response = request
            .send_response_stream()
            .await
            .map_err(|cause| map_reqx_error("request failed", cause))?;

        let decision = service_error_action(
            self.retry,
            attempt,
            max_attempts,
            &method,
            response.status(),
            response.headers(),
            b"",
        );
        match decision {
            Some(ServiceErrorAction::RetryAfter(delay)) => {
                record_service_retry(&method);
                tokio::time::sleep(delay).await;
            }
            _ => {
                timer.finish(&method);
                return Ok(response);
            }
        }
    }

    // Reached only when no attempt produced a final outcome.
    Err(Error::transport("request failed after retries", None))
}
/// Assembles the `reqx` request for a single attempt.
///
/// Sets the transport's `User-Agent`, forwards every caller-supplied header
/// value and attaches the body, adding `Content-Length` whenever the size is
/// known.
///
/// # Errors
/// Fails when `ensure_method_accepts_body` rejects the method/body pairing or
/// when the body length cannot be turned into a header value.
fn build_request(
    &self,
    method: &Method,
    url: Url,
    headers: HeaderMap,
    body: AsyncBody,
) -> Result<reqx::RequestBuilder<'_>> {
    ensure_method_accepts_body(method, &body)?;
    let mut req = self
        .client
        .request(method.clone(), url.as_str().to_string())
        .header(USER_AGENT, self.user_agent.clone());

    // `HeaderMap::into_iter` yields the name only alongside the first value of
    // each header; further values of that header arrive with `None` and belong
    // to the most recently yielded name. Remember that name so multi-valued
    // headers are forwarded in full instead of being cut down to one value.
    // NOTE(review): assumes `RequestBuilder::header` appends rather than
    // replaces — confirm against reqx.
    let mut current_name = None;
    for (name, value) in headers {
        if name.is_some() {
            current_name = name;
        }
        if let Some(name) = current_name.clone() {
            req = req.header(name, value);
        }
    }

    req = match body {
        AsyncBody::Empty => req,
        AsyncBody::Bytes(b) => req
            .header(
                CONTENT_LENGTH,
                content_length_header_value_from_len(b.len())?,
            )
            .body(b),
        AsyncBody::Stream {
            stream,
            content_length: Some(len),
        } => req
            .header(CONTENT_LENGTH, content_length_header_value(len)?)
            .body_stream(stream),
        // Unknown length: no `Content-Length` header is added here.
        AsyncBody::Stream {
            stream,
            content_length: None,
        } => req.body_stream(stream),
    };
    Ok(req)
}
}
/// Turns a buffered response into the crate's error type by handing its
/// status, headers and body to `response_error_from_body`.
pub(crate) fn response_error(resp: AsyncResponse) -> Error {
    let (status, headers, body) = resp.into_parts();
    response_error_from_body(status, &headers, &body)
}
#[cfg(test)]
mod tests {
use std::io::{ErrorKind, Read, Write};
use std::net::{SocketAddr, TcpListener};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use super::*;
// Local shim so the tests can write `reqx::TlsRootStore`. Inside this module
// the name `reqx` refers to the shim, hence the leading `::` on the re-export.
mod reqx {
pub use ::reqx::advanced::TlsRootStore;
}
// Return value of `spawn_request_capture_server`: the bound address, the
// server thread handle, and the shared buffer that receives the request bytes.
type RequestCaptureServer = (SocketAddr, std::thread::JoinHandle<()>, Arc<Mutex<Vec<u8>>>);
fn spawn_test_server(
responses: Vec<Vec<u8>>,
) -> Result<(SocketAddr, std::thread::JoinHandle<()>, Arc<AtomicUsize>)> {
let listener = TcpListener::bind("127.0.0.1:0")
.map_err(|e| Error::transport("failed to bind test server", Some(Box::new(e))))?;
listener
.set_nonblocking(true)
.map_err(|e| Error::transport("failed to configure test server", Some(Box::new(e))))?;
let addr = listener.local_addr().map_err(|e| {
Error::transport("failed to read test server address", Some(Box::new(e)))
})?;
let hits = Arc::new(AtomicUsize::new(0));
let hits_thread = hits.clone();
let handle = std::thread::spawn(move || {
let deadline = Instant::now() + Duration::from_secs(5);
for response in responses {
loop {
match listener.accept() {
Ok((mut stream, _)) => {
let _ = stream.set_nonblocking(false);
let _ = stream.set_read_timeout(Some(Duration::from_secs(1)));
let mut request = Vec::new();
let mut buf = [0u8; 1024];
while !request.windows(4).any(|w| w == b"\r\n\r\n") {
match stream.read(&mut buf) {
Ok(0) => break,
Ok(n) => {
request.extend_from_slice(&buf[..n]);
if request.len() > 64 * 1024 {
break;
}
}
Err(err)
if matches!(
err.kind(),
ErrorKind::WouldBlock | ErrorKind::TimedOut
) =>
{
break;
}
Err(_) => break,
}
}
let _ = stream.write_all(&response);
let _ = stream.flush();
hits_thread.fetch_add(1, Ordering::SeqCst);
break;
}
Err(err) if err.kind() == ErrorKind::WouldBlock => {
if Instant::now() >= deadline {
return;
}
std::thread::sleep(Duration::from_millis(10));
}
Err(_) => return,
}
}
}
});
Ok((addr, handle, hits))
}
fn spawn_request_capture_server(response: Vec<u8>) -> Result<RequestCaptureServer> {
let listener = TcpListener::bind("127.0.0.1:0")
.map_err(|e| Error::transport("failed to bind test server", Some(Box::new(e))))?;
listener
.set_nonblocking(true)
.map_err(|e| Error::transport("failed to configure test server", Some(Box::new(e))))?;
let addr = listener.local_addr().map_err(|e| {
Error::transport("failed to read test server address", Some(Box::new(e)))
})?;
let captured = Arc::new(Mutex::new(Vec::new()));
let captured_thread = captured.clone();
let handle = std::thread::spawn(move || {
let deadline = Instant::now() + Duration::from_secs(5);
loop {
match listener.accept() {
Ok((mut stream, _)) => {
let _ = stream.set_nonblocking(false);
let _ = stream.set_read_timeout(Some(Duration::from_secs(1)));
let mut request = Vec::new();
let mut buf = [0u8; 1024];
while !request.windows(4).any(|w| w == b"\r\n\r\n") {
match stream.read(&mut buf) {
Ok(0) => break,
Ok(n) => request.extend_from_slice(&buf[..n]),
Err(err)
if matches!(
err.kind(),
ErrorKind::WouldBlock | ErrorKind::TimedOut
) =>
{
break;
}
Err(_) => break,
}
}
*captured_thread.lock().expect("capture lock") = request;
let _ = stream.write_all(&response);
let _ = stream.flush();
return;
}
Err(err) if err.kind() == ErrorKind::WouldBlock => {
if Instant::now() >= deadline {
return;
}
std::thread::sleep(Duration::from_millis(10));
}
Err(_) => return,
}
}
});
Ok((addr, handle, captured))
}
#[tokio::test]
async fn send_bytes_sets_content_length() -> Result<()> {
let (addr, handle, captured) = spawn_request_capture_server(
b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n".to_vec(),
)?;
let transport = AsyncTransport::new(
RetryConfig {
max_attempts: 1,
..RetryConfig::default()
},
None,
Some(Duration::from_secs(5)),
reqx::TlsRootStore::BackendDefault,
)?;
let url = Url::parse(&format!("http://{addr}/"))
.map_err(|_| Error::invalid_config("invalid test server URL"))?;
transport
.send(
Method::PUT,
url,
HeaderMap::new(),
AsyncBody::Bytes(Bytes::from_static(b"abc")),
)
.await?;
handle
.join()
.map_err(|_| Error::transport("test server thread panicked", None))?;
let request = String::from_utf8(captured.lock().expect("capture lock").clone())
.map_err(|e| Error::decode("captured request is not valid UTF-8", Some(Box::new(e))))?;
let request = request.to_ascii_lowercase();
assert_eq!(request.matches("\r\ncontent-length: ").count(), 1);
assert!(request.contains("\r\ncontent-length: 3\r\n"));
Ok(())
}
/// A 404 is handed back as a normal response after a single request.
#[tokio::test]
async fn send_returns_response_for_http_error_status() -> Result<()> {
    let not_found =
        b"HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\nConnection: close\r\n\r\n".to_vec();
    let (addr, server, hit_count) = spawn_test_server(vec![not_found])?;
    let client = AsyncTransport::new(
        RetryConfig {
            max_attempts: 1,
            ..RetryConfig::default()
        },
        None,
        Some(Duration::from_secs(5)),
        reqx::TlsRootStore::BackendDefault,
    )?;
    let target = Url::parse(&format!("http://{addr}/"))
        .map_err(|_| Error::invalid_config("invalid test server URL"))?;

    let response = client
        .send(Method::GET, target, HeaderMap::new(), AsyncBody::Empty)
        .await?;
    server
        .join()
        .map_err(|_| Error::transport("test server thread panicked", None))?;

    assert_eq!(hit_count.load(Ordering::SeqCst), 1);
    assert_eq!(response.status(), StatusCode::NOT_FOUND);
    Ok(())
}
/// A HEAD response that advertises `Content-Encoding: zstd` with an empty body
/// succeeds and keeps the header visible to the caller.
#[tokio::test]
async fn send_head_with_content_encoding_and_empty_body_returns_ok() -> Result<()> {
    let canned = b"HTTP/1.1 200 OK\r\nContent-Encoding: zstd\r\nContent-Length: 0\r\nConnection: close\r\n\r\n".to_vec();
    let (addr, server, hit_count) = spawn_test_server(vec![canned])?;
    let client = AsyncTransport::new(
        RetryConfig {
            max_attempts: 1,
            ..RetryConfig::default()
        },
        None,
        Some(Duration::from_secs(5)),
        reqx::TlsRootStore::BackendDefault,
    )?;
    let target = Url::parse(&format!("http://{addr}/"))
        .map_err(|_| Error::invalid_config("invalid test server URL"))?;

    let response = client
        .send(Method::HEAD, target, HeaderMap::new(), AsyncBody::Empty)
        .await?;
    server
        .join()
        .map_err(|_| Error::transport("test server thread panicked", None))?;

    assert_eq!(hit_count.load(Ordering::SeqCst), 1);
    assert_eq!(response.status(), StatusCode::OK);
    let encoding = response
        .headers()
        .get(http::header::CONTENT_ENCODING)
        .and_then(|v| v.to_str().ok());
    assert_eq!(encoding, Some("zstd"));
    assert!(response.body().is_empty());
    Ok(())
}
/// With a streamed request body, a 200 response whose body is an `<Error>`
/// document must surface from `send` as `Error::Api`.
#[tokio::test]
async fn send_stream_surfaces_embedded_service_error_on_2xx() -> Result<()> {
    let error_xml = "<Error><Code>AccessDenied</Code><Message>denied</Message></Error>";
    let length = error_xml.len();
    let canned = format!(
        "HTTP/1.1 200 OK\r\nContent-Length: {length}\r\nConnection: close\r\n\r\n{error_xml}"
    )
    .into_bytes();
    let (addr, server, hit_count) = spawn_test_server(vec![canned])?;
    let client = AsyncTransport::new(
        RetryConfig {
            max_attempts: 1,
            ..RetryConfig::default()
        },
        None,
        Some(Duration::from_secs(5)),
        reqx::TlsRootStore::BackendDefault,
    )?;
    let target = Url::parse(&format!("http://{addr}/"))
        .map_err(|_| Error::invalid_config("invalid test server URL"))?;
    let chunks = futures_util::stream::once(async {
        Ok::<_, std::io::Error>(Bytes::from_static(b"hello"))
    });
    let payload = AsyncBody::Stream {
        stream: boxed_byte_stream(chunks),
        content_length: Some(5),
    };

    let failure = client
        .send(Method::PUT, target, HeaderMap::new(), payload)
        .await
        .expect_err("expected embedded error xml to fail request");
    server
        .join()
        .map_err(|_| Error::transport("test server thread panicked", None))?;

    assert_eq!(hit_count.load(Ordering::SeqCst), 1);
    match failure {
        Error::Api {
            status,
            code,
            message,
            ..
        } => {
            assert_eq!(status, StatusCode::OK);
            assert_eq!(code.as_deref(), Some("AccessDenied"));
            assert_eq!(message.as_deref(), Some("denied"));
        }
        other => panic!("expected api error, got {other:?}"),
    }
    Ok(())
}
/// `response_error` turns a 429 with `Retry-After` and a request id header
/// into `Error::RateLimited` carrying both values.
#[tokio::test]
async fn response_error_extracts_rate_limit() -> Result<()> {
    let canned = b"HTTP/1.1 429 Too Many Requests\r\nRetry-After: 3\r\nx-amz-request-id: req-1\r\nContent-Length: 0\r\nConnection: close\r\n\r\n".to_vec();
    let (addr, server, _) = spawn_test_server(vec![canned])?;
    let client = AsyncTransport::new(
        RetryConfig {
            max_attempts: 1,
            ..RetryConfig::default()
        },
        None,
        Some(Duration::from_secs(5)),
        reqx::TlsRootStore::BackendDefault,
    )?;
    let target = Url::parse(&format!("http://{addr}/"))
        .map_err(|_| Error::invalid_config("invalid test server URL"))?;

    let response = client
        .send(Method::GET, target, HeaderMap::new(), AsyncBody::Empty)
        .await?;
    server
        .join()
        .map_err(|_| Error::transport("test server thread panicked", None))?;

    match response_error(response) {
        Error::RateLimited {
            retry_after,
            request_id,
            ..
        } => {
            assert_eq!(retry_after, Some(Duration::from_secs(3)));
            assert_eq!(request_id.as_deref(), Some("req-1"));
        }
        other => panic!("expected rate limited error, got {other:?}"),
    }
    Ok(())
}
/// `response_error` reads code, message, request id and host id from the XML
/// error body of a 403; the request id in the body is the one reported, not
/// the one in the header.
#[tokio::test]
async fn response_error_parses_xml_error_fields() -> Result<()> {
    let error_xml = r#"
<Error>
<Code>AccessDenied</Code>
<Message>Access Denied</Message>
<RequestId>req-inner</RequestId>
<HostId>host-1</HostId>
</Error>
"#;
    let length = error_xml.len();
    let canned = format!(
        "HTTP/1.1 403 Forbidden\r\nx-amz-request-id: req-outer\r\nContent-Length: {length}\r\nConnection: close\r\n\r\n{error_xml}"
    )
    .into_bytes();
    let (addr, server, _) = spawn_test_server(vec![canned])?;
    let client = AsyncTransport::new(
        RetryConfig {
            max_attempts: 1,
            ..RetryConfig::default()
        },
        None,
        Some(Duration::from_secs(5)),
        reqx::TlsRootStore::BackendDefault,
    )?;
    let target = Url::parse(&format!("http://{addr}/"))
        .map_err(|_| Error::invalid_config("invalid test server URL"))?;

    let response = client
        .send(Method::GET, target, HeaderMap::new(), AsyncBody::Empty)
        .await?;
    server
        .join()
        .map_err(|_| Error::transport("test server thread panicked", None))?;

    match response_error(response) {
        Error::Api {
            status,
            code,
            message,
            request_id,
            host_id,
            body_snippet,
        } => {
            assert_eq!(status, StatusCode::FORBIDDEN);
            assert_eq!(code.as_deref(), Some("AccessDenied"));
            assert_eq!(message.as_deref(), Some("Access Denied"));
            assert_eq!(request_id.as_deref(), Some("req-inner"));
            assert_eq!(host_id.as_deref(), Some("host-1"));
            let snippet = body_snippet.unwrap_or_default();
            assert!(snippet.contains("AccessDenied"));
        }
        other => panic!("expected api error, got {other:?}"),
    }
    Ok(())
}
/// A 429 followed by a 200 ends in success after exactly two requests.
#[tokio::test]
async fn send_retries_on_retryable_status_for_replayable_body() -> Result<()> {
    let throttled = b"HTTP/1.1 429 Too Many Requests\r\nRetry-After: 0\r\nContent-Length: 0\r\nConnection: close\r\n\r\n".to_vec();
    let ok = b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n".to_vec();
    let (addr, server, hit_count) = spawn_test_server(vec![throttled, ok])?;
    let client = AsyncTransport::new(
        RetryConfig {
            max_attempts: 2,
            base_delay: Duration::ZERO,
            max_delay: Duration::ZERO,
            max_retry_after: Duration::from_secs(30),
        },
        None,
        Some(Duration::from_secs(5)),
        reqx::TlsRootStore::BackendDefault,
    )?;
    let target = Url::parse(&format!("http://{addr}/"))
        .map_err(|_| Error::invalid_config("invalid test server URL"))?;

    let response = client
        .send(Method::GET, target, HeaderMap::new(), AsyncBody::Empty)
        .await?;
    server
        .join()
        .map_err(|_| Error::transport("test server thread panicked", None))?;

    assert_eq!(hit_count.load(Ordering::SeqCst), 2);
    assert_eq!(response.status(), StatusCode::OK);
    Ok(())
}
/// A 403 is returned after a single request even though three attempts are
/// allowed.
#[tokio::test]
async fn send_does_not_retry_on_non_retryable_4xx_status() -> Result<()> {
    let forbidden =
        b"HTTP/1.1 403 Forbidden\r\nContent-Length: 0\r\nConnection: close\r\n\r\n".to_vec();
    let (addr, server, hit_count) = spawn_test_server(vec![forbidden])?;
    let client = AsyncTransport::new(
        RetryConfig {
            max_attempts: 3,
            base_delay: Duration::ZERO,
            max_delay: Duration::ZERO,
            max_retry_after: Duration::from_secs(30),
        },
        None,
        Some(Duration::from_secs(5)),
        reqx::TlsRootStore::BackendDefault,
    )?;
    let target = Url::parse(&format!("http://{addr}/"))
        .map_err(|_| Error::invalid_config("invalid test server URL"))?;

    let response = client
        .send(Method::GET, target, HeaderMap::new(), AsyncBody::Empty)
        .await?;
    server
        .join()
        .map_err(|_| Error::transport("test server thread panicked", None))?;

    assert_eq!(hit_count.load(Ordering::SeqCst), 1);
    assert_eq!(response.status(), StatusCode::FORBIDDEN);
    Ok(())
}
/// A 503 followed by a 200 ends in success after exactly two requests.
#[tokio::test]
async fn send_retries_on_retryable_5xx_status() -> Result<()> {
    let unavailable =
        b"HTTP/1.1 503 Service Unavailable\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"
            .to_vec();
    let ok = b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n".to_vec();
    let (addr, server, hit_count) = spawn_test_server(vec![unavailable, ok])?;
    let client = AsyncTransport::new(
        RetryConfig {
            max_attempts: 2,
            base_delay: Duration::ZERO,
            max_delay: Duration::ZERO,
            max_retry_after: Duration::from_secs(30),
        },
        None,
        Some(Duration::from_secs(5)),
        reqx::TlsRootStore::BackendDefault,
    )?;
    let target = Url::parse(&format!("http://{addr}/"))
        .map_err(|_| Error::invalid_config("invalid test server URL"))?;

    let response = client
        .send(Method::GET, target, HeaderMap::new(), AsyncBody::Empty)
        .await?;
    server
        .join()
        .map_err(|_| Error::transport("test server thread panicked", None))?;

    assert_eq!(hit_count.load(Ordering::SeqCst), 2);
    assert_eq!(response.status(), StatusCode::OK);
    Ok(())
}
/// With two attempts allowed and two 503 responses queued, the server is hit
/// exactly twice and the final 503 is returned as a response.
#[tokio::test]
async fn send_exhausts_retryable_status_without_nested_status_retries() -> Result<()> {
    let first =
        b"HTTP/1.1 503 Service Unavailable\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"
            .to_vec();
    let second =
        b"HTTP/1.1 503 Service Unavailable\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"
            .to_vec();
    let (addr, server, hit_count) = spawn_test_server(vec![first, second])?;
    let client = AsyncTransport::new(
        RetryConfig {
            max_attempts: 2,
            base_delay: Duration::ZERO,
            max_delay: Duration::ZERO,
            max_retry_after: Duration::from_secs(30),
        },
        None,
        Some(Duration::from_secs(5)),
        reqx::TlsRootStore::BackendDefault,
    )?;
    let target = Url::parse(&format!("http://{addr}/"))
        .map_err(|_| Error::invalid_config("invalid test server URL"))?;

    let response = client
        .send(Method::GET, target, HeaderMap::new(), AsyncBody::Empty)
        .await?;
    server
        .join()
        .map_err(|_| Error::transport("test server thread panicked", None))?;

    assert_eq!(hit_count.load(Ordering::SeqCst), 2);
    assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
    Ok(())
}
/// A POST that receives a 503 is not sent again: one hit, and the 503 is
/// returned even though a 200 was queued behind it.
#[tokio::test]
async fn send_does_not_retry_on_retryable_status_for_non_idempotent_post() -> Result<()> {
    let unavailable =
        b"HTTP/1.1 503 Service Unavailable\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"
            .to_vec();
    let ok = b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n".to_vec();
    let (addr, server, hit_count) = spawn_test_server(vec![unavailable, ok])?;
    let client = AsyncTransport::new(
        RetryConfig {
            max_attempts: 2,
            base_delay: Duration::ZERO,
            max_delay: Duration::ZERO,
            max_retry_after: Duration::from_secs(30),
        },
        None,
        Some(Duration::from_secs(5)),
        reqx::TlsRootStore::BackendDefault,
    )?;
    let target = Url::parse(&format!("http://{addr}/"))
        .map_err(|_| Error::invalid_config("invalid test server URL"))?;
    let payload = AsyncBody::Bytes(Bytes::from_static(b"hello"));

    let response = client
        .send(Method::POST, target, HeaderMap::new(), payload)
        .await?;
    server
        .join()
        .map_err(|_| Error::transport("test server thread panicked", None))?;

    assert_eq!(hit_count.load(Ordering::SeqCst), 1);
    assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
    Ok(())
}
/// When the first connection is answered with no bytes at all, the request is
/// sent again and the second, valid response is returned.
#[tokio::test]
async fn send_retries_on_transport_error_for_replayable_body() -> Result<()> {
    let nothing = Vec::new();
    let ok = b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n".to_vec();
    let (addr, server, hit_count) = spawn_test_server(vec![nothing, ok])?;
    let client = AsyncTransport::new(
        RetryConfig {
            max_attempts: 2,
            base_delay: Duration::ZERO,
            max_delay: Duration::ZERO,
            max_retry_after: Duration::from_secs(30),
        },
        None,
        Some(Duration::from_secs(5)),
        reqx::TlsRootStore::BackendDefault,
    )?;
    let target = Url::parse(&format!("http://{addr}/"))
        .map_err(|_| Error::invalid_config("invalid test server URL"))?;

    let response = client
        .send(Method::GET, target, HeaderMap::new(), AsyncBody::Empty)
        .await?;
    server
        .join()
        .map_err(|_| Error::transport("test server thread panicked", None))?;

    assert_eq!(hit_count.load(Ordering::SeqCst), 2);
    assert_eq!(response.status(), StatusCode::OK);
    Ok(())
}
/// A response that promises 8 body bytes but delivers 3 leads to a second
/// request, whose complete response is returned.
#[tokio::test]
async fn send_retries_when_buffered_body_read_fails() -> Result<()> {
    let truncated =
        b"HTTP/1.1 200 OK\r\nContent-Length: 8\r\nConnection: close\r\n\r\nabc".to_vec();
    let ok = b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n".to_vec();
    let (addr, server, hit_count) = spawn_test_server(vec![truncated, ok])?;
    let client = AsyncTransport::new(
        RetryConfig {
            max_attempts: 2,
            base_delay: Duration::ZERO,
            max_delay: Duration::ZERO,
            max_retry_after: Duration::from_secs(30),
        },
        None,
        Some(Duration::from_secs(5)),
        reqx::TlsRootStore::BackendDefault,
    )?;
    let target = Url::parse(&format!("http://{addr}/"))
        .map_err(|_| Error::invalid_config("invalid test server URL"))?;

    let response = client
        .send(Method::GET, target, HeaderMap::new(), AsyncBody::Empty)
        .await?;
    server
        .join()
        .map_err(|_| Error::transport("test server thread panicked", None))?;

    assert_eq!(hit_count.load(Ordering::SeqCst), 2);
    assert_eq!(response.status(), StatusCode::OK);
    Ok(())
}
/// A 200 whose body is an `InternalError` document triggers a second request;
/// the clean 200 that follows is returned.
#[tokio::test]
async fn send_retries_on_embedded_retryable_service_error_xml() -> Result<()> {
    let error_xml =
        "<Error><Code>InternalError</Code><Message>backend failure</Message></Error>";
    let length = error_xml.len();
    let failing = format!(
        "HTTP/1.1 200 OK\r\nContent-Length: {length}\r\nConnection: close\r\n\r\n{error_xml}"
    )
    .into_bytes();
    let ok = b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n".to_vec();
    let (addr, server, hit_count) = spawn_test_server(vec![failing, ok])?;
    let client = AsyncTransport::new(
        RetryConfig {
            max_attempts: 2,
            base_delay: Duration::ZERO,
            max_delay: Duration::ZERO,
            max_retry_after: Duration::from_secs(30),
        },
        None,
        Some(Duration::from_secs(5)),
        reqx::TlsRootStore::BackendDefault,
    )?;
    let target = Url::parse(&format!("http://{addr}/"))
        .map_err(|_| Error::invalid_config("invalid test server URL"))?;

    let response = client
        .send(Method::GET, target, HeaderMap::new(), AsyncBody::Empty)
        .await?;
    server
        .join()
        .map_err(|_| Error::transport("test server thread panicked", None))?;

    assert_eq!(hit_count.load(Ordering::SeqCst), 2);
    assert_eq!(response.status(), StatusCode::OK);
    Ok(())
}
/// A 400 whose body carries the `SlowDown` code triggers a second request;
/// the 200 that follows is returned.
#[tokio::test]
async fn send_retries_on_retryable_service_error_code_from_4xx_body() -> Result<()> {
    let error_xml = "<Error><Code>SlowDown</Code><Message>slow down</Message></Error>";
    let length = error_xml.len();
    let failing = format!(
        "HTTP/1.1 400 Bad Request\r\nContent-Length: {length}\r\nConnection: close\r\n\r\n{error_xml}"
    )
    .into_bytes();
    let ok = b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n".to_vec();
    let (addr, server, hit_count) = spawn_test_server(vec![failing, ok])?;
    let client = AsyncTransport::new(
        RetryConfig {
            max_attempts: 2,
            base_delay: Duration::ZERO,
            max_delay: Duration::ZERO,
            max_retry_after: Duration::from_secs(30),
        },
        None,
        Some(Duration::from_secs(5)),
        reqx::TlsRootStore::BackendDefault,
    )?;
    let target = Url::parse(&format!("http://{addr}/"))
        .map_err(|_| Error::invalid_config("invalid test server URL"))?;

    let response = client
        .send(Method::GET, target, HeaderMap::new(), AsyncBody::Empty)
        .await?;
    server
        .join()
        .map_err(|_| Error::transport("test server thread panicked", None))?;

    assert_eq!(hit_count.load(Ordering::SeqCst), 2);
    assert_eq!(response.status(), StatusCode::OK);
    Ok(())
}
#[tokio::test]
async fn send_stream_does_not_retry_for_non_replayable_body() -> Result<()> {
let (addr, handle, hits) = spawn_test_server(vec![
b"HTTP/1.1 429 Too Many Requests\r\nRetry-After: 0\r\nContent-Length: 0\r\nConnection: close\r\n\r\n".to_vec(),
])?;
let retry = RetryConfig {
max_attempts: 3,
base_delay: Duration::from_millis(0),
max_delay: Duration::from_millis(0),
max_retry_after: Duration::from_secs(30),
};
let transport = AsyncTransport::new(
retry,
None,
Some(Duration::from_secs(5)),
reqx::TlsRootStore::BackendDefault,
)?;
let url = Url::parse(&format!("http://{addr}/"))
.map_err(|_| Error::invalid_config("invalid test server URL"))?;
let body = AsyncBody::Stream {
stream: boxed_byte_stream(futures_util::stream::empty::<
std::result::Result<Bytes, std::io::Error>,
>()),
content_length: Some(0),
};
let resp = transport
.send_stream(Method::PUT, url, HeaderMap::new(), body)
.await?;
handle
.join()
.map_err(|_| Error::transport("test server thread panicked", None))?;
assert_eq!(hits.load(Ordering::SeqCst), 1);
assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);
Ok(())
}
/// `send_stream` with an empty body retries after a 503 and then streams the
/// body of the 200 that follows.
#[tokio::test]
async fn send_stream_retries_retryable_status_for_replayable_body() -> Result<()> {
    use tokio::io::AsyncReadExt as _;

    let unavailable = b"HTTP/1.1 503 Service Unavailable\r\nRetry-After: 0\r\nContent-Length: 0\r\nConnection: close\r\n\r\n".to_vec();
    let ok = b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\nConnection: close\r\n\r\nok".to_vec();
    let (addr, server, hit_count) = spawn_test_server(vec![unavailable, ok])?;
    let client = AsyncTransport::new(
        RetryConfig {
            max_attempts: 2,
            base_delay: Duration::ZERO,
            max_delay: Duration::ZERO,
            max_retry_after: Duration::from_secs(30),
        },
        None,
        Some(Duration::from_secs(5)),
        reqx::TlsRootStore::BackendDefault,
    )?;
    let target = Url::parse(&format!("http://{addr}/"))
        .map_err(|_| Error::invalid_config("invalid test server URL"))?;

    let mut response = client
        .send_stream(Method::GET, target, HeaderMap::new(), AsyncBody::Empty)
        .await?;
    let mut body = Vec::new();
    response
        .read_to_end(&mut body)
        .await
        .map_err(|e| Error::transport("body stream error", Some(Box::new(e))))?;
    server
        .join()
        .map_err(|_| Error::transport("test server thread panicked", None))?;

    assert_eq!(hit_count.load(Ordering::SeqCst), 2);
    assert_eq!(response.status(), StatusCode::OK);
    assert_eq!(body, b"ok");
    Ok(())
}
/// A GET carrying a `Bytes` body is rejected with `Error::InvalidConfig`.
#[tokio::test]
async fn send_rejects_body_for_get() -> Result<()> {
    let client = AsyncTransport::new(
        RetryConfig::default(),
        None,
        Some(Duration::from_secs(1)),
        reqx::TlsRootStore::BackendDefault,
    )?;
    let target = Url::parse("http://127.0.0.1:9/")
        .map_err(|_| Error::invalid_config("invalid test URL"))?;
    let payload = AsyncBody::Bytes(Bytes::from_static(b"body"));

    let failure = client
        .send(Method::GET, target, HeaderMap::new(), payload)
        .await
        .expect_err("GET body should be rejected");

    match failure {
        Error::InvalidConfig { message } => {
            assert!(message.contains("does not accept a request body"));
        }
        other => panic!("expected invalid config, got {other:?}"),
    }
    Ok(())
}
/// `AsyncResponse::text` fails with a decode error mentioning UTF-8 when the
/// body is the single byte `0xff`.
#[tokio::test]
async fn response_text_rejects_invalid_utf8() {
    let status = StatusCode::OK;
    let headers = HeaderMap::new();
    let body = Bytes::from_static(&[0xff]);
    let response = AsyncResponse {
        status,
        headers,
        body,
    };

    match response.text().expect_err("invalid UTF-8 must fail") {
        Error::Decode { message, .. } => assert!(message.contains("UTF-8")),
        other => panic!("expected decode error, got {other:?}"),
    }
}
/// `send_stream` leaves a gzip-encoded body untouched: the
/// `Content-Encoding` header stays visible and the streamed bytes equal the
/// bytes the server sent.
#[tokio::test]
async fn send_stream_preserves_content_encoding_and_raw_bytes() -> Result<()> {
    use tokio::io::AsyncReadExt as _;

    let gzipped = vec![
        0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xcb, 0x48, 0xcd, 0xc9,
        0xc9, 0x07, 0x00, 0x86, 0xa6, 0x10, 0x36, 0x05, 0x00, 0x00, 0x00,
    ];
    let length = gzipped.len();
    let mut wire = format!(
        "HTTP/1.1 200 OK\r\nContent-Encoding: gzip\r\nContent-Length: {length}\r\nConnection: close\r\n\r\n"
    )
    .into_bytes();
    wire.extend_from_slice(&gzipped);
    let (addr, server, _) = spawn_test_server(vec![wire])?;
    let client = AsyncTransport::new(
        RetryConfig::default(),
        None,
        Some(Duration::from_secs(5)),
        reqx::TlsRootStore::BackendDefault,
    )?;
    let target = Url::parse(&format!("http://{addr}/"))
        .map_err(|_| Error::invalid_config("invalid test server URL"))?;

    let mut response = client
        .send_stream(Method::GET, target, HeaderMap::new(), AsyncBody::Empty)
        .await?;
    let encoding = response
        .headers()
        .get(http::header::CONTENT_ENCODING)
        .and_then(|v| v.to_str().ok());
    assert_eq!(encoding, Some("gzip"));

    let mut received = Vec::new();
    response
        .read_to_end(&mut received)
        .await
        .map_err(|e| Error::transport("body stream error", Some(Box::new(e))))?;
    server
        .join()
        .map_err(|_| Error::transport("test server thread panicked", None))?;

    assert_eq!(received, gzipped);
    Ok(())
}
/// Reading a streamed body that ends after 3 of the 10 promised bytes must
/// report an error to the reader.
#[tokio::test]
async fn send_stream_body_read_error_is_observable() -> Result<()> {
    use tokio::io::AsyncReadExt as _;

    let truncated =
        b"HTTP/1.1 200 OK\r\nContent-Length: 10\r\nConnection: close\r\n\r\nabc".to_vec();
    let (addr, server, _) = spawn_test_server(vec![truncated])?;
    let client = AsyncTransport::new(
        RetryConfig::default(),
        None,
        Some(Duration::from_secs(5)),
        reqx::TlsRootStore::BackendDefault,
    )?;
    let target = Url::parse(&format!("http://{addr}/"))
        .map_err(|_| Error::invalid_config("invalid test server URL"))?;

    let mut response = client
        .send_stream(Method::GET, target, HeaderMap::new(), AsyncBody::Empty)
        .await?;
    let mut received = Vec::new();
    let saw_error = response.read_to_end(&mut received).await.is_err();
    server
        .join()
        .map_err(|_| Error::transport("test server thread panicked", None))?;

    assert!(
        saw_error,
        "expected body stream error for truncated response"
    );
    Ok(())
}
/// Buffering a 2048-byte streamed body with a 1024-byte limit must fail.
#[tokio::test]
async fn send_stream_into_response_limited_enforces_limit() -> Result<()> {
    let payload = "x".repeat(2048);
    let length = payload.len();
    let canned = format!(
        "HTTP/1.1 403 Forbidden\r\nContent-Length: {length}\r\nConnection: close\r\n\r\n{payload}"
    )
    .into_bytes();
    let (addr, server, _) = spawn_test_server(vec![canned])?;
    let client = AsyncTransport::new(
        RetryConfig::default(),
        None,
        Some(Duration::from_secs(5)),
        reqx::TlsRootStore::BackendDefault,
    )?;
    let target = Url::parse(&format!("http://{addr}/"))
        .map_err(|_| Error::invalid_config("invalid test server URL"))?;

    let response = client
        .send_stream(Method::GET, target, HeaderMap::new(), AsyncBody::Empty)
        .await?;
    let buffered = response.into_response_limited(1024).await;
    server
        .join()
        .map_err(|_| Error::transport("test server thread panicked", None))?;

    assert!(buffered.is_err(), "expected body limit error");
    Ok(())
}
/// A 301 is returned to the caller as-is; the server sees a single request.
#[tokio::test]
async fn send_does_not_follow_redirect_by_default() -> Result<()> {
    let moved = b"HTTP/1.1 301 Moved Permanently\r\nLocation: /next\r\nContent-Length: 0\r\nConnection: close\r\n\r\n".to_vec();
    let (addr, server, hit_count) = spawn_test_server(vec![moved])?;
    let client = AsyncTransport::new(
        RetryConfig::default(),
        None,
        Some(Duration::from_secs(5)),
        reqx::TlsRootStore::BackendDefault,
    )?;
    let target = Url::parse(&format!("http://{addr}/"))
        .map_err(|_| Error::invalid_config("invalid test server URL"))?;

    let response = client
        .send(Method::GET, target, HeaderMap::new(), AsyncBody::Empty)
        .await?;
    server
        .join()
        .map_err(|_| Error::transport("test server thread panicked", None))?;

    assert_eq!(hit_count.load(Ordering::SeqCst), 1);
    assert_eq!(response.status(), StatusCode::MOVED_PERMANENTLY);
    Ok(())
}
/// A 302 whose `Location` differs only in the query string is not followed:
/// one hit, and the 302 itself is returned although a 200 was queued.
#[tokio::test]
async fn send_does_not_succeed_after_query_only_redirect() -> Result<()> {
    let found = b"HTTP/1.1 302 Found\r\nLocation: /?next=1\r\nContent-Length: 0\r\nConnection: close\r\n\r\n".to_vec();
    let ok = b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n".to_vec();
    let (addr, server, hit_count) = spawn_test_server(vec![found, ok])?;
    let client = AsyncTransport::new(
        RetryConfig::default(),
        None,
        Some(Duration::from_secs(5)),
        reqx::TlsRootStore::BackendDefault,
    )?;
    let target = Url::parse(&format!("http://{addr}/"))
        .map_err(|_| Error::invalid_config("invalid test server URL"))?;

    // Join the server before inspecting the outcome so the thread is reaped
    // even when the request failed.
    let outcome = client
        .send(Method::GET, target, HeaderMap::new(), AsyncBody::Empty)
        .await;
    server
        .join()
        .map_err(|_| Error::transport("test server thread panicked", None))?;
    let response = outcome?;

    assert_eq!(hit_count.load(Ordering::SeqCst), 1);
    assert_eq!(response.status(), StatusCode::FOUND);
    Ok(())
}
/// `followed_redirect` is false when the final URI equals the request URL and
/// true when the final URI is a different string that is not a valid URI.
#[test]
fn followed_redirect_treats_unparseable_different_uri_as_redirect() {
    let request_url = Url::parse("https://example.com/path?x=1").expect("valid URL");

    let same_uri = crate::transport::followed_redirect(&request_url, request_url.as_str());
    assert!(!same_uri);

    let garbage_uri = crate::transport::followed_redirect(&request_url, "not-a-valid-uri");
    assert!(garbage_uri);
}
}