Skip to main content

toolkit_http/
builder.rs

1use crate::config::{
2    ClientAuthConfig, HttpClientConfig, RedirectConfig, RetryConfig, TlsConfig, TlsRootConfig,
3    TlsVersion, TransportSecurity,
4};
5use crate::error::HttpError;
6use crate::layers::{OtelLayer, RetryLayer, SecureRedirectPolicy, UserAgentLayer};
7use crate::response::ResponseBody;
8use crate::tls;
9use bytes::Bytes;
10use http::Response;
11use http_body_util::{BodyExt, Full};
12use hyper_rustls::HttpsConnector;
13use hyper_util::client::legacy::Client;
14use hyper_util::client::legacy::connect::HttpConnector;
15use hyper_util::rt::{TokioExecutor, TokioTimer};
16use std::time::Duration;
17use tower::buffer::Buffer;
18use tower::limit::ConcurrencyLimitLayer;
19use tower::load_shed::LoadShedLayer;
20use tower::timeout::TimeoutLayer;
21use tower::util::BoxCloneService;
22use tower::{ServiceBuilder, ServiceExt};
23use tower_http::decompression::DecompressionLayer;
24use tower_http::follow_redirect::FollowRedirectLayer;
25
26/// Type-erased inner service between layer composition steps in [`HttpClientBuilder::build`].
27type InnerService =
28    BoxCloneService<http::Request<Full<Bytes>>, http::Response<ResponseBody>, HttpError>;
29
30/// Builder for constructing an [`HttpClient`] with a layered tower middleware stack.
31pub struct HttpClientBuilder {
32    config: HttpClientConfig,
33    auth_layer: Option<Box<dyn FnOnce(InnerService) -> InnerService + Send>>,
34    metrics_layer: Option<Box<dyn FnOnce(InnerService) -> InnerService + Send>>,
35}
36
37impl HttpClientBuilder {
38    /// Create a new builder with default configuration
39    #[must_use]
40    pub fn new() -> Self {
41        Self {
42            config: HttpClientConfig::default(),
43            auth_layer: None,
44            metrics_layer: None,
45        }
46    }
47
48    /// Create a builder with a specific configuration
49    #[must_use]
50    pub fn with_config(config: HttpClientConfig) -> Self {
51        Self {
52            config,
53            auth_layer: None,
54            metrics_layer: None,
55        }
56    }
57
58    /// Set the per-request timeout
59    ///
60    /// This timeout applies to each individual HTTP request/attempt.
61    /// If retries are enabled, each retry attempt gets its own timeout.
62    #[must_use]
63    pub fn timeout(mut self, timeout: Duration) -> Self {
64        self.config.request_timeout = timeout;
65        self
66    }
67
68    /// Set the total timeout spanning all retry attempts
69    ///
70    /// When set, the entire operation (including all retries and backoff delays)
71    /// must complete within this duration. If the deadline is exceeded,
72    /// the request fails with `HttpError::DeadlineExceeded(total_timeout)`.
73    #[must_use]
74    pub fn total_timeout(mut self, timeout: Duration) -> Self {
75        self.config.total_timeout = Some(timeout);
76        self
77    }
78
79    /// Set the user agent string
80    #[must_use]
81    pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
82        self.config.user_agent = user_agent.into();
83        self
84    }
85
86    /// Set the retry configuration
87    #[must_use]
88    pub fn retry(mut self, retry: Option<RetryConfig>) -> Self {
89        self.config.retry = retry;
90        self
91    }
92
93    /// Set the maximum response body size
94    #[must_use]
95    pub fn max_body_size(mut self, size: usize) -> Self {
96        self.config.max_body_size = size;
97        self
98    }
99
100    /// Set transport security mode
101    ///
102    /// Use `TransportSecurity::TlsOnly` to enforce HTTPS for all connections.
103    #[must_use]
104    pub fn transport(mut self, transport: TransportSecurity) -> Self {
105        self.config.transport = transport;
106        self
107    }
108
109    /// Deny insecure HTTP connections, enforcing TLS for all traffic
110    ///
111    /// Equivalent to `.transport(TransportSecurity::TlsOnly)`.
112    ///
113    /// Use this when TLS enforcement is required (e.g., production environments).
114    #[must_use]
115    pub fn deny_insecure_http(mut self) -> Self {
116        tracing::debug!(
117            target: "toolkit_http::security",
118            "deny_insecure_http() called - enforcing TLS for all connections"
119        );
120        self.config.transport = TransportSecurity::TlsOnly;
121        self
122    }
123
124    /// Set the TLS handshake configuration (minimum protocol version and
125    /// optional mutual-TLS client identity).
126    ///
127    /// Replaces the entire [`TlsConfig`]; use [`crate::config::TlsConfig`]'s
128    /// fields to set `min_version` and `client_auth`. The root-trust strategy
129    /// is configured separately via the `tls_roots` config field.
130    ///
131    /// Mutual-TLS PEM material referenced by `client_auth` is read and parsed
132    /// in [`HttpClientBuilder::build`]; IO/parse failures surface there as
133    /// [`HttpError::Tls`].
134    ///
135    /// [`HttpClientBuilder::build`]: crate::builder::HttpClientBuilder::build
136    #[must_use]
137    pub fn tls(mut self, tls: TlsConfig) -> Self {
138        self.config.tls = tls;
139        self
140    }
141
142    /// Set the minimum TLS protocol version, leaving the rest of the
143    /// [`TlsConfig`] (e.g. `client_auth`) untouched.
144    ///
145    /// Convenience shortcut for mutating `config.tls.min_version` without
146    /// rebuilding the whole [`TlsConfig`] via [`HttpClientBuilder::tls`].
147    ///
148    /// [`HttpClientBuilder::tls`]: crate::builder::HttpClientBuilder::tls
149    #[must_use]
150    pub fn tls_min_version(mut self, min_version: TlsVersion) -> Self {
151        self.config.tls.min_version = min_version;
152        self
153    }
154
155    /// Set the mutual-TLS client identity, leaving the rest of the
156    /// [`TlsConfig`] (e.g. `min_version`) untouched.
157    ///
158    /// Convenience shortcut for mutating `config.tls.client_auth` without
159    /// rebuilding the whole [`TlsConfig`] via [`HttpClientBuilder::tls`]. The
160    /// referenced PEM material is read and parsed in
161    /// [`HttpClientBuilder::build`]; IO/parse failures surface there as
162    /// [`HttpError::Tls`].
163    ///
164    /// [`HttpClientBuilder::tls`]: crate::builder::HttpClientBuilder::tls
165    /// [`HttpClientBuilder::build`]: crate::builder::HttpClientBuilder::build
166    #[must_use]
167    pub fn client_auth(mut self, client_auth: ClientAuthConfig) -> Self {
168        self.config.tls.client_auth = Some(client_auth);
169        self
170    }
171
172    /// Enable OpenTelemetry tracing layer
173    ///
174    /// When enabled, creates spans for outbound requests with HTTP metadata
175    /// and injects W3C trace context headers (when `otel` feature is enabled).
176    #[must_use]
177    pub fn with_otel(mut self) -> Self {
178        self.config.otel = true;
179        self
180    }
181
182    /// Insert an optional auth layer between retry and timeout in the stack.
183    ///
184    /// Stack position: `… → Retry → **this layer** → Timeout → …`
185    ///
186    /// The layer sits inside the retry loop so each attempt re-executes it
187    /// (e.g. re-reads a refreshed bearer token). Only one auth layer can be
188    /// set; a second call replaces the first.
189    #[must_use]
190    pub fn with_auth_layer(
191        mut self,
192        wrap: impl FnOnce(InnerService) -> InnerService + Send + 'static,
193    ) -> Self {
194        self.auth_layer = Some(Box::new(wrap));
195        self
196    }
197
198    /// Insert a metrics layer between the rate-limit and retry layers.
199    ///
200    /// Stack position: `… → RateLimit → **this layer** → Retry → Auth → Timeout → …`
201    ///
202    /// The layer sits outside the retry loop so it observes a single logical
203    /// request once, regardless of how many transport-level retries the retry
204    /// layer issues. If the use case is "count every attempt", the equivalent
205    /// observation can be made with [`with_otel`](Self::with_otel) (one span
206    /// per attempt) and a `tracing` → metrics bridge.
207    ///
208    /// Only one metrics layer can be set; a second call replaces the first.
209    #[must_use]
210    pub fn with_metrics_layer(
211        mut self,
212        wrap: impl FnOnce(InnerService) -> InnerService + Send + 'static,
213    ) -> Self {
214        self.metrics_layer = Some(Box::new(wrap));
215        self
216    }
217
218    /// Record OpenTelemetry client metrics using the default route classifier.
219    ///
220    /// Emits `http.client.request.duration` (a histogram, in seconds) for each
221    /// logical request. `client_type` names the instrumentation scope (the
222    /// meter), like the server-side gear name. The default classifier labels
223    /// each request `"METHOD host"` via the `http.route` attribute — a bounded
224    /// value that never contains a raw path, so metric cardinality stays
225    /// controlled. Use [`with_metrics_by`](Self::with_metrics_by)
226    /// to supply route templates.
227    ///
228    /// Like [`with_metrics_layer`](Self::with_metrics_layer), the layer sits
229    /// outside the retry loop, so it observes one logical request, not each
230    /// attempt.
231    #[cfg(feature = "otel")]
232    #[must_use]
233    pub fn with_metrics(self, client_type: impl Into<String>) -> Self {
234        self.with_metrics_by(client_type, crate::layers::default_classify)
235    }
236
237    /// Record OpenTelemetry client metrics with a custom route classifier.
238    ///
239    /// Like [`with_metrics`](Self::with_metrics) but accepts a classifier — `classify` produces the
240    /// `http.route` attribute for each request (the Rust analogue of go-appkit's
241    /// `ClassifyRequest`). It MUST return a bounded set of values (e.g.
242    /// `"GET /users/{id}"`), never raw paths, to avoid unbounded metric
243    /// cardinality.
244    #[cfg(feature = "otel")]
245    #[must_use]
246    pub fn with_metrics_by(
247        self,
248        client_type: impl Into<String>,
249        classify: impl Fn(&http::Request<Full<Bytes>>) -> std::borrow::Cow<'static, str>
250        + Send
251        + Sync
252        + 'static,
253    ) -> Self {
254        let client_type = client_type.into();
255        let classify: crate::layers::ClassifyFn = std::sync::Arc::new(classify);
256        let layer = crate::layers::MetricsLayer::new(&client_type, classify);
257        self.with_metrics_layer(move |svc| {
258            ServiceBuilder::new()
259                .layer(layer)
260                .service(svc)
261                .boxed_clone()
262        })
263    }
264
265    /// Set the buffer capacity for concurrent request handling
266    ///
267    /// The HTTP client uses an internal buffer to allow concurrent requests
268    /// without external locking. This sets the maximum number of requests
269    /// that can be queued.
270    ///
271    /// **Note**: A capacity of 0 is invalid and will be clamped to 1.
272    /// Tower's Buffer panics with capacity=0, so we enforce minimum of 1.
273    #[must_use]
274    pub fn buffer_capacity(mut self, capacity: usize) -> Self {
275        // Clamp to at least 1 - tower::Buffer panics with capacity=0
276        self.config.buffer_capacity = capacity.max(1);
277        self
278    }
279
280    /// Set the maximum number of redirects to follow
281    ///
282    /// Set to `0` to disable redirect following (3xx responses pass through as-is).
283    /// Default: 10
284    #[must_use]
285    pub fn max_redirects(mut self, max_redirects: usize) -> Self {
286        self.config.redirect.max_redirects = max_redirects;
287        self
288    }
289
290    /// Disable redirect following
291    ///
292    /// Equivalent to `.max_redirects(0)`. When disabled, 3xx responses are
293    /// returned to the caller without following the `Location` header.
294    #[must_use]
295    pub fn no_redirects(mut self) -> Self {
296        self.config.redirect = RedirectConfig::disabled();
297        self
298    }
299
300    /// Set the redirect policy configuration
301    ///
302    /// Use this to configure redirect security settings:
303    /// - `same_origin_only`: Only follow redirects to the same host
304    /// - `strip_sensitive_headers`: Remove `Authorization`/`Cookie` on cross-origin
305    /// - `allow_https_downgrade`: Allow HTTPS → HTTP redirects (not recommended)
306    ///
307    /// # Example
308    ///
309    /// ```rust,ignore
310    /// let client = HttpClient::builder()
311    ///     .redirect(RedirectConfig::permissive()) // Allow all redirects with header stripping
312    ///     .build()?;
313    /// ```
314    #[must_use]
315    pub fn redirect(mut self, config: RedirectConfig) -> Self {
316        self.config.redirect = config;
317        self
318    }
319
320    /// Set the idle connection timeout for the connection pool
321    ///
322    /// Connections that remain idle for longer than this duration will be
323    /// closed and removed from the pool. Default: 90 seconds.
324    ///
325    /// Set to `None` to disable idle timeout (connections kept indefinitely).
326    #[must_use]
327    pub fn pool_idle_timeout(mut self, timeout: Option<Duration>) -> Self {
328        self.config.pool_idle_timeout = timeout;
329        self
330    }
331
332    /// Set the maximum number of idle connections per host
333    ///
334    /// Limits how many idle connections are kept in the pool for each host.
335    /// Default: 32.
336    ///
337    /// - Setting to `0` disables connection reuse entirely
338    /// - Setting too high may waste resources on rarely-used connections
339    #[must_use]
340    pub fn pool_max_idle_per_host(mut self, max: usize) -> Self {
341        self.config.pool_max_idle_per_host = max;
342        self
343    }
344
345    /// Build the HTTP client with all configured layers
346    ///
347    /// # Errors
348    /// Returns an error if TLS initialization fails or configuration is invalid.
349    ///
350    /// Under `--features fips`, returns [`HttpError::InsecureTransport`] when
351    /// `config.transport == TransportSecurity::AllowInsecureHttp`. FIPS builds
352    /// must use [`TransportSecurity::TlsOnly`] — there is no opt-out.
353    pub fn build(self) -> Result<crate::HttpClient, HttpError> {
354        // Reject AllowInsecureHttp under --features fips before any TLS work.
355        // The check lives here (rather than in HttpClientConfig) so that
356        // constructing a config with AllowInsecureHttp is still cheap and
357        // infallible; only actually building a client fails closed.
358        #[cfg(feature = "fips")]
359        if self.config.transport == TransportSecurity::AllowInsecureHttp {
360            tracing::warn!(
361                target: "toolkit_http::security",
362                "rejecting AllowInsecureHttp under --features fips: returning HttpError::InsecureTransport"
363            );
364            return Err(HttpError::InsecureTransport);
365        }
366
367        let timeout = self.config.request_timeout;
368        let total_timeout = self.config.total_timeout;
369
370        // Build the HTTPS connector (may fail for Native roots if no valid certs)
371        let https = build_https_connector(
372            self.config.tls_roots,
373            self.config.transport,
374            &self.config.tls,
375        )?;
376
377        // Create the base hyper client with HTTP/2 support and connection pool settings
378        let mut client_builder = Client::builder(TokioExecutor::new());
379
380        // Configure connection pool
381        // CRITICAL: pool_timer is required for pool_idle_timeout to work!
382        client_builder
383            .pool_timer(TokioTimer::new())
384            .pool_max_idle_per_host(self.config.pool_max_idle_per_host)
385            .http2_only(false); // Allow both HTTP/1 and HTTP/2 via ALPN
386
387        // Set idle timeout (None = no timeout, connections kept indefinitely)
388        if let Some(idle_timeout) = self.config.pool_idle_timeout {
389            client_builder.pool_idle_timeout(idle_timeout);
390        }
391
392        let hyper_client = client_builder.build::<_, Full<Bytes>>(https);
393
394        // Parse user agent header (may fail)
395        let ua_layer = UserAgentLayer::try_new(&self.config.user_agent)?;
396
397        // =======================================================================
398        // Tower Layer Stack (outer to inner)
399        // =======================================================================
400        //
401        // Request flow (outer → inner):
402        //   Buffer → OtelLayer → LoadShed/Concurrency → [MetricsLayer?] →
403        //   RetryLayer → [AuthLayer?] → ErrorMapping → Timeout → UserAgent →
404        //   Decompression → FollowRedirect → hyper_client
405        //
406        // AuthLayer (if set via with_auth_layer) sits inside the retry
407        // loop so each retry re-acquires credentials (e.g. refreshed
408        // bearer token).
409        //
410        // MetricsLayer (if set via with_metrics_layer) sits outside the
411        // retry loop so it observes one logical request, not per-attempt.
412        //
413        // Response flow (inner → outer):
414        //   hyper_client → FollowRedirect → Decompression → UserAgent →
415        //   Timeout → ErrorMapping → [AuthLayer?] → RetryLayer →
416        //   [MetricsLayer?] → LoadShed/Concurrency → OtelLayer → Buffer
417        //
418        // Key semantics (reqwest-like):
419        //  - send() returns Ok(Response) for ALL HTTP statuses (including 4xx/5xx)
420        //  - send() returns Err only for transport/timeout/TLS errors
421        //  - Non-2xx converted to error ONLY via error_for_status()
422        //  - RetryLayer handles both Err (transport) and Ok(Response) (status)
423        //     retries internally, draining body before retry for connection reuse
424        //  - FollowRedirect handles 3xx responses internally with security protections:
425        //     * Same-origin enforcement (default) - blocks SSRF attacks
426        //     * Sensitive header stripping on cross-origin redirects
427        //     * HTTPS downgrade protection
428        //
429        // =======================================================================
430        //
431        let redirect_policy = SecureRedirectPolicy::new(self.config.redirect.clone());
432
433        // Build the service stack with secure redirect following
434        let service = ServiceBuilder::new()
435            .layer(TimeoutLayer::new(timeout))
436            .layer(ua_layer)
437            .layer(DecompressionLayer::new())
438            .layer(FollowRedirectLayer::with_policy(redirect_policy))
439            .service(hyper_client);
440
441        // Map the decompression body to our boxed ResponseBody type.
442        // This converts Response<DecompressionBody<Incoming>> to Response<ResponseBody>.
443        //
444        // The decompression body's error type is tower_http::BoxError, which we convert
445        // to our boxed error type for consistency with the ResponseBody definition.
446        let service = service.map_response(map_decompression_response);
447
448        // Map errors to HttpError with proper timeout duration
449        let service = service.map_err(move |e: tower::BoxError| map_tower_error(e, timeout));
450
451        // Box the service for type erasure
452        let mut boxed_service = service.boxed_clone();
453
454        // Apply auth layer (between timeout and retry).
455        // Inside retry so each retry attempt re-acquires the token.
456        if let Some(wrap) = self.auth_layer {
457            boxed_service = wrap(boxed_service);
458        }
459
460        // Conditionally wrap with RetryLayer
461        //
462        // RetryLayer handles retries for both:
463        // - Err(HttpError::Transport/Timeout) - transport-level failures
464        // - Ok(Response) with retryable status codes (429, 5xx for GET, etc.)
465        //
466        // When retrying on status codes, RetryLayer drains the response body
467        // (up to configured limit) to allow connection reuse.
468        //
469        // If total_timeout is set, the entire operation (including all retries)
470        // must complete within that duration.
471        if let Some(ref retry_config) = self.config.retry {
472            let retry_layer = RetryLayer::with_total_timeout(retry_config.clone(), total_timeout);
473            let retry_service = ServiceBuilder::new()
474                .layer(retry_layer)
475                .service(boxed_service);
476            boxed_service = retry_service.boxed_clone();
477        }
478
479        // Apply metrics layer (between retry and rate-limit).
480        // Outside the retry loop: observes one logical request, not per-attempt.
481        if let Some(wrap) = self.metrics_layer {
482            boxed_service = wrap(boxed_service);
483        }
484
485        // Conditionally wrap with concurrency limit + load shedding
486        // LoadShedLayer returns error immediately when ConcurrencyLimitLayer is saturated
487        // instead of waiting indefinitely (Poll::Pending)
488        if let Some(rate_limit) = self.config.rate_limit
489            && rate_limit.max_concurrent_requests < usize::MAX
490        {
491            let limited_service = ServiceBuilder::new()
492                .layer(LoadShedLayer::new())
493                .layer(ConcurrencyLimitLayer::new(
494                    rate_limit.max_concurrent_requests,
495                ))
496                .service(boxed_service);
497            // Map load shed errors to HttpError::Overloaded
498            let limited_service = limited_service.map_err(map_load_shed_error);
499            boxed_service = limited_service.boxed_clone();
500        }
501
502        // Conditionally wrap with OTEL tracing layer (outermost layer before buffer)
503        // Applied last so it sees the final request after UserAgent and other modifications.
504        // Creates spans, records status, and injects trace context headers.
505        if self.config.otel {
506            let otel_service = ServiceBuilder::new()
507                .layer(OtelLayer::new())
508                .service(boxed_service);
509            boxed_service = otel_service.boxed_clone();
510        }
511
512        // Wrap in Buffer as the final step for true concurrent access
513        // Buffer spawns a background task that processes requests from a channel,
514        // providing Clone + Send + Sync without any mutex serialization.
515        let buffer_capacity = self.config.buffer_capacity.max(1);
516        let buffered_service: crate::client::BufferedService =
517            Buffer::new(boxed_service, buffer_capacity);
518
519        Ok(crate::HttpClient {
520            service: buffered_service,
521            max_body_size: self.config.max_body_size,
522            transport_security: self.config.transport,
523        })
524    }
525}
526
527#[cfg(test)]
528impl HttpClientBuilder {
529    /// Build an `HttpClient` with a custom inner service replacing the
530    /// hyper connector. The full middleware stack (Retry, Concurrency,
531    /// Buffer, etc.) is applied on top.
532    ///
533    /// The inner service must handle `Request<Full<Bytes>>` and return
534    /// `Response<ResponseBody>`. Use this to inject a fake slow service
535    /// for cancellation testing without needing a real HTTP server.
536    fn build_with_inner_service(self, inner: InnerService) -> crate::HttpClient {
537        let mut boxed_service = inner;
538
539        if let Some(ref retry_config) = self.config.retry {
540            let retry_layer =
541                RetryLayer::with_total_timeout(retry_config.clone(), self.config.total_timeout);
542            let retry_service = ServiceBuilder::new()
543                .layer(retry_layer)
544                .service(boxed_service);
545            boxed_service = retry_service.boxed_clone();
546        }
547
548        if let Some(rate_limit) = self.config.rate_limit
549            && rate_limit.max_concurrent_requests < usize::MAX
550        {
551            let limited_service = ServiceBuilder::new()
552                .layer(LoadShedLayer::new())
553                .layer(ConcurrencyLimitLayer::new(
554                    rate_limit.max_concurrent_requests,
555                ))
556                .service(boxed_service);
557            let limited_service = limited_service.map_err(map_load_shed_error);
558            boxed_service = limited_service.boxed_clone();
559        }
560
561        let buffer_capacity = self.config.buffer_capacity.max(1);
562        let buffered_service: crate::client::BufferedService =
563            Buffer::new(boxed_service, buffer_capacity);
564
565        crate::HttpClient {
566            service: buffered_service,
567            max_body_size: self.config.max_body_size,
568            transport_security: self.config.transport,
569        }
570    }
571}
572
573impl Default for HttpClientBuilder {
574    fn default() -> Self {
575        Self::new()
576    }
577}
578
579/// Map tower errors to `HttpError` with actual timeout duration
580///
581/// Attempts to extract existing `HttpError` from the boxed error before
582/// wrapping as `Transport`. This preserves typed errors like `Overloaded`
583/// and `ServiceClosed` that may have been boxed by tower middleware.
584fn map_tower_error(err: tower::BoxError, timeout: Duration) -> HttpError {
585    if err.is::<tower::timeout::error::Elapsed>() {
586        return HttpError::Timeout(timeout);
587    }
588
589    // Try to extract existing HttpError before wrapping as Transport
590    match err.downcast::<HttpError>() {
591        Ok(http_err) => *http_err,
592        Err(other) => HttpError::Transport(other),
593    }
594}
595
596/// Map load shed errors to `HttpError::Overloaded`
597fn map_load_shed_error(err: tower::BoxError) -> HttpError {
598    if err.is::<tower::load_shed::error::Overloaded>() {
599        HttpError::Overloaded
600    } else {
601        // Pass through other HttpError types (from inner service)
602        match err.downcast::<HttpError>() {
603            Ok(http_err) => *http_err,
604            Err(err) => HttpError::Transport(err),
605        }
606    }
607}
608
609/// Map the decompression response to our boxed response body type.
610///
611/// This converts `Response<DecompressionBody<Incoming>>` to `Response<ResponseBody>`
612/// by boxing the body with appropriate error type mapping.
613fn map_decompression_response<B>(response: Response<B>) -> Response<ResponseBody>
614where
615    B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static,
616    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
617{
618    let (parts, body) = response.into_parts();
619    // Convert the decompression body errors to our boxed error type.
620    // tower-http's DecompressionBody uses tower_http::BoxError which is
621    // compatible with our Box<dyn Error + Send + Sync> via Into.
622    let boxed_body: ResponseBody = body.map_err(Into::into).boxed();
623    Response::from_parts(parts, boxed_body)
624}
625
626/// Build the HTTPS connector with the specified TLS root configuration.
627///
628/// For `TlsRootConfig::Native`, uses cached native root certificates to avoid
629/// repeated OS certificate store lookups on each `build()` call.
630///
631/// HTTP/2 is enabled via `enable_all_versions()` which configures ALPN to
632/// advertise both h2 and http/1.1. Protocol selection happens during TLS
633/// handshake based on server support.
634///
635/// # Errors
636///
637/// Returns `HttpError::Tls` if `TlsRootConfig::Native` is requested but no
638/// valid root certificates are available from the OS certificate store.
639fn build_https_connector(
640    tls_roots: TlsRootConfig,
641    transport: TransportSecurity,
642    tls: &TlsConfig,
643) -> Result<HttpsConnector<HttpConnector>, HttpError> {
644    let allow_http = transport == TransportSecurity::AllowInsecureHttp;
645
646    // Both branches build a `ClientConfig` ourselves (rather than using
647    // `with_provider_and_webpki_roots`) so we can apply `require_ems = true`
648    // under the `fips` feature — see `tls::build_client_config`. The
649    // functions return `tls::TlsConfigError` (an enum implementing
650    // `std::error::Error`); boxing it into `HttpError::Tls`'s
651    // `Box<dyn Error + Send + Sync>` preserves the source chain via
652    // `TlsConfigError`'s own `Error::source()` impl.
653    let client_config = match tls_roots {
654        TlsRootConfig::WebPki => tls::webpki_roots_client_config(tls),
655        TlsRootConfig::Native => tls::native_roots_client_config(tls),
656    }
657    .map_err(|e| HttpError::Tls(Box::new(e)))?;
658
659    let builder = hyper_rustls::HttpsConnectorBuilder::new().with_tls_config(client_config);
660    let connector = if allow_http {
661        builder.https_or_http().enable_all_versions().build()
662    } else {
663        builder.https_only().enable_all_versions().build()
664    };
665    Ok(connector)
666}
667
668#[cfg(test)]
669#[cfg_attr(coverage_nightly, coverage(off))]
670mod tests {
671    use super::*;
672    use crate::config::DEFAULT_USER_AGENT;
673
674    #[test]
675    fn test_builder_default() {
676        let builder = HttpClientBuilder::new();
677        assert_eq!(builder.config.request_timeout, Duration::from_secs(30));
678        assert_eq!(builder.config.user_agent, DEFAULT_USER_AGENT);
679        assert!(builder.config.retry.is_some());
680        assert_eq!(builder.config.buffer_capacity, 1024);
681    }
682
683    #[test]
684    fn test_builder_with_config() {
685        let config = HttpClientConfig::minimal();
686        let builder = HttpClientBuilder::with_config(config);
687        assert_eq!(builder.config.request_timeout, Duration::from_secs(10));
688    }
689
690    #[test]
691    fn test_builder_timeout() {
692        let builder = HttpClientBuilder::new().timeout(Duration::from_mins(1));
693        assert_eq!(builder.config.request_timeout, Duration::from_mins(1));
694    }
695
696    #[test]
697    fn test_builder_user_agent() {
698        let builder = HttpClientBuilder::new().user_agent("custom/1.0");
699        assert_eq!(builder.config.user_agent, "custom/1.0");
700    }
701
702    #[test]
703    fn test_builder_retry() {
704        let builder = HttpClientBuilder::new().retry(None);
705        assert!(builder.config.retry.is_none());
706    }
707
708    #[test]
709    fn test_builder_max_body_size() {
710        let builder = HttpClientBuilder::new().max_body_size(1024);
711        assert_eq!(builder.config.max_body_size, 1024);
712    }
713
714    #[test]
715    fn test_builder_transport_security() {
716        let builder = HttpClientBuilder::new().transport(TransportSecurity::TlsOnly);
717        assert_eq!(builder.config.transport, TransportSecurity::TlsOnly);
718
719        let builder = HttpClientBuilder::new().deny_insecure_http();
720        assert_eq!(builder.config.transport, TransportSecurity::TlsOnly);
721
722        let builder = HttpClientBuilder::new();
723        #[cfg(not(feature = "fips"))]
724        assert_eq!(
725            builder.config.transport,
726            TransportSecurity::AllowInsecureHttp
727        );
728        #[cfg(feature = "fips")]
729        assert_eq!(builder.config.transport, TransportSecurity::TlsOnly);
730    }
731
732    #[test]
733    fn test_builder_otel() {
734        let builder = HttpClientBuilder::new().with_otel();
735        assert!(builder.config.otel);
736    }
737
738    #[test]
739    fn test_builder_buffer_capacity() {
740        let builder = HttpClientBuilder::new().buffer_capacity(512);
741        assert_eq!(builder.config.buffer_capacity, 512);
742    }
743
744    /// Test that `buffer_capacity=0` is clamped to 1 to prevent panic.
745    ///
746    /// Tower's Buffer panics with capacity=0, so we enforce minimum of 1.
747    #[test]
748    fn test_builder_buffer_capacity_zero_clamped() {
749        let builder = HttpClientBuilder::new().buffer_capacity(0);
750        assert_eq!(
751            builder.config.buffer_capacity, 1,
752            "buffer_capacity=0 should be clamped to 1"
753        );
754    }
755
756    /// Test that `buffer_capacity=0` via config is clamped during `build()`.
757    #[tokio::test]
758    async fn test_builder_buffer_capacity_zero_in_config_clamped() {
759        let config = HttpClientConfig {
760            buffer_capacity: 0, // Invalid - should be clamped in build()
761            ..Default::default()
762        };
763        let result = HttpClientBuilder::with_config(config).build();
764        // Should succeed (clamped to 1), not panic
765        assert!(
766            result.is_ok(),
767            "build() should succeed with capacity clamped to 1"
768        );
769    }
770
771    #[tokio::test]
772    async fn test_builder_build_with_otel() {
773        let client = HttpClientBuilder::new().with_otel().build();
774        assert!(client.is_ok());
775    }
776
777    #[tokio::test]
778    async fn test_builder_with_auth_layer() {
779        let client = HttpClientBuilder::new()
780            .with_auth_layer(|svc| svc) // identity transform
781            .build();
782        assert!(client.is_ok());
783    }
784
785    #[tokio::test]
786    async fn test_builder_with_metrics_layer() {
787        let client = HttpClientBuilder::new()
788            .with_metrics_layer(|svc| svc) // identity transform
789            .build();
790        assert!(client.is_ok());
791    }
792
793    #[tokio::test]
794    async fn test_builder_with_metrics_layer_second_call_replaces_first() {
795        use std::sync::Arc;
796        use std::sync::atomic::{AtomicUsize, Ordering};
797
798        let call_count = Arc::new(AtomicUsize::new(0));
799        let call_count2 = call_count.clone();
800
801        // Second call should replace the first; only one layer is applied.
802        let client = HttpClientBuilder::new()
803            .with_metrics_layer(|_svc| {
804                // This closure should NOT be called (replaced by the second).
805                panic!("first metrics layer should have been replaced");
806            })
807            .with_metrics_layer(move |svc| {
808                call_count2.fetch_add(1, Ordering::SeqCst);
809                svc
810            })
811            .build();
812
813        assert!(client.is_ok());
814        assert_eq!(
815            call_count.load(Ordering::SeqCst),
816            1,
817            "second metrics layer must be applied exactly once"
818        );
819    }
820
821    #[tokio::test]
822    async fn test_builder_build() {
823        let client = HttpClientBuilder::new().build();
824        assert!(client.is_ok());
825    }
826
827    #[tokio::test]
828    async fn test_builder_build_with_deny_insecure_http() {
829        let client = HttpClientBuilder::new().deny_insecure_http().build();
830        assert!(client.is_ok());
831    }
832
833    #[tokio::test]
834    async fn test_builder_build_with_sse_config() {
835        use crate::config::HttpClientConfig;
836        let config = HttpClientConfig::sse();
837        let client = HttpClientBuilder::with_config(config).build();
838        assert!(client.is_ok(), "SSE config should build successfully");
839    }
840
841    #[tokio::test]
842    async fn test_builder_build_invalid_user_agent() {
843        let client = HttpClientBuilder::new()
844            .user_agent("invalid\x00agent")
845            .build();
846        assert!(client.is_err());
847    }
848
849    #[tokio::test]
850    async fn test_builder_default_uses_webpki_roots() {
851        let builder = HttpClientBuilder::new();
852        assert_eq!(builder.config.tls_roots, TlsRootConfig::WebPki);
853        // Build should succeed without OS native roots
854        let client = builder.build();
855        assert!(client.is_ok());
856    }
857
858    #[tokio::test]
859    async fn test_builder_native_roots() {
860        let config = HttpClientConfig {
861            tls_roots: TlsRootConfig::Native,
862            ..Default::default()
863        };
864        let result = HttpClientBuilder::with_config(config).build();
865
866        // Native roots may succeed or fail depending on OS certificate availability.
867        // On systems with certs: Ok(_)
868        // On minimal containers without certs: Err(HttpError::Tls(_))
869        match &result {
870            Ok(_) => {
871                // Success on systems with native certs
872            }
873            Err(HttpError::Tls(err)) => {
874                // Expected failure on systems without native certs
875                let msg = err.to_string();
876                assert!(
877                    msg.contains("native root") || msg.contains("certificate"),
878                    "TLS error should mention certificates: {msg}"
879                );
880            }
881            Err(other) => {
882                panic!("Unexpected error type: {other:?}");
883            }
884        }
885    }
886
887    #[tokio::test]
888    async fn test_builder_webpki_roots_https_only() {
889        let config = HttpClientConfig {
890            tls_roots: TlsRootConfig::WebPki,
891            transport: TransportSecurity::TlsOnly,
892            ..Default::default()
893        };
894        let client = HttpClientBuilder::with_config(config).build();
895        assert!(client.is_ok());
896    }
897
898    /// Verify HTTP/2 is enabled for all TLS root configurations.
899    ///
900    /// HTTP/2 support is configured via `enable_all_versions()` on the connector,
901    /// which sets up ALPN to negotiate h2 or http/1.1 during TLS handshake.
902    /// The hyper client uses `http2_only(false)` to allow both protocols.
903    ///
904    /// The `AllowInsecureHttp` sub-cases are skipped under `--features fips`
905    /// because the FIPS guard in `build()` rejects insecure transport.
906    #[tokio::test]
907    async fn test_http2_enabled_for_all_configurations() {
908        // Test WebPki with default transport (AllowInsecureHttp without fips, TlsOnly under fips)
909        let client = HttpClientBuilder::new().build();
910        assert!(
911            client.is_ok(),
912            "WebPki + default transport should build with HTTP/2 enabled"
913        );
914
915        // Test WebPki with TlsOnly (HTTPS only)
916        let client = HttpClientBuilder::new()
917            .transport(TransportSecurity::TlsOnly)
918            .build();
919        assert!(
920            client.is_ok(),
921            "WebPki + TlsOnly should build with HTTP/2 enabled"
922        );
923
924        // Test Native roots with AllowInsecureHttp (non-fips only)
925        #[cfg(not(feature = "fips"))]
926        {
927            let config = HttpClientConfig {
928                tls_roots: TlsRootConfig::Native,
929                transport: TransportSecurity::AllowInsecureHttp,
930                ..Default::default()
931            };
932            let client = HttpClientBuilder::with_config(config).build();
933            assert!(
934                client.is_ok(),
935                "Native + AllowInsecureHttp should build with HTTP/2 enabled"
936            );
937        }
938
939        // Test Native roots with TlsOnly (HTTPS only)
940        let config = HttpClientConfig {
941            tls_roots: TlsRootConfig::Native,
942            transport: TransportSecurity::TlsOnly,
943            ..Default::default()
944        };
945        let client = HttpClientBuilder::with_config(config).build();
946        assert!(
947            client.is_ok(),
948            "Native + TlsOnly should build with HTTP/2 enabled"
949        );
950    }
951
952    /// Test that concurrency limit uses fail-fast behavior (C2).
953    ///
954    /// `LoadShedLayer` + `ConcurrencyLimitLayer` combination returns Overloaded error
955    /// immediately when capacity is exhausted, instead of blocking indefinitely.
956    #[tokio::test]
957    async fn test_load_shedding_returns_overloaded_error() {
958        use bytes::Bytes;
959        use http::{Request, Response};
960        use http_body_util::Full;
961        use std::future::Future;
962        use std::pin::Pin;
963        use std::sync::Arc;
964        use std::sync::atomic::{AtomicUsize, Ordering};
965        use std::task::{Context, Poll};
966        use tower::Service;
967        use tower::ServiceExt;
968
969        // A service that holds a slot forever once called
970        #[derive(Clone)]
971        struct SlotHoldingService {
972            active: Arc<AtomicUsize>,
973        }
974
975        impl Service<Request<Full<Bytes>>> for SlotHoldingService {
976            type Response = Response<Full<Bytes>>;
977            type Error = HttpError;
978            type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
979
980            fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
981                Poll::Ready(Ok(()))
982            }
983
984            fn call(&mut self, _: Request<Full<Bytes>>) -> Self::Future {
985                self.active.fetch_add(1, Ordering::SeqCst);
986                // Never complete - holds the slot
987                Box::pin(std::future::pending())
988            }
989        }
990
991        let active = Arc::new(AtomicUsize::new(0));
992
993        // Build a service with load shedding and concurrency limit of 1
994        let service = tower::ServiceBuilder::new()
995            .layer(LoadShedLayer::new())
996            .layer(ConcurrencyLimitLayer::new(1))
997            .service(SlotHoldingService {
998                active: active.clone(),
999            });
1000
1001        let service = service.map_err(map_load_shed_error);
1002
1003        // First request: will occupy the single slot
1004        let req1 = Request::builder()
1005            .uri("http://test")
1006            .body(Full::new(Bytes::new()))
1007            .unwrap();
1008        let mut svc1 = service.clone();
1009
1010        let svc1_ready = svc1.ready().await.unwrap();
1011        let _pending_fut = svc1_ready.call(req1);
1012
1013        // Wait for the slot to be occupied
1014        tokio::time::sleep(Duration::from_millis(10)).await;
1015        assert_eq!(
1016            active.load(Ordering::SeqCst),
1017            1,
1018            "First request should be active"
1019        );
1020
1021        // Second request: LoadShedLayer should reject because ConcurrencyLimit is at capacity
1022        let req2 = Request::builder()
1023            .uri("http://test")
1024            .body(Full::new(Bytes::new()))
1025            .unwrap();
1026
1027        let mut svc2 = service.clone();
1028
1029        // LoadShedLayer checks poll_ready and returns Overloaded if inner service is not ready
1030        let result = tokio::time::timeout(Duration::from_millis(100), async {
1031            // poll_ready should return quickly with error (not block)
1032            match svc2.ready().await {
1033                Ok(ready_svc) => ready_svc.call(req2).await,
1034                Err(e) => Err(e),
1035            }
1036        })
1037        .await;
1038
1039        // Should complete within timeout (not hang) and return Overloaded
1040        assert!(result.is_ok(), "Request should not hang");
1041        let err = result.unwrap().unwrap_err();
1042        assert!(
1043            matches!(err, HttpError::Overloaded),
1044            "Expected Overloaded error, got: {err:?}"
1045        );
1046    }
1047
1048    // ==========================================================================
1049    // map_tower_error Tests
1050    // ==========================================================================
1051
1052    /// Test that `map_tower_error` preserves `HttpError::Overloaded` when wrapped in `BoxError`
1053    #[test]
1054    fn test_map_tower_error_preserves_overloaded() {
1055        let http_err = HttpError::Overloaded;
1056        let boxed: tower::BoxError = Box::new(http_err);
1057        let result = map_tower_error(boxed, Duration::from_secs(30));
1058
1059        assert!(
1060            matches!(result, HttpError::Overloaded),
1061            "Should preserve HttpError::Overloaded, got: {result:?}"
1062        );
1063    }
1064
1065    /// Test that `map_tower_error` preserves `HttpError::ServiceClosed` when wrapped in `BoxError`
1066    #[test]
1067    fn test_map_tower_error_preserves_service_closed() {
1068        let http_err = HttpError::ServiceClosed;
1069        let boxed: tower::BoxError = Box::new(http_err);
1070        let result = map_tower_error(boxed, Duration::from_secs(30));
1071
1072        assert!(
1073            matches!(result, HttpError::ServiceClosed),
1074            "Should preserve HttpError::ServiceClosed, got: {result:?}"
1075        );
1076    }
1077
1078    /// Test that `map_tower_error` preserves `HttpError::Timeout` with original duration
1079    #[test]
1080    fn test_map_tower_error_preserves_timeout_attempt() {
1081        let original_duration = Duration::from_secs(5);
1082        let http_err = HttpError::Timeout(original_duration);
1083        let boxed: tower::BoxError = Box::new(http_err);
1084        // Pass a different timeout to verify original is preserved
1085        let result = map_tower_error(boxed, Duration::from_secs(30));
1086
1087        match result {
1088            HttpError::Timeout(d) => {
1089                assert_eq!(
1090                    d, original_duration,
1091                    "Should preserve original timeout duration"
1092                );
1093            }
1094            other => panic!("Should preserve HttpError::Timeout, got: {other:?}"),
1095        }
1096    }
1097
1098    /// Test that `map_tower_error` wraps unknown errors as Transport
1099    #[test]
1100    fn test_map_tower_error_wraps_unknown_as_transport() {
1101        let other_err: tower::BoxError = Box::new(std::io::Error::new(
1102            std::io::ErrorKind::ConnectionRefused,
1103            "connection refused",
1104        ));
1105        let result = map_tower_error(other_err, Duration::from_secs(30));
1106
1107        assert!(
1108            matches!(result, HttpError::Transport(_)),
1109            "Should wrap unknown errors as Transport, got: {result:?}"
1110        );
1111    }
1112
1113    // ==========================================================================
1114    // Cancellation chain test
1115    //
1116    // Proves that dropping the response future from HttpClient cancels the
1117    // inner service future through the toolkit-http middleware stack
1118    // (Buffer → Concurrency → inner service). Retry is disabled to
1119    // isolate the cancellation path.
1120    //
1121    // Uses build_with_inner_service() to inject a fake slow service at the
1122    // bottom of the real tower stack - no HTTP server needed.
1123    // ==========================================================================
1124
1125    /// Dropping the `HttpClient::send()` future must cancel the inner
1126    /// service future through the full middleware stack.
1127    ///
1128    /// Injects a fake service via `build_with_inner_service()` that
1129    /// blocks on a `Notify` (never completes) and signals a second
1130    /// `Notify` from its `Drop` impl. No sleeps - purely notification-based.
1131    #[tokio::test]
1132    async fn test_cancellation_propagates_through_full_stack() {
1133        use crate::response::ResponseBody;
1134        use std::future::Future;
1135        use std::pin::Pin;
1136        use std::sync::Arc;
1137        use std::sync::atomic::{AtomicBool, Ordering};
1138        use std::task::{Context, Poll};
1139        use tower::Service;
1140
1141        #[derive(Clone)]
1142        struct PendingService {
1143            completed: Arc<AtomicBool>,
1144            drop_notifier: Arc<tokio::sync::Notify>,
1145            started_notifier: Arc<tokio::sync::Notify>,
1146        }
1147
1148        struct FutureGuard {
1149            completed: Arc<AtomicBool>,
1150            drop_notifier: Arc<tokio::sync::Notify>,
1151        }
1152
1153        impl Drop for FutureGuard {
1154            fn drop(&mut self) {
1155                if !self.completed.load(Ordering::SeqCst) {
1156                    self.drop_notifier.notify_one();
1157                }
1158            }
1159        }
1160
1161        impl Service<http::Request<Full<Bytes>>> for PendingService {
1162            type Response = http::Response<ResponseBody>;
1163            type Error = HttpError;
1164            type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
1165
1166            fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1167                Poll::Ready(Ok(()))
1168            }
1169
1170            fn call(&mut self, _: http::Request<Full<Bytes>>) -> Self::Future {
1171                let completed = self.completed.clone();
1172                let drop_notifier = self.drop_notifier.clone();
1173                let started_notifier = self.started_notifier.clone();
1174                Box::pin(async move {
1175                    let _guard = FutureGuard {
1176                        completed: completed.clone(),
1177                        drop_notifier,
1178                    };
1179                    // Signal that the request reached the inner service
1180                    started_notifier.notify_one();
1181                    // Block forever - only completes via drop
1182                    std::future::pending::<()>().await;
1183                    completed.store(true, Ordering::SeqCst);
1184                    unreachable!()
1185                })
1186            }
1187        }
1188
1189        let inner_completed = Arc::new(AtomicBool::new(false));
1190        let drop_notifier = Arc::new(tokio::sync::Notify::new());
1191        let started_notifier = Arc::new(tokio::sync::Notify::new());
1192
1193        let inner = PendingService {
1194            completed: inner_completed.clone(),
1195            drop_notifier: drop_notifier.clone(),
1196            started_notifier: started_notifier.clone(),
1197        };
1198
1199        // Build the real HttpClient stack with our fake service at the bottom.
1200        // Retry disabled to isolate cancellation. Tests: Buffer → Concurrency → PendingService
1201        let client = HttpClientBuilder::new()
1202            .timeout(Duration::from_secs(30))
1203            .retry(None)
1204            .build_with_inner_service(inner.boxed_clone());
1205
1206        // Spawn the request so we can drop it explicitly.
1207        // URL uses https:// so the scheme validation in RequestBuilder passes
1208        // under both the non-fips default (AllowInsecureHttp) and the fips
1209        // default (TlsOnly); the connector here is the injected PendingService,
1210        // so no real TLS handshake happens.
1211        let send_handle = tokio::spawn({
1212            let client = client.clone();
1213            async move { client.get("https://fake/slow").send().await }
1214        });
1215
1216        // Wait for the request to reach the inner service
1217        started_notifier.notified().await;
1218
1219        // Drop the in-flight request by aborting the task
1220        send_handle.abort();
1221
1222        // Wait for the drop notification - no sleep, pure notification
1223        tokio::time::timeout(Duration::from_secs(5), drop_notifier.notified())
1224            .await
1225            .expect(
1226                "Inner service future should have been dropped within 5s - \
1227                 the full toolkit-http stack must propagate cancellation",
1228            );
1229
1230        assert!(
1231            !inner_completed.load(Ordering::SeqCst),
1232            "Inner service future should NOT have completed"
1233        );
1234    }
1235}