toolkit_http/builder.rs
1use crate::config::{
2 ClientAuthConfig, HttpClientConfig, RedirectConfig, RetryConfig, TlsConfig, TlsRootConfig,
3 TlsVersion, TransportSecurity,
4};
5use crate::error::HttpError;
6use crate::layers::{OtelLayer, RetryLayer, SecureRedirectPolicy, UserAgentLayer};
7use crate::response::ResponseBody;
8use crate::tls;
9use bytes::Bytes;
10use http::Response;
11use http_body_util::{BodyExt, Full};
12use hyper_rustls::HttpsConnector;
13use hyper_util::client::legacy::Client;
14use hyper_util::client::legacy::connect::HttpConnector;
15use hyper_util::rt::{TokioExecutor, TokioTimer};
16use std::time::Duration;
17use tower::buffer::Buffer;
18use tower::limit::ConcurrencyLimitLayer;
19use tower::load_shed::LoadShedLayer;
20use tower::timeout::TimeoutLayer;
21use tower::util::BoxCloneService;
22use tower::{ServiceBuilder, ServiceExt};
23use tower_http::decompression::DecompressionLayer;
24use tower_http::follow_redirect::FollowRedirectLayer;
25
26/// Type-erased inner service between layer composition steps in [`HttpClientBuilder::build`].
27type InnerService =
28 BoxCloneService<http::Request<Full<Bytes>>, http::Response<ResponseBody>, HttpError>;
29
30/// Builder for constructing an [`HttpClient`] with a layered tower middleware stack.
31pub struct HttpClientBuilder {
32 config: HttpClientConfig,
33 auth_layer: Option<Box<dyn FnOnce(InnerService) -> InnerService + Send>>,
34 metrics_layer: Option<Box<dyn FnOnce(InnerService) -> InnerService + Send>>,
35}
36
37impl HttpClientBuilder {
38 /// Create a new builder with default configuration
39 #[must_use]
40 pub fn new() -> Self {
41 Self {
42 config: HttpClientConfig::default(),
43 auth_layer: None,
44 metrics_layer: None,
45 }
46 }
47
48 /// Create a builder with a specific configuration
49 #[must_use]
50 pub fn with_config(config: HttpClientConfig) -> Self {
51 Self {
52 config,
53 auth_layer: None,
54 metrics_layer: None,
55 }
56 }
57
58 /// Set the per-request timeout
59 ///
60 /// This timeout applies to each individual HTTP request/attempt.
61 /// If retries are enabled, each retry attempt gets its own timeout.
62 #[must_use]
63 pub fn timeout(mut self, timeout: Duration) -> Self {
64 self.config.request_timeout = timeout;
65 self
66 }
67
68 /// Set the total timeout spanning all retry attempts
69 ///
70 /// When set, the entire operation (including all retries and backoff delays)
71 /// must complete within this duration. If the deadline is exceeded,
72 /// the request fails with `HttpError::DeadlineExceeded(total_timeout)`.
73 #[must_use]
74 pub fn total_timeout(mut self, timeout: Duration) -> Self {
75 self.config.total_timeout = Some(timeout);
76 self
77 }
78
79 /// Set the user agent string
80 #[must_use]
81 pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
82 self.config.user_agent = user_agent.into();
83 self
84 }
85
86 /// Set the retry configuration
87 #[must_use]
88 pub fn retry(mut self, retry: Option<RetryConfig>) -> Self {
89 self.config.retry = retry;
90 self
91 }
92
93 /// Set the maximum response body size
94 #[must_use]
95 pub fn max_body_size(mut self, size: usize) -> Self {
96 self.config.max_body_size = size;
97 self
98 }
99
100 /// Set transport security mode
101 ///
102 /// Use `TransportSecurity::TlsOnly` to enforce HTTPS for all connections.
103 #[must_use]
104 pub fn transport(mut self, transport: TransportSecurity) -> Self {
105 self.config.transport = transport;
106 self
107 }
108
109 /// Deny insecure HTTP connections, enforcing TLS for all traffic
110 ///
111 /// Equivalent to `.transport(TransportSecurity::TlsOnly)`.
112 ///
113 /// Use this when TLS enforcement is required (e.g., production environments).
114 #[must_use]
115 pub fn deny_insecure_http(mut self) -> Self {
116 tracing::debug!(
117 target: "toolkit_http::security",
118 "deny_insecure_http() called - enforcing TLS for all connections"
119 );
120 self.config.transport = TransportSecurity::TlsOnly;
121 self
122 }
123
124 /// Set the TLS handshake configuration (minimum protocol version and
125 /// optional mutual-TLS client identity).
126 ///
127 /// Replaces the entire [`TlsConfig`]; use [`crate::config::TlsConfig`]'s
128 /// fields to set `min_version` and `client_auth`. The root-trust strategy
129 /// is configured separately via the `tls_roots` config field.
130 ///
131 /// Mutual-TLS PEM material referenced by `client_auth` is read and parsed
132 /// in [`HttpClientBuilder::build`]; IO/parse failures surface there as
133 /// [`HttpError::Tls`].
134 ///
135 /// [`HttpClientBuilder::build`]: crate::builder::HttpClientBuilder::build
136 #[must_use]
137 pub fn tls(mut self, tls: TlsConfig) -> Self {
138 self.config.tls = tls;
139 self
140 }
141
142 /// Set the minimum TLS protocol version, leaving the rest of the
143 /// [`TlsConfig`] (e.g. `client_auth`) untouched.
144 ///
145 /// Convenience shortcut for mutating `config.tls.min_version` without
146 /// rebuilding the whole [`TlsConfig`] via [`HttpClientBuilder::tls`].
147 ///
148 /// [`HttpClientBuilder::tls`]: crate::builder::HttpClientBuilder::tls
149 #[must_use]
150 pub fn tls_min_version(mut self, min_version: TlsVersion) -> Self {
151 self.config.tls.min_version = min_version;
152 self
153 }
154
155 /// Set the mutual-TLS client identity, leaving the rest of the
156 /// [`TlsConfig`] (e.g. `min_version`) untouched.
157 ///
158 /// Convenience shortcut for mutating `config.tls.client_auth` without
159 /// rebuilding the whole [`TlsConfig`] via [`HttpClientBuilder::tls`]. The
160 /// referenced PEM material is read and parsed in
161 /// [`HttpClientBuilder::build`]; IO/parse failures surface there as
162 /// [`HttpError::Tls`].
163 ///
164 /// [`HttpClientBuilder::tls`]: crate::builder::HttpClientBuilder::tls
165 /// [`HttpClientBuilder::build`]: crate::builder::HttpClientBuilder::build
166 #[must_use]
167 pub fn client_auth(mut self, client_auth: ClientAuthConfig) -> Self {
168 self.config.tls.client_auth = Some(client_auth);
169 self
170 }
171
172 /// Enable OpenTelemetry tracing layer
173 ///
174 /// When enabled, creates spans for outbound requests with HTTP metadata
175 /// and injects W3C trace context headers (when `otel` feature is enabled).
176 #[must_use]
177 pub fn with_otel(mut self) -> Self {
178 self.config.otel = true;
179 self
180 }
181
182 /// Insert an optional auth layer between retry and timeout in the stack.
183 ///
184 /// Stack position: `… → Retry → **this layer** → Timeout → …`
185 ///
186 /// The layer sits inside the retry loop so each attempt re-executes it
187 /// (e.g. re-reads a refreshed bearer token). Only one auth layer can be
188 /// set; a second call replaces the first.
189 #[must_use]
190 pub fn with_auth_layer(
191 mut self,
192 wrap: impl FnOnce(InnerService) -> InnerService + Send + 'static,
193 ) -> Self {
194 self.auth_layer = Some(Box::new(wrap));
195 self
196 }
197
198 /// Insert a metrics layer between the rate-limit and retry layers.
199 ///
200 /// Stack position: `… → RateLimit → **this layer** → Retry → Auth → Timeout → …`
201 ///
202 /// The layer sits outside the retry loop so it observes a single logical
203 /// request once, regardless of how many transport-level retries the retry
204 /// layer issues. If the use case is "count every attempt", the equivalent
205 /// observation can be made with [`with_otel`](Self::with_otel) (one span
206 /// per attempt) and a `tracing` → metrics bridge.
207 ///
208 /// Only one metrics layer can be set; a second call replaces the first.
209 #[must_use]
210 pub fn with_metrics_layer(
211 mut self,
212 wrap: impl FnOnce(InnerService) -> InnerService + Send + 'static,
213 ) -> Self {
214 self.metrics_layer = Some(Box::new(wrap));
215 self
216 }
217
218 /// Record OpenTelemetry client metrics using the default route classifier.
219 ///
220 /// Emits `http.client.request.duration` (a histogram, in seconds) for each
221 /// logical request. `client_type` names the instrumentation scope (the
222 /// meter), like the server-side gear name. The default classifier labels
223 /// each request `"METHOD host"` via the `http.route` attribute — a bounded
224 /// value that never contains a raw path, so metric cardinality stays
225 /// controlled. Use [`with_metrics_by`](Self::with_metrics_by)
226 /// to supply route templates.
227 ///
228 /// Like [`with_metrics_layer`](Self::with_metrics_layer), the layer sits
229 /// outside the retry loop, so it observes one logical request, not each
230 /// attempt.
231 #[cfg(feature = "otel")]
232 #[must_use]
233 pub fn with_metrics(self, client_type: impl Into<String>) -> Self {
234 self.with_metrics_by(client_type, crate::layers::default_classify)
235 }
236
237 /// Record OpenTelemetry client metrics with a custom route classifier.
238 ///
239 /// Like [`with_metrics`](Self::with_metrics) but accepts a classifier — `classify` produces the
240 /// `http.route` attribute for each request (the Rust analogue of go-appkit's
241 /// `ClassifyRequest`). It MUST return a bounded set of values (e.g.
242 /// `"GET /users/{id}"`), never raw paths, to avoid unbounded metric
243 /// cardinality.
244 #[cfg(feature = "otel")]
245 #[must_use]
246 pub fn with_metrics_by(
247 self,
248 client_type: impl Into<String>,
249 classify: impl Fn(&http::Request<Full<Bytes>>) -> std::borrow::Cow<'static, str>
250 + Send
251 + Sync
252 + 'static,
253 ) -> Self {
254 let client_type = client_type.into();
255 let classify: crate::layers::ClassifyFn = std::sync::Arc::new(classify);
256 let layer = crate::layers::MetricsLayer::new(&client_type, classify);
257 self.with_metrics_layer(move |svc| {
258 ServiceBuilder::new()
259 .layer(layer)
260 .service(svc)
261 .boxed_clone()
262 })
263 }
264
265 /// Set the buffer capacity for concurrent request handling
266 ///
267 /// The HTTP client uses an internal buffer to allow concurrent requests
268 /// without external locking. This sets the maximum number of requests
269 /// that can be queued.
270 ///
271 /// **Note**: A capacity of 0 is invalid and will be clamped to 1.
272 /// Tower's Buffer panics with capacity=0, so we enforce minimum of 1.
273 #[must_use]
274 pub fn buffer_capacity(mut self, capacity: usize) -> Self {
275 // Clamp to at least 1 - tower::Buffer panics with capacity=0
276 self.config.buffer_capacity = capacity.max(1);
277 self
278 }
279
280 /// Set the maximum number of redirects to follow
281 ///
282 /// Set to `0` to disable redirect following (3xx responses pass through as-is).
283 /// Default: 10
284 #[must_use]
285 pub fn max_redirects(mut self, max_redirects: usize) -> Self {
286 self.config.redirect.max_redirects = max_redirects;
287 self
288 }
289
290 /// Disable redirect following
291 ///
292 /// Equivalent to `.max_redirects(0)`. When disabled, 3xx responses are
293 /// returned to the caller without following the `Location` header.
294 #[must_use]
295 pub fn no_redirects(mut self) -> Self {
296 self.config.redirect = RedirectConfig::disabled();
297 self
298 }
299
300 /// Set the redirect policy configuration
301 ///
302 /// Use this to configure redirect security settings:
303 /// - `same_origin_only`: Only follow redirects to the same host
304 /// - `strip_sensitive_headers`: Remove `Authorization`/`Cookie` on cross-origin
305 /// - `allow_https_downgrade`: Allow HTTPS → HTTP redirects (not recommended)
306 ///
307 /// # Example
308 ///
309 /// ```rust,ignore
310 /// let client = HttpClient::builder()
311 /// .redirect(RedirectConfig::permissive()) // Allow all redirects with header stripping
312 /// .build()?;
313 /// ```
314 #[must_use]
315 pub fn redirect(mut self, config: RedirectConfig) -> Self {
316 self.config.redirect = config;
317 self
318 }
319
320 /// Set the idle connection timeout for the connection pool
321 ///
322 /// Connections that remain idle for longer than this duration will be
323 /// closed and removed from the pool. Default: 90 seconds.
324 ///
325 /// Set to `None` to disable idle timeout (connections kept indefinitely).
326 #[must_use]
327 pub fn pool_idle_timeout(mut self, timeout: Option<Duration>) -> Self {
328 self.config.pool_idle_timeout = timeout;
329 self
330 }
331
332 /// Set the maximum number of idle connections per host
333 ///
334 /// Limits how many idle connections are kept in the pool for each host.
335 /// Default: 32.
336 ///
337 /// - Setting to `0` disables connection reuse entirely
338 /// - Setting too high may waste resources on rarely-used connections
339 #[must_use]
340 pub fn pool_max_idle_per_host(mut self, max: usize) -> Self {
341 self.config.pool_max_idle_per_host = max;
342 self
343 }
344
345 /// Build the HTTP client with all configured layers
346 ///
347 /// # Errors
348 /// Returns an error if TLS initialization fails or configuration is invalid.
349 ///
350 /// Under `--features fips`, returns [`HttpError::InsecureTransport`] when
351 /// `config.transport == TransportSecurity::AllowInsecureHttp`. FIPS builds
352 /// must use [`TransportSecurity::TlsOnly`] — there is no opt-out.
353 pub fn build(self) -> Result<crate::HttpClient, HttpError> {
354 // Reject AllowInsecureHttp under --features fips before any TLS work.
355 // The check lives here (rather than in HttpClientConfig) so that
356 // constructing a config with AllowInsecureHttp is still cheap and
357 // infallible; only actually building a client fails closed.
358 #[cfg(feature = "fips")]
359 if self.config.transport == TransportSecurity::AllowInsecureHttp {
360 tracing::warn!(
361 target: "toolkit_http::security",
362 "rejecting AllowInsecureHttp under --features fips: returning HttpError::InsecureTransport"
363 );
364 return Err(HttpError::InsecureTransport);
365 }
366
367 let timeout = self.config.request_timeout;
368 let total_timeout = self.config.total_timeout;
369
370 // Build the HTTPS connector (may fail for Native roots if no valid certs)
371 let https = build_https_connector(
372 self.config.tls_roots,
373 self.config.transport,
374 &self.config.tls,
375 )?;
376
377 // Create the base hyper client with HTTP/2 support and connection pool settings
378 let mut client_builder = Client::builder(TokioExecutor::new());
379
380 // Configure connection pool
381 // CRITICAL: pool_timer is required for pool_idle_timeout to work!
382 client_builder
383 .pool_timer(TokioTimer::new())
384 .pool_max_idle_per_host(self.config.pool_max_idle_per_host)
385 .http2_only(false); // Allow both HTTP/1 and HTTP/2 via ALPN
386
387 // Set idle timeout (None = no timeout, connections kept indefinitely)
388 if let Some(idle_timeout) = self.config.pool_idle_timeout {
389 client_builder.pool_idle_timeout(idle_timeout);
390 }
391
392 let hyper_client = client_builder.build::<_, Full<Bytes>>(https);
393
394 // Parse user agent header (may fail)
395 let ua_layer = UserAgentLayer::try_new(&self.config.user_agent)?;
396
397 // =======================================================================
398 // Tower Layer Stack (outer to inner)
399 // =======================================================================
400 //
401 // Request flow (outer → inner):
402 // Buffer → OtelLayer → LoadShed/Concurrency → [MetricsLayer?] →
403 // RetryLayer → [AuthLayer?] → ErrorMapping → Timeout → UserAgent →
404 // Decompression → FollowRedirect → hyper_client
405 //
406 // AuthLayer (if set via with_auth_layer) sits inside the retry
407 // loop so each retry re-acquires credentials (e.g. refreshed
408 // bearer token).
409 //
410 // MetricsLayer (if set via with_metrics_layer) sits outside the
411 // retry loop so it observes one logical request, not per-attempt.
412 //
413 // Response flow (inner → outer):
414 // hyper_client → FollowRedirect → Decompression → UserAgent →
415 // Timeout → ErrorMapping → [AuthLayer?] → RetryLayer →
416 // [MetricsLayer?] → LoadShed/Concurrency → OtelLayer → Buffer
417 //
418 // Key semantics (reqwest-like):
419 // - send() returns Ok(Response) for ALL HTTP statuses (including 4xx/5xx)
420 // - send() returns Err only for transport/timeout/TLS errors
421 // - Non-2xx converted to error ONLY via error_for_status()
422 // - RetryLayer handles both Err (transport) and Ok(Response) (status)
423 // retries internally, draining body before retry for connection reuse
424 // - FollowRedirect handles 3xx responses internally with security protections:
425 // * Same-origin enforcement (default) - blocks SSRF attacks
426 // * Sensitive header stripping on cross-origin redirects
427 // * HTTPS downgrade protection
428 //
429 // =======================================================================
430 //
431 let redirect_policy = SecureRedirectPolicy::new(self.config.redirect.clone());
432
433 // Build the service stack with secure redirect following
434 let service = ServiceBuilder::new()
435 .layer(TimeoutLayer::new(timeout))
436 .layer(ua_layer)
437 .layer(DecompressionLayer::new())
438 .layer(FollowRedirectLayer::with_policy(redirect_policy))
439 .service(hyper_client);
440
441 // Map the decompression body to our boxed ResponseBody type.
442 // This converts Response<DecompressionBody<Incoming>> to Response<ResponseBody>.
443 //
444 // The decompression body's error type is tower_http::BoxError, which we convert
445 // to our boxed error type for consistency with the ResponseBody definition.
446 let service = service.map_response(map_decompression_response);
447
448 // Map errors to HttpError with proper timeout duration
449 let service = service.map_err(move |e: tower::BoxError| map_tower_error(e, timeout));
450
451 // Box the service for type erasure
452 let mut boxed_service = service.boxed_clone();
453
454 // Apply auth layer (between timeout and retry).
455 // Inside retry so each retry attempt re-acquires the token.
456 if let Some(wrap) = self.auth_layer {
457 boxed_service = wrap(boxed_service);
458 }
459
460 // Conditionally wrap with RetryLayer
461 //
462 // RetryLayer handles retries for both:
463 // - Err(HttpError::Transport/Timeout) - transport-level failures
464 // - Ok(Response) with retryable status codes (429, 5xx for GET, etc.)
465 //
466 // When retrying on status codes, RetryLayer drains the response body
467 // (up to configured limit) to allow connection reuse.
468 //
469 // If total_timeout is set, the entire operation (including all retries)
470 // must complete within that duration.
471 if let Some(ref retry_config) = self.config.retry {
472 let retry_layer = RetryLayer::with_total_timeout(retry_config.clone(), total_timeout);
473 let retry_service = ServiceBuilder::new()
474 .layer(retry_layer)
475 .service(boxed_service);
476 boxed_service = retry_service.boxed_clone();
477 }
478
479 // Apply metrics layer (between retry and rate-limit).
480 // Outside the retry loop: observes one logical request, not per-attempt.
481 if let Some(wrap) = self.metrics_layer {
482 boxed_service = wrap(boxed_service);
483 }
484
485 // Conditionally wrap with concurrency limit + load shedding
486 // LoadShedLayer returns error immediately when ConcurrencyLimitLayer is saturated
487 // instead of waiting indefinitely (Poll::Pending)
488 if let Some(rate_limit) = self.config.rate_limit
489 && rate_limit.max_concurrent_requests < usize::MAX
490 {
491 let limited_service = ServiceBuilder::new()
492 .layer(LoadShedLayer::new())
493 .layer(ConcurrencyLimitLayer::new(
494 rate_limit.max_concurrent_requests,
495 ))
496 .service(boxed_service);
497 // Map load shed errors to HttpError::Overloaded
498 let limited_service = limited_service.map_err(map_load_shed_error);
499 boxed_service = limited_service.boxed_clone();
500 }
501
502 // Conditionally wrap with OTEL tracing layer (outermost layer before buffer)
503 // Applied last so it sees the final request after UserAgent and other modifications.
504 // Creates spans, records status, and injects trace context headers.
505 if self.config.otel {
506 let otel_service = ServiceBuilder::new()
507 .layer(OtelLayer::new())
508 .service(boxed_service);
509 boxed_service = otel_service.boxed_clone();
510 }
511
512 // Wrap in Buffer as the final step for true concurrent access
513 // Buffer spawns a background task that processes requests from a channel,
514 // providing Clone + Send + Sync without any mutex serialization.
515 let buffer_capacity = self.config.buffer_capacity.max(1);
516 let buffered_service: crate::client::BufferedService =
517 Buffer::new(boxed_service, buffer_capacity);
518
519 Ok(crate::HttpClient {
520 service: buffered_service,
521 max_body_size: self.config.max_body_size,
522 transport_security: self.config.transport,
523 })
524 }
525}
526
527#[cfg(test)]
528impl HttpClientBuilder {
529 /// Build an `HttpClient` with a custom inner service replacing the
530 /// hyper connector. The full middleware stack (Retry, Concurrency,
531 /// Buffer, etc.) is applied on top.
532 ///
533 /// The inner service must handle `Request<Full<Bytes>>` and return
534 /// `Response<ResponseBody>`. Use this to inject a fake slow service
535 /// for cancellation testing without needing a real HTTP server.
536 fn build_with_inner_service(self, inner: InnerService) -> crate::HttpClient {
537 let mut boxed_service = inner;
538
539 if let Some(ref retry_config) = self.config.retry {
540 let retry_layer =
541 RetryLayer::with_total_timeout(retry_config.clone(), self.config.total_timeout);
542 let retry_service = ServiceBuilder::new()
543 .layer(retry_layer)
544 .service(boxed_service);
545 boxed_service = retry_service.boxed_clone();
546 }
547
548 if let Some(rate_limit) = self.config.rate_limit
549 && rate_limit.max_concurrent_requests < usize::MAX
550 {
551 let limited_service = ServiceBuilder::new()
552 .layer(LoadShedLayer::new())
553 .layer(ConcurrencyLimitLayer::new(
554 rate_limit.max_concurrent_requests,
555 ))
556 .service(boxed_service);
557 let limited_service = limited_service.map_err(map_load_shed_error);
558 boxed_service = limited_service.boxed_clone();
559 }
560
561 let buffer_capacity = self.config.buffer_capacity.max(1);
562 let buffered_service: crate::client::BufferedService =
563 Buffer::new(boxed_service, buffer_capacity);
564
565 crate::HttpClient {
566 service: buffered_service,
567 max_body_size: self.config.max_body_size,
568 transport_security: self.config.transport,
569 }
570 }
571}
572
573impl Default for HttpClientBuilder {
574 fn default() -> Self {
575 Self::new()
576 }
577}
578
579/// Map tower errors to `HttpError` with actual timeout duration
580///
581/// Attempts to extract existing `HttpError` from the boxed error before
582/// wrapping as `Transport`. This preserves typed errors like `Overloaded`
583/// and `ServiceClosed` that may have been boxed by tower middleware.
584fn map_tower_error(err: tower::BoxError, timeout: Duration) -> HttpError {
585 if err.is::<tower::timeout::error::Elapsed>() {
586 return HttpError::Timeout(timeout);
587 }
588
589 // Try to extract existing HttpError before wrapping as Transport
590 match err.downcast::<HttpError>() {
591 Ok(http_err) => *http_err,
592 Err(other) => HttpError::Transport(other),
593 }
594}
595
596/// Map load shed errors to `HttpError::Overloaded`
597fn map_load_shed_error(err: tower::BoxError) -> HttpError {
598 if err.is::<tower::load_shed::error::Overloaded>() {
599 HttpError::Overloaded
600 } else {
601 // Pass through other HttpError types (from inner service)
602 match err.downcast::<HttpError>() {
603 Ok(http_err) => *http_err,
604 Err(err) => HttpError::Transport(err),
605 }
606 }
607}
608
609/// Map the decompression response to our boxed response body type.
610///
611/// This converts `Response<DecompressionBody<Incoming>>` to `Response<ResponseBody>`
612/// by boxing the body with appropriate error type mapping.
613fn map_decompression_response<B>(response: Response<B>) -> Response<ResponseBody>
614where
615 B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static,
616 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
617{
618 let (parts, body) = response.into_parts();
619 // Convert the decompression body errors to our boxed error type.
620 // tower-http's DecompressionBody uses tower_http::BoxError which is
621 // compatible with our Box<dyn Error + Send + Sync> via Into.
622 let boxed_body: ResponseBody = body.map_err(Into::into).boxed();
623 Response::from_parts(parts, boxed_body)
624}
625
626/// Build the HTTPS connector with the specified TLS root configuration.
627///
628/// For `TlsRootConfig::Native`, uses cached native root certificates to avoid
629/// repeated OS certificate store lookups on each `build()` call.
630///
631/// HTTP/2 is enabled via `enable_all_versions()` which configures ALPN to
632/// advertise both h2 and http/1.1. Protocol selection happens during TLS
633/// handshake based on server support.
634///
635/// # Errors
636///
637/// Returns `HttpError::Tls` if `TlsRootConfig::Native` is requested but no
638/// valid root certificates are available from the OS certificate store.
639fn build_https_connector(
640 tls_roots: TlsRootConfig,
641 transport: TransportSecurity,
642 tls: &TlsConfig,
643) -> Result<HttpsConnector<HttpConnector>, HttpError> {
644 let allow_http = transport == TransportSecurity::AllowInsecureHttp;
645
646 // Both branches build a `ClientConfig` ourselves (rather than using
647 // `with_provider_and_webpki_roots`) so we can apply `require_ems = true`
648 // under the `fips` feature — see `tls::build_client_config`. The
649 // functions return `tls::TlsConfigError` (an enum implementing
650 // `std::error::Error`); boxing it into `HttpError::Tls`'s
651 // `Box<dyn Error + Send + Sync>` preserves the source chain via
652 // `TlsConfigError`'s own `Error::source()` impl.
653 let client_config = match tls_roots {
654 TlsRootConfig::WebPki => tls::webpki_roots_client_config(tls),
655 TlsRootConfig::Native => tls::native_roots_client_config(tls),
656 }
657 .map_err(|e| HttpError::Tls(Box::new(e)))?;
658
659 let builder = hyper_rustls::HttpsConnectorBuilder::new().with_tls_config(client_config);
660 let connector = if allow_http {
661 builder.https_or_http().enable_all_versions().build()
662 } else {
663 builder.https_only().enable_all_versions().build()
664 };
665 Ok(connector)
666}
667
668#[cfg(test)]
669#[cfg_attr(coverage_nightly, coverage(off))]
670mod tests {
671 use super::*;
672 use crate::config::DEFAULT_USER_AGENT;
673
674 #[test]
675 fn test_builder_default() {
676 let builder = HttpClientBuilder::new();
677 assert_eq!(builder.config.request_timeout, Duration::from_secs(30));
678 assert_eq!(builder.config.user_agent, DEFAULT_USER_AGENT);
679 assert!(builder.config.retry.is_some());
680 assert_eq!(builder.config.buffer_capacity, 1024);
681 }
682
683 #[test]
684 fn test_builder_with_config() {
685 let config = HttpClientConfig::minimal();
686 let builder = HttpClientBuilder::with_config(config);
687 assert_eq!(builder.config.request_timeout, Duration::from_secs(10));
688 }
689
690 #[test]
691 fn test_builder_timeout() {
692 let builder = HttpClientBuilder::new().timeout(Duration::from_mins(1));
693 assert_eq!(builder.config.request_timeout, Duration::from_mins(1));
694 }
695
696 #[test]
697 fn test_builder_user_agent() {
698 let builder = HttpClientBuilder::new().user_agent("custom/1.0");
699 assert_eq!(builder.config.user_agent, "custom/1.0");
700 }
701
702 #[test]
703 fn test_builder_retry() {
704 let builder = HttpClientBuilder::new().retry(None);
705 assert!(builder.config.retry.is_none());
706 }
707
708 #[test]
709 fn test_builder_max_body_size() {
710 let builder = HttpClientBuilder::new().max_body_size(1024);
711 assert_eq!(builder.config.max_body_size, 1024);
712 }
713
714 #[test]
715 fn test_builder_transport_security() {
716 let builder = HttpClientBuilder::new().transport(TransportSecurity::TlsOnly);
717 assert_eq!(builder.config.transport, TransportSecurity::TlsOnly);
718
719 let builder = HttpClientBuilder::new().deny_insecure_http();
720 assert_eq!(builder.config.transport, TransportSecurity::TlsOnly);
721
722 let builder = HttpClientBuilder::new();
723 #[cfg(not(feature = "fips"))]
724 assert_eq!(
725 builder.config.transport,
726 TransportSecurity::AllowInsecureHttp
727 );
728 #[cfg(feature = "fips")]
729 assert_eq!(builder.config.transport, TransportSecurity::TlsOnly);
730 }
731
732 #[test]
733 fn test_builder_otel() {
734 let builder = HttpClientBuilder::new().with_otel();
735 assert!(builder.config.otel);
736 }
737
738 #[test]
739 fn test_builder_buffer_capacity() {
740 let builder = HttpClientBuilder::new().buffer_capacity(512);
741 assert_eq!(builder.config.buffer_capacity, 512);
742 }
743
744 /// Test that `buffer_capacity=0` is clamped to 1 to prevent panic.
745 ///
746 /// Tower's Buffer panics with capacity=0, so we enforce minimum of 1.
747 #[test]
748 fn test_builder_buffer_capacity_zero_clamped() {
749 let builder = HttpClientBuilder::new().buffer_capacity(0);
750 assert_eq!(
751 builder.config.buffer_capacity, 1,
752 "buffer_capacity=0 should be clamped to 1"
753 );
754 }
755
756 /// Test that `buffer_capacity=0` via config is clamped during `build()`.
757 #[tokio::test]
758 async fn test_builder_buffer_capacity_zero_in_config_clamped() {
759 let config = HttpClientConfig {
760 buffer_capacity: 0, // Invalid - should be clamped in build()
761 ..Default::default()
762 };
763 let result = HttpClientBuilder::with_config(config).build();
764 // Should succeed (clamped to 1), not panic
765 assert!(
766 result.is_ok(),
767 "build() should succeed with capacity clamped to 1"
768 );
769 }
770
771 #[tokio::test]
772 async fn test_builder_build_with_otel() {
773 let client = HttpClientBuilder::new().with_otel().build();
774 assert!(client.is_ok());
775 }
776
777 #[tokio::test]
778 async fn test_builder_with_auth_layer() {
779 let client = HttpClientBuilder::new()
780 .with_auth_layer(|svc| svc) // identity transform
781 .build();
782 assert!(client.is_ok());
783 }
784
785 #[tokio::test]
786 async fn test_builder_with_metrics_layer() {
787 let client = HttpClientBuilder::new()
788 .with_metrics_layer(|svc| svc) // identity transform
789 .build();
790 assert!(client.is_ok());
791 }
792
793 #[tokio::test]
794 async fn test_builder_with_metrics_layer_second_call_replaces_first() {
795 use std::sync::Arc;
796 use std::sync::atomic::{AtomicUsize, Ordering};
797
798 let call_count = Arc::new(AtomicUsize::new(0));
799 let call_count2 = call_count.clone();
800
801 // Second call should replace the first; only one layer is applied.
802 let client = HttpClientBuilder::new()
803 .with_metrics_layer(|_svc| {
804 // This closure should NOT be called (replaced by the second).
805 panic!("first metrics layer should have been replaced");
806 })
807 .with_metrics_layer(move |svc| {
808 call_count2.fetch_add(1, Ordering::SeqCst);
809 svc
810 })
811 .build();
812
813 assert!(client.is_ok());
814 assert_eq!(
815 call_count.load(Ordering::SeqCst),
816 1,
817 "second metrics layer must be applied exactly once"
818 );
819 }
820
821 #[tokio::test]
822 async fn test_builder_build() {
823 let client = HttpClientBuilder::new().build();
824 assert!(client.is_ok());
825 }
826
827 #[tokio::test]
828 async fn test_builder_build_with_deny_insecure_http() {
829 let client = HttpClientBuilder::new().deny_insecure_http().build();
830 assert!(client.is_ok());
831 }
832
833 #[tokio::test]
834 async fn test_builder_build_with_sse_config() {
835 use crate::config::HttpClientConfig;
836 let config = HttpClientConfig::sse();
837 let client = HttpClientBuilder::with_config(config).build();
838 assert!(client.is_ok(), "SSE config should build successfully");
839 }
840
841 #[tokio::test]
842 async fn test_builder_build_invalid_user_agent() {
843 let client = HttpClientBuilder::new()
844 .user_agent("invalid\x00agent")
845 .build();
846 assert!(client.is_err());
847 }
848
849 #[tokio::test]
850 async fn test_builder_default_uses_webpki_roots() {
851 let builder = HttpClientBuilder::new();
852 assert_eq!(builder.config.tls_roots, TlsRootConfig::WebPki);
853 // Build should succeed without OS native roots
854 let client = builder.build();
855 assert!(client.is_ok());
856 }
857
858 #[tokio::test]
859 async fn test_builder_native_roots() {
860 let config = HttpClientConfig {
861 tls_roots: TlsRootConfig::Native,
862 ..Default::default()
863 };
864 let result = HttpClientBuilder::with_config(config).build();
865
866 // Native roots may succeed or fail depending on OS certificate availability.
867 // On systems with certs: Ok(_)
868 // On minimal containers without certs: Err(HttpError::Tls(_))
869 match &result {
870 Ok(_) => {
871 // Success on systems with native certs
872 }
873 Err(HttpError::Tls(err)) => {
874 // Expected failure on systems without native certs
875 let msg = err.to_string();
876 assert!(
877 msg.contains("native root") || msg.contains("certificate"),
878 "TLS error should mention certificates: {msg}"
879 );
880 }
881 Err(other) => {
882 panic!("Unexpected error type: {other:?}");
883 }
884 }
885 }
886
887 #[tokio::test]
888 async fn test_builder_webpki_roots_https_only() {
889 let config = HttpClientConfig {
890 tls_roots: TlsRootConfig::WebPki,
891 transport: TransportSecurity::TlsOnly,
892 ..Default::default()
893 };
894 let client = HttpClientBuilder::with_config(config).build();
895 assert!(client.is_ok());
896 }
897
898 /// Verify HTTP/2 is enabled for all TLS root configurations.
899 ///
900 /// HTTP/2 support is configured via `enable_all_versions()` on the connector,
901 /// which sets up ALPN to negotiate h2 or http/1.1 during TLS handshake.
902 /// The hyper client uses `http2_only(false)` to allow both protocols.
903 ///
904 /// The `AllowInsecureHttp` sub-cases are skipped under `--features fips`
905 /// because the FIPS guard in `build()` rejects insecure transport.
906 #[tokio::test]
907 async fn test_http2_enabled_for_all_configurations() {
908 // Test WebPki with default transport (AllowInsecureHttp without fips, TlsOnly under fips)
909 let client = HttpClientBuilder::new().build();
910 assert!(
911 client.is_ok(),
912 "WebPki + default transport should build with HTTP/2 enabled"
913 );
914
915 // Test WebPki with TlsOnly (HTTPS only)
916 let client = HttpClientBuilder::new()
917 .transport(TransportSecurity::TlsOnly)
918 .build();
919 assert!(
920 client.is_ok(),
921 "WebPki + TlsOnly should build with HTTP/2 enabled"
922 );
923
924 // Test Native roots with AllowInsecureHttp (non-fips only)
925 #[cfg(not(feature = "fips"))]
926 {
927 let config = HttpClientConfig {
928 tls_roots: TlsRootConfig::Native,
929 transport: TransportSecurity::AllowInsecureHttp,
930 ..Default::default()
931 };
932 let client = HttpClientBuilder::with_config(config).build();
933 assert!(
934 client.is_ok(),
935 "Native + AllowInsecureHttp should build with HTTP/2 enabled"
936 );
937 }
938
939 // Test Native roots with TlsOnly (HTTPS only)
940 let config = HttpClientConfig {
941 tls_roots: TlsRootConfig::Native,
942 transport: TransportSecurity::TlsOnly,
943 ..Default::default()
944 };
945 let client = HttpClientBuilder::with_config(config).build();
946 assert!(
947 client.is_ok(),
948 "Native + TlsOnly should build with HTTP/2 enabled"
949 );
950 }
951
952 /// Test that concurrency limit uses fail-fast behavior (C2).
953 ///
954 /// `LoadShedLayer` + `ConcurrencyLimitLayer` combination returns Overloaded error
955 /// immediately when capacity is exhausted, instead of blocking indefinitely.
956 #[tokio::test]
957 async fn test_load_shedding_returns_overloaded_error() {
958 use bytes::Bytes;
959 use http::{Request, Response};
960 use http_body_util::Full;
961 use std::future::Future;
962 use std::pin::Pin;
963 use std::sync::Arc;
964 use std::sync::atomic::{AtomicUsize, Ordering};
965 use std::task::{Context, Poll};
966 use tower::Service;
967 use tower::ServiceExt;
968
969 // A service that holds a slot forever once called
970 #[derive(Clone)]
971 struct SlotHoldingService {
972 active: Arc<AtomicUsize>,
973 }
974
975 impl Service<Request<Full<Bytes>>> for SlotHoldingService {
976 type Response = Response<Full<Bytes>>;
977 type Error = HttpError;
978 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
979
980 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
981 Poll::Ready(Ok(()))
982 }
983
984 fn call(&mut self, _: Request<Full<Bytes>>) -> Self::Future {
985 self.active.fetch_add(1, Ordering::SeqCst);
986 // Never complete - holds the slot
987 Box::pin(std::future::pending())
988 }
989 }
990
991 let active = Arc::new(AtomicUsize::new(0));
992
993 // Build a service with load shedding and concurrency limit of 1
994 let service = tower::ServiceBuilder::new()
995 .layer(LoadShedLayer::new())
996 .layer(ConcurrencyLimitLayer::new(1))
997 .service(SlotHoldingService {
998 active: active.clone(),
999 });
1000
1001 let service = service.map_err(map_load_shed_error);
1002
1003 // First request: will occupy the single slot
1004 let req1 = Request::builder()
1005 .uri("http://test")
1006 .body(Full::new(Bytes::new()))
1007 .unwrap();
1008 let mut svc1 = service.clone();
1009
1010 let svc1_ready = svc1.ready().await.unwrap();
1011 let _pending_fut = svc1_ready.call(req1);
1012
1013 // Wait for the slot to be occupied
1014 tokio::time::sleep(Duration::from_millis(10)).await;
1015 assert_eq!(
1016 active.load(Ordering::SeqCst),
1017 1,
1018 "First request should be active"
1019 );
1020
1021 // Second request: LoadShedLayer should reject because ConcurrencyLimit is at capacity
1022 let req2 = Request::builder()
1023 .uri("http://test")
1024 .body(Full::new(Bytes::new()))
1025 .unwrap();
1026
1027 let mut svc2 = service.clone();
1028
1029 // LoadShedLayer checks poll_ready and returns Overloaded if inner service is not ready
1030 let result = tokio::time::timeout(Duration::from_millis(100), async {
1031 // poll_ready should return quickly with error (not block)
1032 match svc2.ready().await {
1033 Ok(ready_svc) => ready_svc.call(req2).await,
1034 Err(e) => Err(e),
1035 }
1036 })
1037 .await;
1038
1039 // Should complete within timeout (not hang) and return Overloaded
1040 assert!(result.is_ok(), "Request should not hang");
1041 let err = result.unwrap().unwrap_err();
1042 assert!(
1043 matches!(err, HttpError::Overloaded),
1044 "Expected Overloaded error, got: {err:?}"
1045 );
1046 }
1047
1048 // ==========================================================================
1049 // map_tower_error Tests
1050 // ==========================================================================
1051
1052 /// Test that `map_tower_error` preserves `HttpError::Overloaded` when wrapped in `BoxError`
1053 #[test]
1054 fn test_map_tower_error_preserves_overloaded() {
1055 let http_err = HttpError::Overloaded;
1056 let boxed: tower::BoxError = Box::new(http_err);
1057 let result = map_tower_error(boxed, Duration::from_secs(30));
1058
1059 assert!(
1060 matches!(result, HttpError::Overloaded),
1061 "Should preserve HttpError::Overloaded, got: {result:?}"
1062 );
1063 }
1064
1065 /// Test that `map_tower_error` preserves `HttpError::ServiceClosed` when wrapped in `BoxError`
1066 #[test]
1067 fn test_map_tower_error_preserves_service_closed() {
1068 let http_err = HttpError::ServiceClosed;
1069 let boxed: tower::BoxError = Box::new(http_err);
1070 let result = map_tower_error(boxed, Duration::from_secs(30));
1071
1072 assert!(
1073 matches!(result, HttpError::ServiceClosed),
1074 "Should preserve HttpError::ServiceClosed, got: {result:?}"
1075 );
1076 }
1077
1078 /// Test that `map_tower_error` preserves `HttpError::Timeout` with original duration
1079 #[test]
1080 fn test_map_tower_error_preserves_timeout_attempt() {
1081 let original_duration = Duration::from_secs(5);
1082 let http_err = HttpError::Timeout(original_duration);
1083 let boxed: tower::BoxError = Box::new(http_err);
1084 // Pass a different timeout to verify original is preserved
1085 let result = map_tower_error(boxed, Duration::from_secs(30));
1086
1087 match result {
1088 HttpError::Timeout(d) => {
1089 assert_eq!(
1090 d, original_duration,
1091 "Should preserve original timeout duration"
1092 );
1093 }
1094 other => panic!("Should preserve HttpError::Timeout, got: {other:?}"),
1095 }
1096 }
1097
1098 /// Test that `map_tower_error` wraps unknown errors as Transport
1099 #[test]
1100 fn test_map_tower_error_wraps_unknown_as_transport() {
1101 let other_err: tower::BoxError = Box::new(std::io::Error::new(
1102 std::io::ErrorKind::ConnectionRefused,
1103 "connection refused",
1104 ));
1105 let result = map_tower_error(other_err, Duration::from_secs(30));
1106
1107 assert!(
1108 matches!(result, HttpError::Transport(_)),
1109 "Should wrap unknown errors as Transport, got: {result:?}"
1110 );
1111 }
1112
1113 // ==========================================================================
1114 // Cancellation chain test
1115 //
1116 // Proves that dropping the response future from HttpClient cancels the
1117 // inner service future through the toolkit-http middleware stack
1118 // (Buffer → Concurrency → inner service). Retry is disabled to
1119 // isolate the cancellation path.
1120 //
1121 // Uses build_with_inner_service() to inject a fake slow service at the
1122 // bottom of the real tower stack - no HTTP server needed.
1123 // ==========================================================================
1124
1125 /// Dropping the `HttpClient::send()` future must cancel the inner
1126 /// service future through the full middleware stack.
1127 ///
1128 /// Injects a fake service via `build_with_inner_service()` that
1129 /// blocks on a `Notify` (never completes) and signals a second
1130 /// `Notify` from its `Drop` impl. No sleeps - purely notification-based.
1131 #[tokio::test]
1132 async fn test_cancellation_propagates_through_full_stack() {
1133 use crate::response::ResponseBody;
1134 use std::future::Future;
1135 use std::pin::Pin;
1136 use std::sync::Arc;
1137 use std::sync::atomic::{AtomicBool, Ordering};
1138 use std::task::{Context, Poll};
1139 use tower::Service;
1140
1141 #[derive(Clone)]
1142 struct PendingService {
1143 completed: Arc<AtomicBool>,
1144 drop_notifier: Arc<tokio::sync::Notify>,
1145 started_notifier: Arc<tokio::sync::Notify>,
1146 }
1147
1148 struct FutureGuard {
1149 completed: Arc<AtomicBool>,
1150 drop_notifier: Arc<tokio::sync::Notify>,
1151 }
1152
1153 impl Drop for FutureGuard {
1154 fn drop(&mut self) {
1155 if !self.completed.load(Ordering::SeqCst) {
1156 self.drop_notifier.notify_one();
1157 }
1158 }
1159 }
1160
1161 impl Service<http::Request<Full<Bytes>>> for PendingService {
1162 type Response = http::Response<ResponseBody>;
1163 type Error = HttpError;
1164 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
1165
1166 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1167 Poll::Ready(Ok(()))
1168 }
1169
1170 fn call(&mut self, _: http::Request<Full<Bytes>>) -> Self::Future {
1171 let completed = self.completed.clone();
1172 let drop_notifier = self.drop_notifier.clone();
1173 let started_notifier = self.started_notifier.clone();
1174 Box::pin(async move {
1175 let _guard = FutureGuard {
1176 completed: completed.clone(),
1177 drop_notifier,
1178 };
1179 // Signal that the request reached the inner service
1180 started_notifier.notify_one();
1181 // Block forever - only completes via drop
1182 std::future::pending::<()>().await;
1183 completed.store(true, Ordering::SeqCst);
1184 unreachable!()
1185 })
1186 }
1187 }
1188
1189 let inner_completed = Arc::new(AtomicBool::new(false));
1190 let drop_notifier = Arc::new(tokio::sync::Notify::new());
1191 let started_notifier = Arc::new(tokio::sync::Notify::new());
1192
1193 let inner = PendingService {
1194 completed: inner_completed.clone(),
1195 drop_notifier: drop_notifier.clone(),
1196 started_notifier: started_notifier.clone(),
1197 };
1198
1199 // Build the real HttpClient stack with our fake service at the bottom.
1200 // Retry disabled to isolate cancellation. Tests: Buffer → Concurrency → PendingService
1201 let client = HttpClientBuilder::new()
1202 .timeout(Duration::from_secs(30))
1203 .retry(None)
1204 .build_with_inner_service(inner.boxed_clone());
1205
1206 // Spawn the request so we can drop it explicitly.
1207 // URL uses https:// so the scheme validation in RequestBuilder passes
1208 // under both the non-fips default (AllowInsecureHttp) and the fips
1209 // default (TlsOnly); the connector here is the injected PendingService,
1210 // so no real TLS handshake happens.
1211 let send_handle = tokio::spawn({
1212 let client = client.clone();
1213 async move { client.get("https://fake/slow").send().await }
1214 });
1215
1216 // Wait for the request to reach the inner service
1217 started_notifier.notified().await;
1218
1219 // Drop the in-flight request by aborting the task
1220 send_handle.abort();
1221
1222 // Wait for the drop notification - no sleep, pure notification
1223 tokio::time::timeout(Duration::from_secs(5), drop_notifier.notified())
1224 .await
1225 .expect(
1226 "Inner service future should have been dropped within 5s - \
1227 the full toolkit-http stack must propagate cancellation",
1228 );
1229
1230 assert!(
1231 !inner_completed.load(Ordering::SeqCst),
1232 "Inner service future should NOT have completed"
1233 );
1234 }
1235}