use crate::builder::HttpClientBuilder;
use crate::config::TransportSecurity;
use crate::error::HttpError;
use crate::request::RequestBuilder;
use crate::response::ResponseBody;
use bytes::Bytes;
use http::{Request, Response};
use http_body_util::Full;
use std::future::Future;
use std::pin::Pin;
use tower::Service;
use tower::buffer::Buffer;
/// Boxed, pinned, `Send` future that resolves to either a
/// `Response<ResponseBody>` or an [`HttpError`].
pub type ServiceFuture =
Pin<Box<dyn Future<Output = Result<Response<ResponseBody>, HttpError>> + Send>>;
/// [`tower::buffer::Buffer`] handle that takes `Request<Full<Bytes>>` values and
/// is parameterised with [`ServiceFuture`] as its second type argument.
pub type BufferedService = Buffer<Request<Full<Bytes>>, ServiceFuture>;
/// HTTP client handle.
///
/// Each request helper on this type (`get`, `post`, ...) hands the three fields
/// below to a new [`RequestBuilder`]. The type derives `Clone`.
#[derive(Clone)]
pub struct HttpClient {
/// Buffered tower service; a clone of it is given to every `RequestBuilder`.
pub(crate) service: BufferedService,
/// Body-size setting forwarded to every `RequestBuilder`.
/// NOTE(review): presumably a byte limit on response bodies — confirm in `RequestBuilder`.
pub(crate) max_body_size: usize,
/// Transport-security setting forwarded (by copy) to every `RequestBuilder`.
pub(crate) transport_security: TransportSecurity,
}
impl HttpClient {
    /// Creates a client from a default [`HttpClientBuilder`].
    ///
    /// # Errors
    ///
    /// Returns the [`HttpError`] reported by `HttpClientBuilder::build`.
    pub fn new() -> Result<Self, HttpError> {
        HttpClientBuilder::new().build()
    }

    /// Returns a fresh [`HttpClientBuilder`] for customised construction.
    #[must_use]
    pub fn builder() -> HttpClientBuilder {
        HttpClientBuilder::new()
    }

    /// Starts a `GET` request for `url`.
    pub fn get(&self, url: &str) -> RequestBuilder {
        self.request_for_method(http::Method::GET, url)
    }

    /// Starts a `POST` request for `url`.
    pub fn post(&self, url: &str) -> RequestBuilder {
        self.request_for_method(http::Method::POST, url)
    }

    /// Starts a `PUT` request for `url`.
    pub fn put(&self, url: &str) -> RequestBuilder {
        self.request_for_method(http::Method::PUT, url)
    }

    /// Starts a `PATCH` request for `url`.
    pub fn patch(&self, url: &str) -> RequestBuilder {
        self.request_for_method(http::Method::PATCH, url)
    }

    /// Starts a `DELETE` request for `url`.
    pub fn delete(&self, url: &str) -> RequestBuilder {
        self.request_for_method(http::Method::DELETE, url)
    }

    /// Starts a `HEAD` request for `url`.
    pub fn head(&self, url: &str) -> RequestBuilder {
        self.request_for_method(http::Method::HEAD, url)
    }

    /// Starts an `OPTIONS` request for `url`.
    pub fn options(&self, url: &str) -> RequestBuilder {
        self.request_for_method(http::Method::OPTIONS, url)
    }

    /// Shared constructor behind every verb helper.
    ///
    /// Clones the buffered service, copies the body-size and transport-security
    /// settings, and stores an owned copy of `url`, so the returned builder does
    /// not borrow from `self`.
    fn request_for_method(&self, method: http::Method, url: &str) -> RequestBuilder {
        RequestBuilder::new(
            self.service.clone(),
            self.max_body_size,
            method,
            url.to_owned(),
            self.transport_security,
        )
    }
}
/// Converts a boxed error coming out of the buffered service into an [`HttpError`].
///
/// If the boxed value is itself an `HttpError` it is unboxed and returned
/// unchanged. Any other error type is logged at `error` level and reported as
/// [`HttpError::ServiceClosed`].
pub fn map_buffer_error(err: tower::BoxError) -> HttpError {
    let unrecognized = match err.downcast::<HttpError>() {
        Ok(original) => return *original,
        Err(unrecognized) => unrecognized,
    };

    tracing::error!(
        error = %unrecognized,
        "buffer worker closed unexpectedly; service unavailable"
    );
    HttpError::ServiceClosed
}
pub async fn try_acquire_buffer_slot(service: &mut BufferedService) -> Result<(), HttpError> {
use std::task::Poll;
let poll_result = std::future::poll_fn(|cx| match service.poll_ready(cx) {
Poll::Ready(result) => Poll::Ready(Some(result)),
Poll::Pending => Poll::Ready(None), })
.await;
match poll_result {
Some(Ok(())) => Ok(()),
Some(Err(e)) => Err(map_buffer_error(e)),
None => Err(HttpError::Overloaded), }
}
#[cfg(test)]
#[cfg(not(feature = "fips"))]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
use super::*;
use crate::error::HttpError;
use httpmock::prelude::*;
use serde_json::json;
/// Builds the client used by most tests: a default builder with `retry(None)`.
fn test_client() -> HttpClient {
    let builder = HttpClientBuilder::new().retry(None);
    builder.build().unwrap()
}
/// A plain GET against a mocked `/test` route must come back as 200 OK.
#[tokio::test]
async fn test_http_client_get() {
    let client = test_client();
    let mock_server = MockServer::start();
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::GET).path("/test");
        then.status(200).json_body(json!({"success": true}));
    });

    let endpoint = format!("{}/test", mock_server.base_url());
    let response = client.get(&endpoint).send().await.unwrap();

    assert_eq!(response.status(), hyper::StatusCode::OK);
}
/// A body-less POST against a mocked `/action` route must come back as 200 OK.
#[tokio::test]
async fn test_http_client_post() {
    let client = test_client();
    let mock_server = MockServer::start();
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::POST).path("/action");
        then.status(200).json_body(json!({"ok": true}));
    });

    let endpoint = format!("{}/action", mock_server.base_url());
    let response = client.post(&endpoint).send().await.unwrap();

    assert_eq!(response.status(), hyper::StatusCode::OK);
}
/// `form()` on a POST must send an `application/x-www-form-urlencoded` body
/// encoded as `key1=value1&key2=value2`; the mock only matches that shape.
#[tokio::test]
async fn test_http_client_post_form() {
    let client = test_client();
    let mock_server = MockServer::start();
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::POST)
            .path("/submit")
            .header("content-type", "application/x-www-form-urlencoded")
            .body("key1=value1&key2=value2");
        then.status(200).json_body(json!({"received": true}));
    });

    let endpoint = format!("{}/submit", mock_server.base_url());
    let request = client
        .post(&endpoint)
        .form(&[("key1", "value1"), ("key2", "value2")])
        .unwrap();
    let response = request.send().await.unwrap();

    assert_eq!(response.status(), hyper::StatusCode::OK);
}
/// `json()` on a response must deserialize the mocked JSON body into a typed struct.
#[tokio::test]
async fn test_json_body_parsing() {
    #[derive(serde::Deserialize)]
    struct TestResponse {
        name: String,
        value: i32,
    }

    let client = test_client();
    let mock_server = MockServer::start();
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::GET).path("/json");
        then.status(200)
            .json_body(json!({"name": "test", "value": 42}));
    });

    let endpoint = format!("{}/json", mock_server.base_url());
    let response = client.get(&endpoint).send().await.unwrap();
    let parsed: TestResponse = response.json().await.unwrap();

    assert_eq!(parsed.name, "test");
    assert_eq!(parsed.value, 42);
}
/// With `max_body_size(1024)`, reading a 1 MiB response body via `bytes()`
/// must fail with `HttpError::BodyTooLarge`.
#[tokio::test]
async fn test_body_size_limit() {
    let mock_server = MockServer::start();
    let oversized_body = "x".repeat(1024 * 1024);
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::GET).path("/large");
        then.status(200).body(&oversized_body);
    });

    let limited_client = HttpClientBuilder::new()
        .retry(None)
        .max_body_size(1024)
        .build()
        .unwrap();
    let endpoint = format!("{}/large", mock_server.base_url());
    let response = limited_client.get(&endpoint).send().await.unwrap();
    let read_outcome = response.bytes().await;

    assert!(matches!(read_outcome, Err(HttpError::BodyTooLarge { .. })));
}
/// A client built with `user_agent("custom/1.0")` must send that value in the
/// `user-agent` header; the mock only matches requests carrying it.
#[tokio::test]
async fn test_custom_user_agent() {
    let mock_server = MockServer::start();
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::GET)
            .path("/test")
            .header("user-agent", "custom/1.0");
        then.status(200);
    });

    let builder = HttpClientBuilder::new()
        .retry(None)
        .user_agent("custom/1.0");
    let client = builder.build().unwrap();
    let endpoint = format!("{}/test", mock_server.base_url());
    let response = client.get(&endpoint).send().await.unwrap();

    assert_eq!(response.status(), hyper::StatusCode::OK);
}
/// Calling `json()` on a 404 response must yield `HttpError::HttpStatus`
/// carrying the status, a preview of the body and the content type.
#[tokio::test]
async fn test_non_2xx_returns_http_status_error() {
    let client = test_client();
    let mock_server = MockServer::start();
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::GET).path("/error");
        then.status(404)
            .header("content-type", "application/json")
            .body(r#"{"error": "not found"}"#);
    });

    let endpoint = format!("{}/error", mock_server.base_url());
    let response = client.get(&endpoint).send().await.unwrap();
    let parsed: Result<serde_json::Value, _> = response.json().await;

    match parsed {
        Err(HttpError::HttpStatus {
            status,
            body_preview,
            content_type,
            ..
        }) => {
            assert_eq!(status, hyper::StatusCode::NOT_FOUND);
            assert!(body_preview.contains("not found"));
            assert_eq!(content_type, Some("application/json".to_owned()));
        }
        other => panic!("Expected HttpStatus error, got: {other:?}"),
    }
}
/// `checked_bytes()` on a 200 response must return the body bytes unchanged.
#[tokio::test]
async fn test_checked_body_success() {
    let client = test_client();
    let mock_server = MockServer::start();
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::GET).path("/data");
        then.status(200).body("hello world");
    });

    let endpoint = format!("{}/data", mock_server.base_url());
    let response = client.get(&endpoint).send().await.unwrap();
    let payload = response.checked_bytes().await.unwrap();

    assert_eq!(&payload[..], b"hello world");
}
/// A cloned client must be usable side by side with the original.
#[tokio::test]
async fn test_client_is_clone() {
    let original = test_client();
    let duplicate = original.clone();

    let mock_server = MockServer::start();
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::GET).path("/test");
        then.status(200);
    });
    let endpoint = format!("{}/test", mock_server.base_url());

    let first = original.get(&endpoint).send().await.unwrap();
    let second = duplicate.get(&endpoint).send().await.unwrap();

    assert_eq!(first.status(), hyper::StatusCode::OK);
    assert_eq!(second.status(), hyper::StatusCode::OK);
}
/// Compile-time check that `HttpClient` implements both `Send` and `Sync`.
#[test]
fn test_http_client_is_send_sync() {
    fn require_send_sync<T>()
    where
        T: Send + Sync,
    {
    }

    require_send_sync::<HttpClient>();
}
/// Fifty spawned tasks, each holding its own clone of the client, must all
/// receive 200 OK.
#[tokio::test]
async fn test_concurrent_requests_50() {
    let client = test_client();
    let mock_server = MockServer::start();
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::GET).path("/concurrent");
        then.status(200).body("ok");
    });
    let endpoint = format!("{}/concurrent", mock_server.base_url());

    let mut tasks = Vec::with_capacity(50);
    for _ in 0..50 {
        let client = client.clone();
        let endpoint = endpoint.clone();
        tasks.push(tokio::spawn(
            async move { client.get(&endpoint).send().await },
        ));
    }

    for task in tasks {
        let response = task.await.unwrap().unwrap();
        assert_eq!(response.status(), hyper::StatusCode::OK);
    }
}
/// With `buffer_capacity: 2`, ten concurrent requests must all finish within
/// ten seconds, each either succeeding with 200 or failing with
/// `HttpError::Overloaded`, and at least one must succeed.
#[tokio::test]
async fn test_small_buffer_capacity_no_deadlock() {
    use crate::config::HttpClientConfig;
    use std::time::Duration;

    let mock_server = MockServer::start();
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::GET).path("/test");
        then.status(200).body("ok");
    });

    let client = HttpClientBuilder::with_config(HttpClientConfig {
        transport: crate::config::TransportSecurity::AllowInsecureHttp,
        retry: None,
        rate_limit: None,
        buffer_capacity: 2,
        ..Default::default()
    })
    .build()
    .unwrap();
    let endpoint = format!("{}/test", mock_server.base_url());

    let mut tasks = Vec::with_capacity(10);
    for _ in 0..10 {
        let client = client.clone();
        let endpoint = endpoint.clone();
        tasks.push(tokio::spawn(
            async move { client.get(&endpoint).send().await },
        ));
    }

    // Join every task under one overall deadline so a deadlock fails the test
    // instead of hanging it.
    let joined = tokio::time::timeout(Duration::from_secs(10), async {
        let mut outcomes = Vec::new();
        for task in tasks {
            outcomes.push(task.await);
        }
        outcomes
    })
    .await;
    let outcomes = joined.expect("requests should complete within timeout");

    let mut succeeded = 0;
    let mut rejected = 0;
    for outcome in outcomes {
        match outcome.unwrap() {
            Ok(response) => {
                assert_eq!(response.status(), hyper::StatusCode::OK);
                succeeded += 1;
            }
            Err(HttpError::Overloaded) => rejected += 1,
            Err(e) => panic!("unexpected error: {e:?}"),
        }
    }

    assert!(succeeded > 0, "at least one request should succeed");
    assert_eq!(succeeded + rejected, 10);
}
/// With `buffer_capacity: 1`, a second request issued 10 ms after a spawned
/// first one must resolve within 50 ms, as either `Ok` or
/// `HttpError::Overloaded`; it must not block waiting for buffer space.
#[tokio::test]
async fn test_buffer_overflow_returns_overloaded() {
    use crate::config::HttpClientConfig;
    use std::time::Duration;

    let mock_server = MockServer::start();
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::GET).path("/slow");
        then.status(200).body("ok");
    });

    let client = HttpClientBuilder::with_config(HttpClientConfig {
        transport: crate::config::TransportSecurity::AllowInsecureHttp,
        retry: None,
        rate_limit: None,
        buffer_capacity: 1,
        ..Default::default()
    })
    .build()
    .unwrap();
    let endpoint = format!("{}/slow", mock_server.base_url());

    // The first request runs in the background.
    let background_client = client.clone();
    let background_endpoint = endpoint.clone();
    let background = tokio::spawn(async move {
        background_client.get(&background_endpoint).send().await
    });

    tokio::time::sleep(Duration::from_millis(10)).await;

    let second = tokio::time::timeout(
        Duration::from_millis(50),
        client.get(&endpoint).send(),
    )
    .await
    .expect("request should not timeout waiting for buffer");

    match second {
        Ok(_) | Err(HttpError::Overloaded) => {}
        Err(e) => panic!("unexpected error: {e:?}"),
    }

    _ = background.await;
}
/// Five concurrent tasks each download a 100 KiB body through `checked_bytes()`;
/// all must finish within ten seconds and return the full length.
#[tokio::test]
async fn test_large_body_no_deadlock() {
    use std::time::Duration;

    let mock_server = MockServer::start();
    let payload = "x".repeat(100 * 1024);
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::GET).path("/large");
        then.status(200).body(&payload);
    });

    let client = HttpClientBuilder::new()
        .retry(None)
        .max_body_size(1024 * 1024)
        .build()
        .unwrap();
    let endpoint = format!("{}/large", mock_server.base_url());

    let mut tasks = Vec::with_capacity(5);
    for _ in 0..5 {
        let client = client.clone();
        let endpoint = endpoint.clone();
        tasks.push(tokio::spawn(async move {
            client.get(&endpoint).send().await?.checked_bytes().await
        }));
    }

    let joined = tokio::time::timeout(Duration::from_secs(10), async {
        let mut outcomes = Vec::new();
        for task in tasks {
            outcomes.push(task.await);
        }
        outcomes
    })
    .await;
    let outcomes = joined.expect("body reads should complete within timeout");

    for outcome in outcomes {
        let body = outcome.unwrap().unwrap();
        assert_eq!(body.len(), 100 * 1024);
    }
}
/// A client built from `HttpClientConfig::token_endpoint()` must send a POST
/// exactly once when the server answers 500, and must surface that response
/// as `Ok` with status 500.
#[tokio::test]
async fn test_token_endpoint_post_not_retried() {
    use crate::config::HttpClientConfig;

    let server = MockServer::start();
    let mock = server.mock(|when, then| {
        when.method(Method::POST).path("/token");
        then.status(500).body("server error");
    });

    let mut config = HttpClientConfig::token_endpoint();
    // The mock server speaks plain HTTP, so insecure transport has to be allowed.
    config.transport = crate::config::TransportSecurity::AllowInsecureHttp;
    let client = HttpClientBuilder::with_config(config).build().unwrap();
    let url = format!("{}/token", server.base_url());

    // `expect` replaces the former `assert!(is_ok())` + `unwrap()` pair so that
    // a failure reports the actual error value.
    let response = client
        .post(&url)
        .form(&[("grant_type", "client_credentials"), ("client_id", "test")])
        .unwrap()
        .send()
        .await
        .expect("POST + 500 should return Ok(Response), not Err");

    assert_eq!(response.status(), hyper::StatusCode::INTERNAL_SERVER_ERROR);
    assert_eq!(
        mock.calls(),
        1,
        "POST should not be retried; expected 1 call, got {}",
        mock.calls()
    );
}
/// A body-less PUT against a mocked `/resource` route must come back as 200 OK.
#[tokio::test]
async fn test_http_client_put() {
    let client = test_client();
    let mock_server = MockServer::start();
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::PUT).path("/resource");
        then.status(200).json_body(json!({"updated": true}));
    });

    let endpoint = format!("{}/resource", mock_server.base_url());
    let response = client.put(&endpoint).send().await.unwrap();

    assert_eq!(response.status(), hyper::StatusCode::OK);
}
/// `form()` on a PUT must send a url-encoded body `name=updated&value=123`
/// with the form content type; the mock only matches that shape.
#[tokio::test]
async fn test_http_client_put_form() {
    let client = test_client();
    let mock_server = MockServer::start();
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::PUT)
            .path("/resource")
            .header("content-type", "application/x-www-form-urlencoded")
            .body("name=updated&value=123");
        then.status(200).json_body(json!({"updated": true}));
    });

    let endpoint = format!("{}/resource", mock_server.base_url());
    let request = client
        .put(&endpoint)
        .form(&[("name", "updated"), ("value", "123")])
        .unwrap();
    let response = request.send().await.unwrap();

    assert_eq!(response.status(), hyper::StatusCode::OK);
}
/// A body-less PATCH against a mocked `/resource/1` route must come back as 200 OK.
#[tokio::test]
async fn test_http_client_patch() {
    let client = test_client();
    let mock_server = MockServer::start();
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::PATCH).path("/resource/1");
        then.status(200).json_body(json!({"patched": true}));
    });

    let endpoint = format!("{}/resource/1", mock_server.base_url());
    let response = client.patch(&endpoint).send().await.unwrap();

    assert_eq!(response.status(), hyper::StatusCode::OK);
}
/// `form()` on a PATCH must send a url-encoded body `field=patched` with the
/// form content type; the mock only matches that shape.
#[tokio::test]
async fn test_http_client_patch_form() {
    let client = test_client();
    let mock_server = MockServer::start();
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::PATCH)
            .path("/resource/1")
            .header("content-type", "application/x-www-form-urlencoded")
            .body("field=patched");
        then.status(200).json_body(json!({"patched": true}));
    });

    let endpoint = format!("{}/resource/1", mock_server.base_url());
    let request = client
        .patch(&endpoint)
        .form(&[("field", "patched")])
        .unwrap();
    let response = request.send().await.unwrap();

    assert_eq!(response.status(), hyper::StatusCode::OK);
}
/// A DELETE answered with 204 must be surfaced as `NO_CONTENT`.
#[tokio::test]
async fn test_http_client_delete() {
    let client = test_client();
    let mock_server = MockServer::start();
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::DELETE).path("/resource/42");
        then.status(204);
    });

    let endpoint = format!("{}/resource/42", mock_server.base_url());
    let response = client.delete(&endpoint).send().await.unwrap();

    assert_eq!(response.status(), hyper::StatusCode::NO_CONTENT);
}
/// A DELETE answered with 200 and a JSON body must be surfaced as `OK`.
#[tokio::test]
async fn test_http_client_delete_returns_200() {
    let client = test_client();
    let mock_server = MockServer::start();
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::DELETE).path("/resource/99");
        then.status(200).json_body(json!({"deleted": true}));
    });

    let endpoint = format!("{}/resource/99", mock_server.base_url());
    let response = client.delete(&endpoint).send().await.unwrap();

    assert_eq!(response.status(), hyper::StatusCode::OK);
}
/// A HEAD response must expose its headers (here `content-length: 12345`)
/// while `bytes()` yields an empty body.
#[tokio::test]
async fn test_http_client_head_returns_headers_no_body() {
    let client = test_client();
    let mock_server = MockServer::start();
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::HEAD).path("/probe");
        then.status(200)
            .header("content-length", "12345")
            .header("etag", "\"abc\"");
    });

    let endpoint = format!("{}/probe", mock_server.base_url());
    let response = client.head(&endpoint).send().await.unwrap();
    assert_eq!(response.status(), hyper::StatusCode::OK);

    let content_length = response
        .headers()
        .get("content-length")
        .and_then(|v| v.to_str().ok());
    assert_eq!(content_length, Some("12345"),);

    let body = response.bytes().await.unwrap();
    assert!(body.is_empty(), "HEAD response body must be empty");
}
/// An OPTIONS response must expose its 204 status and its `allow` header.
#[tokio::test]
async fn test_http_client_options_returns_allow_header() {
    let client = test_client();
    let mock_server = MockServer::start();
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::OPTIONS).path("/resource");
        then.status(204).header("allow", "GET, POST, PUT, DELETE");
    });

    let endpoint = format!("{}/resource", mock_server.base_url());
    let response = client.options(&endpoint).send().await.unwrap();
    assert_eq!(response.status(), hyper::StatusCode::NO_CONTENT);

    let allow = response
        .headers()
        .get("allow")
        .and_then(|v| v.to_str().ok());
    assert_eq!(allow, Some("GET, POST, PUT, DELETE"),);
}
/// A PUT carrying both a custom header and a form body must deliver the
/// header, the form content type and the encoded body `key=value`.
#[tokio::test]
async fn test_put_form_with_custom_headers() {
    let client = test_client();
    let mock_server = MockServer::start();
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::PUT)
            .path("/api/data")
            .header("content-type", "application/x-www-form-urlencoded")
            .header("x-custom-header", "custom-value")
            .body("key=value");
        then.status(200);
    });

    let endpoint = format!("{}/api/data", mock_server.base_url());
    let request = client
        .put(&endpoint)
        .header("x-custom-header", "custom-value")
        .form(&[("key", "value")])
        .unwrap();
    let response = request.send().await.unwrap();

    assert_eq!(response.status(), hyper::StatusCode::OK);
}
/// A PATCH carrying both an `authorization` header and a form body must
/// deliver the header, the form content type and the body `status=active`.
#[tokio::test]
async fn test_patch_form_with_custom_headers() {
    let client = test_client();
    let mock_server = MockServer::start();
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::PATCH)
            .path("/api/data")
            .header("content-type", "application/x-www-form-urlencoded")
            .header("authorization", "Bearer token123")
            .body("status=active");
        then.status(200);
    });

    let endpoint = format!("{}/api/data", mock_server.base_url());
    let request = client
        .patch(&endpoint)
        .header("authorization", "Bearer token123")
        .form(&[("status", "active")])
        .unwrap();
    let response = request.send().await.unwrap();

    assert_eq!(response.status(), hyper::StatusCode::OK);
}
/// `json()` on a request must serialize the struct and set
/// `content-type: application/json`; the mock matches on both and answers 201.
#[tokio::test]
async fn test_request_builder_json_body() {
    #[derive(serde::Serialize)]
    struct CreateUser {
        name: String,
        email: String,
    }

    let client = test_client();
    let mock_server = MockServer::start();
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::POST)
            .path("/users")
            .header("content-type", "application/json")
            .json_body(json!({"name": "Alice", "email": "alice@example.com"}));
        then.status(201).json_body(json!({"id": 1}));
    });

    let endpoint = format!("{}/users", mock_server.base_url());
    let new_user = CreateUser {
        name: "Alice".into(),
        email: "alice@example.com".into(),
    };
    let request = client.post(&endpoint).json(&new_user).unwrap();
    let response = request.send().await.unwrap();

    assert_eq!(response.status(), hyper::StatusCode::CREATED);
}
/// `body_bytes()` must send the given bytes verbatim as the request body.
#[tokio::test]
async fn test_request_builder_body_bytes() {
    let client = test_client();
    let mock_server = MockServer::start();
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::POST)
            .path("/upload")
            .body("raw binary data");
        then.status(200);
    });

    let endpoint = format!("{}/upload", mock_server.base_url());
    let payload = bytes::Bytes::from("raw binary data");
    let response = client
        .post(&endpoint)
        .body_bytes(payload)
        .send()
        .await
        .unwrap();

    assert_eq!(response.status(), hyper::StatusCode::OK);
}
/// A `content-type` header set by the caller before `json()` must reach the
/// server as `application/vnd.custom+json`; the mock matches only that value
/// and must be hit exactly once.
#[tokio::test]
async fn test_content_type_not_duplicated_with_json() {
    #[derive(serde::Serialize)]
    struct TestData {
        value: i32,
    }

    let client = test_client();
    let mock_server = MockServer::start();
    let custom_type_mock = mock_server.mock(|when, then| {
        when.method(Method::POST)
            .path("/custom-content-type")
            .header("content-type", "application/vnd.custom+json");
        then.status(200);
    });

    let endpoint = format!("{}/custom-content-type", mock_server.base_url());
    let request = client
        .post(&endpoint)
        .header("content-type", "application/vnd.custom+json")
        .json(&TestData { value: 42 })
        .unwrap();
    let response = request.send().await.unwrap();

    assert_eq!(response.status(), hyper::StatusCode::OK);
    assert_eq!(
        custom_type_mock.calls(),
        1,
        "Request with custom Content-Type should match"
    );
}
/// A `content-type` header set by the caller before `form()` must reach the
/// server as `application/x-custom-form`; the mock matches only that value
/// and must be hit exactly once.
#[tokio::test]
async fn test_content_type_not_duplicated_with_form() {
    let client = test_client();
    let mock_server = MockServer::start();
    let custom_type_mock = mock_server.mock(|when, then| {
        when.method(Method::POST)
            .path("/custom-form-type")
            .header("content-type", "application/x-custom-form");
        then.status(200);
    });

    let endpoint = format!("{}/custom-form-type", mock_server.base_url());
    let request = client
        .post(&endpoint)
        .header("content-type", "application/x-custom-form")
        .form(&[("key", "value")])
        .unwrap();
    let response = request.send().await.unwrap();

    assert_eq!(response.status(), hyper::StatusCode::OK);
    assert_eq!(
        custom_type_mock.calls(),
        1,
        "Request with custom Content-Type should match"
    );
}
/// `body_string()` must send the given text verbatim as the request body.
#[tokio::test]
async fn test_request_builder_body_string() {
    let client = test_client();
    let mock_server = MockServer::start();
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::POST)
            .path("/text")
            .body("Hello, World!");
        then.status(200);
    });

    let endpoint = format!("{}/text", mock_server.base_url());
    let request = client
        .post(&endpoint)
        .body_string("Hello, World!".into());
    let response = request.send().await.unwrap();

    assert_eq!(response.status(), hyper::StatusCode::OK);
}
/// `text()` on a response must return the body as a string.
#[tokio::test]
async fn test_response_text_method() {
    let client = test_client();
    let mock_server = MockServer::start();
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::GET).path("/text");
        then.status(200).body("Hello, World!");
    });

    let endpoint = format!("{}/text", mock_server.base_url());
    let response = client.get(&endpoint).send().await.unwrap();
    let body_text = response.text().await.unwrap();

    assert_eq!(body_text, "Hello, World!");
}
/// Two chained `header()` calls must both reach the server.
#[tokio::test]
async fn test_request_builder_multiple_headers() {
    let client = test_client();
    let mock_server = MockServer::start();
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::GET)
            .path("/headers")
            .header("x-first", "one")
            .header("x-second", "two");
        then.status(200);
    });

    let endpoint = format!("{}/headers", mock_server.base_url());
    let request = client
        .get(&endpoint)
        .header("x-first", "one")
        .header("x-second", "two");
    let response = request.send().await.unwrap();

    assert_eq!(response.status(), hyper::StatusCode::OK);
}
/// `headers()` given a vector of owned `(name, value)` pairs must deliver every
/// pair to the server.
#[tokio::test]
async fn test_request_builder_headers_vec() {
    let client = test_client();
    let mock_server = MockServer::start();
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::GET)
            .path("/headers")
            .header("x-first", "one")
            .header("x-second", "two");
        then.status(200);
    });

    let endpoint = format!("{}/headers", mock_server.base_url());
    let extra_headers = vec![
        ("x-first".to_owned(), "one".to_owned()),
        ("x-second".to_owned(), "two".to_owned()),
    ];
    let response = client
        .get(&endpoint)
        .headers(extra_headers)
        .send()
        .await
        .unwrap();

    assert_eq!(response.status(), hyper::StatusCode::OK);
}
/// A 500 response whose body is longer than `ERROR_BODY_PREVIEW_LIMIT` must
/// still surface as `HttpStatus` (not `BodyTooLarge`) from `checked_bytes()`,
/// with the placeholder preview text.
#[tokio::test]
async fn test_error_response_with_large_body_returns_http_status() {
    use crate::security::ERROR_BODY_PREVIEW_LIMIT;

    let client = test_client();
    let mock_server = MockServer::start();
    let oversized_body = "x".repeat(ERROR_BODY_PREVIEW_LIMIT + 1000);
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::GET).path("/error-with-large-body");
        then.status(500).body(&oversized_body);
    });

    let endpoint = format!("{}/error-with-large-body", mock_server.base_url());
    let response = client.get(&endpoint).send().await.unwrap();

    match response.checked_bytes().await {
        Ok(_) => panic!("Should have returned an error for 500 status"),
        Err(HttpError::HttpStatus {
            status,
            body_preview,
            ..
        }) => {
            assert_eq!(status, hyper::StatusCode::INTERNAL_SERVER_ERROR);
            assert_eq!(body_preview, "<body too large for preview>");
        }
        Err(HttpError::BodyTooLarge { .. }) => {
            panic!("Should return HttpStatus, not BodyTooLarge for non-2xx responses");
        }
        Err(other) => panic!("Unexpected error: {other:?}"),
    }
}
/// Gzip-compresses `data` at the default compression level and returns the
/// compressed bytes. Panics if the in-memory encoder reports an error.
fn gzip_compress(data: &[u8]) -> Vec<u8> {
    use flate2::{Compression, write::GzEncoder};
    use std::io::Write;

    let mut gz = GzEncoder::new(Vec::new(), Compression::default());
    gz.write_all(data).unwrap();
    gz.finish().unwrap()
}
/// A response marked `content-encoding: gzip` must come back from `bytes()`
/// already decompressed.
#[tokio::test]
async fn test_gzip_decompression_basic() {
    let client = test_client();
    let mock_server = MockServer::start();
    let original_body = b"Hello, this is a test body that will be gzip compressed!";
    let gzipped = gzip_compress(original_body);
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::GET).path("/gzip");
        then.status(200)
            .header("content-encoding", "gzip")
            .body(gzipped);
    });

    let endpoint = format!("{}/gzip", mock_server.base_url());
    let response = client.get(&endpoint).send().await.unwrap();
    let body = response.bytes().await.unwrap();

    assert_eq!(
        body.as_ref(),
        original_body,
        "Decompressed body should match original"
    );
}
/// `json()` must deserialize a gzip-encoded JSON body, nested fields included.
#[tokio::test]
async fn test_gzip_decompression_json() {
    #[derive(serde::Deserialize, PartialEq, Debug)]
    struct NestedData {
        items: Vec<String>,
    }
    #[derive(serde::Deserialize, PartialEq, Debug)]
    struct TestData {
        name: String,
        value: i32,
        nested: NestedData,
    }

    let client = test_client();
    let mock_server = MockServer::start();
    let json_body = r#"{"name":"test","value":42,"nested":{"items":["a","b","c"]}}"#;
    let gzipped = gzip_compress(json_body.as_bytes());
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::GET).path("/gzip-json");
        then.status(200)
            .header("content-type", "application/json")
            .header("content-encoding", "gzip")
            .body(gzipped);
    });

    let endpoint = format!("{}/gzip-json", mock_server.base_url());
    let response = client.get(&endpoint).send().await.unwrap();
    let parsed: TestData = response.json().await.unwrap();

    assert_eq!(parsed.name, "test");
    assert_eq!(parsed.value, 42);
    assert_eq!(parsed.nested.items, vec!["a", "b", "c"]);
}
/// A gzip body under 2000 bytes on the wire that expands to 100 KiB must be
/// rejected with `BodyTooLarge` when the client limit is 10 KiB, reporting
/// that limit and an `actual` size above it.
#[tokio::test]
async fn test_gzip_decompression_body_size_limit() {
    let mock_server = MockServer::start();
    let expanded = vec![b'x'; 100 * 1024];
    let gzipped = gzip_compress(&expanded);
    assert!(
        gzipped.len() < 2000,
        "Compressed body should be small (got {} bytes)",
        gzipped.len()
    );
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::GET).path("/gzip-bomb");
        then.status(200)
            .header("content-encoding", "gzip")
            .body(gzipped);
    });

    let limited_client = HttpClientBuilder::new()
        .retry(None)
        .max_body_size(10 * 1024)
        .build()
        .unwrap();
    let endpoint = format!("{}/gzip-bomb", mock_server.base_url());
    let response = limited_client.get(&endpoint).send().await.unwrap();

    match response.bytes().await {
        Ok(body) => panic!(
            "Expected BodyTooLarge error, but got {} bytes of body",
            body.len()
        ),
        Err(HttpError::BodyTooLarge { limit, actual }) => {
            assert_eq!(limit, 10 * 1024, "Limit should be 10KB");
            assert!(
                actual > limit,
                "Actual size ({actual}) should exceed limit ({limit})"
            );
        }
        Err(other) => panic!("Expected BodyTooLarge error, got: {other:?}"),
    }
}
/// Requests must carry an `accept-encoding` header; the mock matches only when
/// that header exists and must be hit exactly once.
#[tokio::test]
async fn test_accept_encoding_header_sent() {
    let client = test_client();
    let mock_server = MockServer::start();
    let encoding_mock = mock_server.mock(|when, then| {
        when.method(Method::GET)
            .path("/check-accept-encoding")
            .header_exists("accept-encoding");
        then.status(200).body("ok");
    });

    let endpoint = format!("{}/check-accept-encoding", mock_server.base_url());
    let response = client.get(&endpoint).send().await.unwrap();

    assert_eq!(response.status(), hyper::StatusCode::OK);
    assert_eq!(
        encoding_mock.calls(),
        1,
        "Request should have included Accept-Encoding header"
    );
}
/// A response without `content-encoding` must be returned byte-for-byte.
#[tokio::test]
async fn test_no_compression_passthrough() {
    let client = test_client();
    let mock_server = MockServer::start();
    let plain_body = b"This is plain text, not compressed";
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::GET).path("/plain");
        then.status(200)
            .header("content-type", "text/plain")
            .body(plain_body.as_slice());
    });

    let endpoint = format!("{}/plain", mock_server.base_url());
    let response = client.get(&endpoint).send().await.unwrap();
    let body = response.bytes().await.unwrap();

    assert_eq!(
        body.as_ref(),
        plain_body,
        "Plain body should pass through unchanged"
    );
}
/// `checked_bytes()` must return the decompressed content of a gzip response.
#[tokio::test]
async fn test_gzip_decompression_checked_bytes() {
    let client = test_client();
    let mock_server = MockServer::start();
    let original_body = b"Checked bytes test with gzip";
    let gzipped = gzip_compress(original_body);
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::GET).path("/gzip-checked");
        then.status(200)
            .header("content-encoding", "gzip")
            .body(gzipped);
    });

    let endpoint = format!("{}/gzip-checked", mock_server.base_url());
    let response = client.get(&endpoint).send().await.unwrap();
    let body = response.checked_bytes().await.unwrap();

    assert_eq!(
        body.as_ref(),
        original_body,
        "checked_bytes should return decompressed content"
    );
}
/// `text()` must return the decompressed UTF-8 content of a gzip response,
/// including a non-ASCII character.
#[tokio::test]
async fn test_gzip_decompression_text() {
    let client = test_client();
    let mock_server = MockServer::start();
    let original_text = "Hello, World! \u{1F600}";
    let gzipped = gzip_compress(original_text.as_bytes());
    let _mock = mock_server.mock(|when, then| {
        when.method(Method::GET).path("/gzip-text");
        then.status(200)
            .header("content-type", "text/plain; charset=utf-8")
            .header("content-encoding", "gzip")
            .body(gzipped);
    });

    let endpoint = format!("{}/gzip-text", mock_server.base_url());
    let response = client.get(&endpoint).send().await.unwrap();
    let text = response.text().await.unwrap();

    assert_eq!(
        text, original_text,
        "text() should return decompressed UTF-8 content"
    );
}
/// A boxed `HttpError::Timeout` must come out of `map_buffer_error` still as
/// `HttpError::Timeout`.
#[test]
fn test_map_buffer_error_passes_through_http_error() {
    let boxed: tower::BoxError =
        Box::new(HttpError::Timeout(std::time::Duration::from_secs(10)));

    let result = map_buffer_error(boxed);

    assert!(
        matches!(result, HttpError::Timeout(_)),
        "Should pass through HttpError::Timeout, got: {result:?}"
    );
}
/// A boxed error that is not an `HttpError` (here an `io::Error`) must be
/// mapped to `HttpError::ServiceClosed`.
#[test]
fn test_map_buffer_error_returns_service_closed_for_unknown_error() {
    let io_failure = std::io::Error::new(
        std::io::ErrorKind::BrokenPipe,
        "buffer worker died",
    );
    let boxed: tower::BoxError = Box::new(io_failure);

    let result = map_buffer_error(boxed);

    assert!(
        matches!(result, HttpError::ServiceClosed),
        "Should return ServiceClosed for non-HttpError, got: {result:?}"
    );
}
/// With `max_retries: 2`, a GET answered with 500 must hit the server three
/// times; `send()` still yields `Ok` with the 500 response, and
/// `error_for_status()` converts it into `HttpStatus`.
#[tokio::test]
async fn test_status_retry_get_500_retried() {
    use crate::config::{ExponentialBackoff, HttpClientConfig, RetryConfig};

    let mock_server = MockServer::start();
    let failing_mock = mock_server.mock(|when, then| {
        when.method(Method::GET).path("/retry-500");
        then.status(500).body("server error");
    });

    let retry_policy = RetryConfig {
        max_retries: 2,
        backoff: ExponentialBackoff::fast(),
        ..RetryConfig::default()
    };
    let client = HttpClientBuilder::with_config(HttpClientConfig {
        transport: crate::config::TransportSecurity::AllowInsecureHttp,
        retry: Some(retry_policy),
        rate_limit: None,
        ..Default::default()
    })
    .build()
    .unwrap();

    let endpoint = format!("{}/retry-500", mock_server.base_url());
    let send_outcome = client.get(&endpoint).send().await;

    assert_eq!(
        failing_mock.calls(),
        3,
        "GET should retry on 500; expected 3 calls (1 + 2 retries), got {}",
        failing_mock.calls()
    );

    let response =
        send_outcome.expect("send() should return Ok(Response) after retries exhaust");
    assert_eq!(response.status(), hyper::StatusCode::INTERNAL_SERVER_ERROR);

    let err = response.error_for_status().unwrap_err();
    assert!(
        matches!(err, HttpError::HttpStatus { status, .. } if status == hyper::StatusCode::INTERNAL_SERVER_ERROR)
    );
}
/// Even with `max_retries: 3`, a POST answered with 500 must hit the server
/// exactly once, and `send()` must yield `Ok` with the 500 response.
#[tokio::test]
async fn test_status_retry_post_500_not_retried() {
    use crate::config::{ExponentialBackoff, HttpClientConfig, RetryConfig};

    let mock_server = MockServer::start();
    let failing_mock = mock_server.mock(|when, then| {
        when.method(Method::POST).path("/post-500");
        then.status(500).body("server error");
    });

    let retry_policy = RetryConfig {
        max_retries: 3,
        backoff: ExponentialBackoff::fast(),
        ..RetryConfig::default()
    };
    let client = HttpClientBuilder::with_config(HttpClientConfig {
        transport: crate::config::TransportSecurity::AllowInsecureHttp,
        retry: Some(retry_policy),
        rate_limit: None,
        ..Default::default()
    })
    .build()
    .unwrap();

    let endpoint = format!("{}/post-500", mock_server.base_url());
    let send_outcome = client.post(&endpoint).send().await;

    assert_eq!(
        failing_mock.calls(),
        1,
        "POST should not be retried on 500; expected 1 call, got {}",
        failing_mock.calls()
    );

    let response = send_outcome.expect("POST + 500 should return Ok(Response), not Err");
    assert_eq!(
        response.status(),
        hyper::StatusCode::INTERNAL_SERVER_ERROR,
        "Response should have 500 status"
    );
}
/// With `max_retries: 2`, a POST answered with 429 must hit the server three
/// times; `send()` yields `Ok` with the 429 response, and `error_for_status()`
/// converts it into `HttpStatus`.
#[tokio::test]
async fn test_status_retry_post_429_retried() {
    use crate::config::{ExponentialBackoff, HttpClientConfig, RetryConfig};

    let mock_server = MockServer::start();
    let throttling_mock = mock_server.mock(|when, then| {
        when.method(Method::POST).path("/post-429");
        then.status(429).body("rate limited");
    });

    let retry_policy = RetryConfig {
        max_retries: 2,
        backoff: ExponentialBackoff::fast(),
        ..RetryConfig::default()
    };
    let client = HttpClientBuilder::with_config(HttpClientConfig {
        transport: crate::config::TransportSecurity::AllowInsecureHttp,
        retry: Some(retry_policy),
        rate_limit: None,
        ..Default::default()
    })
    .build()
    .unwrap();

    let endpoint = format!("{}/post-429", mock_server.base_url());
    let send_outcome = client.post(&endpoint).send().await;

    assert_eq!(
        throttling_mock.calls(),
        3,
        "POST should retry on 429; expected 3 calls (1 + 2 retries), got {}",
        throttling_mock.calls()
    );

    let response =
        send_outcome.expect("send() should return Ok(Response) after retries exhaust");
    assert_eq!(response.status(), hyper::StatusCode::TOO_MANY_REQUESTS);

    let err = response.error_for_status().unwrap_err();
    assert!(
        matches!(err, HttpError::HttpStatus { status, .. } if status == hyper::StatusCode::TOO_MANY_REQUESTS)
    );
}
#[tokio::test]
async fn test_status_retry_extracts_retry_after_header() {
use crate::config::{ExponentialBackoff, HttpClientConfig, RetryConfig};
let server = MockServer::start();
let _mock = server.mock(|when, then| {
when.method(Method::GET).path("/retry-after");
then.status(429)
.header("Retry-After", "60")
.header("Content-Type", "application/json")
.body(r#"{"error": "rate limited"}"#);
});
let config = HttpClientConfig {
transport: crate::config::TransportSecurity::AllowInsecureHttp,
retry: Some(RetryConfig {
max_retries: 0, backoff: ExponentialBackoff::fast(),
..RetryConfig::default()
}),
rate_limit: None,
..Default::default()
};
let client = HttpClientBuilder::with_config(config).build().unwrap();
let url = format!("{}/retry-after", server.base_url());
let result = client.get(&url).send().await;
let response = result.expect("send() should return Ok(Response)");
assert_eq!(response.status(), hyper::StatusCode::TOO_MANY_REQUESTS);
match response.error_for_status() {
Err(HttpError::HttpStatus {
status,
retry_after,
content_type,
..
}) => {
assert_eq!(status, hyper::StatusCode::TOO_MANY_REQUESTS);
assert_eq!(
retry_after,
Some(std::time::Duration::from_mins(1)),
"Should extract Retry-After header"
);
assert_eq!(
content_type,
Some("application/json".to_owned()),
"Should extract Content-Type header"
);
}
other => panic!("Expected HttpStatus error from error_for_status(), got: {other:?}"),
}
}
#[tokio::test]
async fn test_status_retry_ignores_retry_after_when_configured() {
use crate::config::{ExponentialBackoff, HttpClientConfig, RetryConfig};
let server = MockServer::start();
let mock = server.mock(|when, then| {
when.method(Method::GET).path("/ignore-retry-after");
then.status(429)
.header("Retry-After", "10") .body("rate limited");
});
let config = HttpClientConfig {
transport: crate::config::TransportSecurity::AllowInsecureHttp,
retry: Some(RetryConfig {
max_retries: 2,
backoff: ExponentialBackoff::fast(), ignore_retry_after: true, ..RetryConfig::default()
}),
rate_limit: None,
..Default::default()
};
let client = HttpClientBuilder::with_config(config).build().unwrap();
let url = format!("{}/ignore-retry-after", server.base_url());
let start = std::time::Instant::now();
let _result = client.get(&url).send().await;
let elapsed = start.elapsed();
assert!(
elapsed < std::time::Duration::from_secs(2),
"Should have used fast backoff, not 10s Retry-After; elapsed: {elapsed:?}"
);
assert_eq!(mock.calls(), 3, "Expected 3 calls, got {}", mock.calls());
}
#[tokio::test]
async fn test_non_retryable_status_passes_through() {
use crate::config::{ExponentialBackoff, HttpClientConfig, RetryConfig};
let server = MockServer::start();
let mock = server.mock(|when, then| {
when.method(Method::GET).path("/not-found");
then.status(404)
.header("content-type", "application/json")
.body(r#"{"error": "not found"}"#);
});
let config = HttpClientConfig {
transport: crate::config::TransportSecurity::AllowInsecureHttp,
retry: Some(RetryConfig {
max_retries: 3,
backoff: ExponentialBackoff::fast(),
..RetryConfig::default()
}),
rate_limit: None,
..Default::default()
};
let client = HttpClientBuilder::with_config(config).build().unwrap();
let url = format!("{}/not-found", server.base_url());
let result = client.get(&url).send().await;
assert_eq!(
mock.calls(),
1,
"404 should not trigger retry; expected 1 call, got {}",
mock.calls()
);
let response = result.expect("send() should succeed for 404");
assert_eq!(response.status(), hyper::StatusCode::NOT_FOUND);
}
#[tokio::test]
async fn test_status_retry_exhausted_returns_ok_response() {
use crate::config::{ExponentialBackoff, HttpClientConfig, RetryConfig};
let server = MockServer::start();
let mock = server.mock(|when, then| {
when.method(Method::GET).path("/always-500");
then.status(500).body("always fails");
});
let config = HttpClientConfig {
transport: crate::config::TransportSecurity::AllowInsecureHttp,
retry: Some(RetryConfig {
max_retries: 2, backoff: ExponentialBackoff::fast(),
..RetryConfig::default()
}),
rate_limit: None,
..Default::default()
};
let client = HttpClientBuilder::with_config(config).build().unwrap();
let url = format!("{}/always-500", server.base_url());
let result = client.get(&url).send().await;
assert_eq!(
mock.calls(),
3,
"Expected 3 calls (1 initial + 2 retries), got {}",
mock.calls()
);
let response = result.expect("send() should return Ok(Response) after retries exhaust");
assert_eq!(response.status(), hyper::StatusCode::INTERNAL_SERVER_ERROR);
let err = response.error_for_status().unwrap_err();
assert!(
matches!(err, HttpError::HttpStatus { status, .. } if status == hyper::StatusCode::INTERNAL_SERVER_ERROR)
);
}
#[tokio::test]
async fn test_no_retry_config_status_passes_through() {
use crate::config::HttpClientConfig;
let server = MockServer::start();
let mock = server.mock(|when, then| {
when.method(Method::GET).path("/no-retry");
then.status(500).body("server error");
});
let config = HttpClientConfig {
transport: crate::config::TransportSecurity::AllowInsecureHttp,
retry: None, rate_limit: None,
..Default::default()
};
let client = HttpClientBuilder::with_config(config).build().unwrap();
let url = format!("{}/no-retry", server.base_url());
let result = client.get(&url).send().await;
assert_eq!(mock.calls(), 1);
let response = result.expect("send() should succeed when retry disabled");
assert_eq!(response.status(), hyper::StatusCode::INTERNAL_SERVER_ERROR);
let err = response.error_for_status().unwrap_err();
assert!(
matches!(err, HttpError::HttpStatus { status, .. } if status == hyper::StatusCode::INTERNAL_SERVER_ERROR)
);
}
#[tokio::test]
async fn test_url_scheme_http_rejected_with_tls_only() {
let client = HttpClientBuilder::new()
.transport(crate::config::TransportSecurity::TlsOnly)
.retry(None)
.build()
.unwrap();
let result = client.get("http://example.com/test").send().await;
match result {
Err(HttpError::InvalidScheme { scheme, reason }) => {
assert_eq!(scheme, "http");
assert!(
reason.contains("TlsOnly"),
"Error should mention TlsOnly: {reason}"
);
}
Err(other) => panic!("Expected InvalidScheme error, got: {other:?}"),
Ok(_) => panic!("Expected InvalidScheme error, but request succeeded"),
}
}
#[tokio::test]
async fn test_url_scheme_http_allowed_with_allow_insecure() {
let server = MockServer::start();
let _m = server.mock(|when, then| {
when.method(Method::GET).path("/test");
then.status(200).body("ok");
});
let client = HttpClientBuilder::new()
.transport(crate::config::TransportSecurity::AllowInsecureHttp)
.retry(None)
.build()
.unwrap();
let url = format!("{}/test", server.base_url()); let result = client.get(&url).send().await;
assert!(result.is_ok(), "http:// should be allowed: {result:?}");
}
#[tokio::test]
async fn test_url_scheme_https_always_allowed() {
let client = HttpClientBuilder::new()
.transport(crate::config::TransportSecurity::TlsOnly)
.retry(None)
.build()
.unwrap();
let result = client.get("https://localhost:0/test").send().await;
if let Err(HttpError::InvalidScheme { .. }) = result {
panic!("https:// should not trigger InvalidScheme error")
}
}
/// A URL with an unsupported scheme (`ftp://`) must fail with
/// `HttpError::InvalidScheme` whose scheme is `"ftp"` and whose reason names
/// at least one of `http://` or `https://`.
///
/// The client is built without an explicit transport setting.
#[tokio::test]
async fn test_url_scheme_invalid_rejected() {
    let client = HttpClientBuilder::new().retry(None).build().unwrap();
    let outcome = client.get("ftp://files.example.com/file.txt").send().await;

    // A successful request is a failure for this test.
    let err = match outcome {
        Ok(_) => panic!("Expected InvalidScheme error, but request succeeded"),
        Err(err) => err,
    };

    match err {
        HttpError::InvalidScheme { scheme, reason } => {
            assert_eq!(scheme, "ftp");
            assert!(
                reason.contains("http://") || reason.contains("https://"),
                "Error should mention supported schemes: {reason}"
            );
        }
        other => panic!("Expected InvalidScheme error, got: {other:?}"),
    }
}
#[tokio::test]
async fn test_url_scheme_missing_rejected() {
let client = HttpClientBuilder::new().retry(None).build().unwrap();
let result = client.get("example.com/test").send().await;
match result {
Err(HttpError::InvalidUri { url, reason, kind }) => {
assert_eq!(url, "example.com/test");
assert!(!reason.is_empty(), "Should have a reason for invalid URI");
assert_eq!(kind, crate::error::InvalidUriKind::ParseError);
}
Err(other) => panic!("Expected InvalidUri error, got: {other:?}"),
Ok(_) => panic!("Expected InvalidUri error, but request succeeded"),
}
}
}