Skip to main content

toolkit_http/
builder.rs

1use crate::config::{
2    HttpClientConfig, RedirectConfig, RetryConfig, TlsRootConfig, TransportSecurity,
3};
4use crate::error::HttpError;
5use crate::layers::{OtelLayer, RetryLayer, SecureRedirectPolicy, UserAgentLayer};
6use crate::response::ResponseBody;
7use crate::tls;
8use bytes::Bytes;
9use http::Response;
10use http_body_util::{BodyExt, Full};
11use hyper_rustls::HttpsConnector;
12use hyper_util::client::legacy::Client;
13use hyper_util::client::legacy::connect::HttpConnector;
14use hyper_util::rt::{TokioExecutor, TokioTimer};
15use std::time::Duration;
16use tower::buffer::Buffer;
17use tower::limit::ConcurrencyLimitLayer;
18use tower::load_shed::LoadShedLayer;
19use tower::timeout::TimeoutLayer;
20use tower::util::BoxCloneService;
21use tower::{ServiceBuilder, ServiceExt};
22use tower_http::decompression::DecompressionLayer;
23use tower_http::follow_redirect::FollowRedirectLayer;
24
25/// Type-erased inner service between layer composition steps in [`HttpClientBuilder::build`].
26type InnerService =
27    BoxCloneService<http::Request<Full<Bytes>>, http::Response<ResponseBody>, HttpError>;
28
29/// Builder for constructing an [`HttpClient`] with a layered tower middleware stack.
30pub struct HttpClientBuilder {
31    config: HttpClientConfig,
32    auth_layer: Option<Box<dyn FnOnce(InnerService) -> InnerService + Send>>,
33    metrics_layer: Option<Box<dyn FnOnce(InnerService) -> InnerService + Send>>,
34}
35
36impl HttpClientBuilder {
37    /// Create a new builder with default configuration
38    #[must_use]
39    pub fn new() -> Self {
40        Self {
41            config: HttpClientConfig::default(),
42            auth_layer: None,
43            metrics_layer: None,
44        }
45    }
46
47    /// Create a builder with a specific configuration
48    #[must_use]
49    pub fn with_config(config: HttpClientConfig) -> Self {
50        Self {
51            config,
52            auth_layer: None,
53            metrics_layer: None,
54        }
55    }
56
57    /// Set the per-request timeout
58    ///
59    /// This timeout applies to each individual HTTP request/attempt.
60    /// If retries are enabled, each retry attempt gets its own timeout.
61    #[must_use]
62    pub fn timeout(mut self, timeout: Duration) -> Self {
63        self.config.request_timeout = timeout;
64        self
65    }
66
67    /// Set the total timeout spanning all retry attempts
68    ///
69    /// When set, the entire operation (including all retries and backoff delays)
70    /// must complete within this duration. If the deadline is exceeded,
71    /// the request fails with `HttpError::DeadlineExceeded(total_timeout)`.
72    #[must_use]
73    pub fn total_timeout(mut self, timeout: Duration) -> Self {
74        self.config.total_timeout = Some(timeout);
75        self
76    }
77
78    /// Set the user agent string
79    #[must_use]
80    pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
81        self.config.user_agent = user_agent.into();
82        self
83    }
84
85    /// Set the retry configuration
86    #[must_use]
87    pub fn retry(mut self, retry: Option<RetryConfig>) -> Self {
88        self.config.retry = retry;
89        self
90    }
91
92    /// Set the maximum response body size
93    #[must_use]
94    pub fn max_body_size(mut self, size: usize) -> Self {
95        self.config.max_body_size = size;
96        self
97    }
98
99    /// Set transport security mode
100    ///
101    /// Use `TransportSecurity::TlsOnly` to enforce HTTPS for all connections.
102    #[must_use]
103    pub fn transport(mut self, transport: TransportSecurity) -> Self {
104        self.config.transport = transport;
105        self
106    }
107
108    /// Deny insecure HTTP connections, enforcing TLS for all traffic
109    ///
110    /// Equivalent to `.transport(TransportSecurity::TlsOnly)`.
111    ///
112    /// Use this when TLS enforcement is required (e.g., production environments).
113    #[must_use]
114    pub fn deny_insecure_http(mut self) -> Self {
115        tracing::debug!(
116            target: "toolkit_http::security",
117            "deny_insecure_http() called - enforcing TLS for all connections"
118        );
119        self.config.transport = TransportSecurity::TlsOnly;
120        self
121    }
122
123    /// Enable OpenTelemetry tracing layer
124    ///
125    /// When enabled, creates spans for outbound requests with HTTP metadata
126    /// and injects W3C trace context headers (when `otel` feature is enabled).
127    #[must_use]
128    pub fn with_otel(mut self) -> Self {
129        self.config.otel = true;
130        self
131    }
132
133    /// Insert an optional auth layer between retry and timeout in the stack.
134    ///
135    /// Stack position: `… → Retry → **this layer** → Timeout → …`
136    ///
137    /// The layer sits inside the retry loop so each attempt re-executes it
138    /// (e.g. re-reads a refreshed bearer token). Only one auth layer can be
139    /// set; a second call replaces the first.
140    #[must_use]
141    pub fn with_auth_layer(
142        mut self,
143        wrap: impl FnOnce(InnerService) -> InnerService + Send + 'static,
144    ) -> Self {
145        self.auth_layer = Some(Box::new(wrap));
146        self
147    }
148
149    /// Insert a metrics layer between the rate-limit and retry layers.
150    ///
151    /// Stack position: `… → RateLimit → **this layer** → Retry → Auth → Timeout → …`
152    ///
153    /// The layer sits outside the retry loop so it observes a single logical
154    /// request once, regardless of how many transport-level retries the retry
155    /// layer issues. If the use case is "count every attempt", the equivalent
156    /// observation can be made with [`with_otel`](Self::with_otel) (one span
157    /// per attempt) and a `tracing` → metrics bridge.
158    ///
159    /// Only one metrics layer can be set; a second call replaces the first.
160    #[must_use]
161    pub fn with_metrics_layer(
162        mut self,
163        wrap: impl FnOnce(InnerService) -> InnerService + Send + 'static,
164    ) -> Self {
165        self.metrics_layer = Some(Box::new(wrap));
166        self
167    }
168
169    /// Set the buffer capacity for concurrent request handling
170    ///
171    /// The HTTP client uses an internal buffer to allow concurrent requests
172    /// without external locking. This sets the maximum number of requests
173    /// that can be queued.
174    ///
175    /// **Note**: A capacity of 0 is invalid and will be clamped to 1.
176    /// Tower's Buffer panics with capacity=0, so we enforce minimum of 1.
177    #[must_use]
178    pub fn buffer_capacity(mut self, capacity: usize) -> Self {
179        // Clamp to at least 1 - tower::Buffer panics with capacity=0
180        self.config.buffer_capacity = capacity.max(1);
181        self
182    }
183
184    /// Set the maximum number of redirects to follow
185    ///
186    /// Set to `0` to disable redirect following (3xx responses pass through as-is).
187    /// Default: 10
188    #[must_use]
189    pub fn max_redirects(mut self, max_redirects: usize) -> Self {
190        self.config.redirect.max_redirects = max_redirects;
191        self
192    }
193
194    /// Disable redirect following
195    ///
196    /// Equivalent to `.max_redirects(0)`. When disabled, 3xx responses are
197    /// returned to the caller without following the `Location` header.
198    #[must_use]
199    pub fn no_redirects(mut self) -> Self {
200        self.config.redirect = RedirectConfig::disabled();
201        self
202    }
203
204    /// Set the redirect policy configuration
205    ///
206    /// Use this to configure redirect security settings:
207    /// - `same_origin_only`: Only follow redirects to the same host
208    /// - `strip_sensitive_headers`: Remove `Authorization`/`Cookie` on cross-origin
209    /// - `allow_https_downgrade`: Allow HTTPS → HTTP redirects (not recommended)
210    ///
211    /// # Example
212    ///
213    /// ```rust,ignore
214    /// let client = HttpClient::builder()
215    ///     .redirect(RedirectConfig::permissive()) // Allow all redirects with header stripping
216    ///     .build()?;
217    /// ```
218    #[must_use]
219    pub fn redirect(mut self, config: RedirectConfig) -> Self {
220        self.config.redirect = config;
221        self
222    }
223
224    /// Set the idle connection timeout for the connection pool
225    ///
226    /// Connections that remain idle for longer than this duration will be
227    /// closed and removed from the pool. Default: 90 seconds.
228    ///
229    /// Set to `None` to disable idle timeout (connections kept indefinitely).
230    #[must_use]
231    pub fn pool_idle_timeout(mut self, timeout: Option<Duration>) -> Self {
232        self.config.pool_idle_timeout = timeout;
233        self
234    }
235
236    /// Set the maximum number of idle connections per host
237    ///
238    /// Limits how many idle connections are kept in the pool for each host.
239    /// Default: 32.
240    ///
241    /// - Setting to `0` disables connection reuse entirely
242    /// - Setting too high may waste resources on rarely-used connections
243    #[must_use]
244    pub fn pool_max_idle_per_host(mut self, max: usize) -> Self {
245        self.config.pool_max_idle_per_host = max;
246        self
247    }
248
249    /// Build the HTTP client with all configured layers
250    ///
251    /// # Errors
252    /// Returns an error if TLS initialization fails or configuration is invalid.
253    ///
254    /// Under `--features fips`, returns [`HttpError::InsecureTransport`] when
255    /// `config.transport == TransportSecurity::AllowInsecureHttp`. FIPS builds
256    /// must use [`TransportSecurity::TlsOnly`] — there is no opt-out.
257    pub fn build(self) -> Result<crate::HttpClient, HttpError> {
258        // Reject AllowInsecureHttp under --features fips before any TLS work.
259        // The check lives here (rather than in HttpClientConfig) so that
260        // constructing a config with AllowInsecureHttp is still cheap and
261        // infallible; only actually building a client fails closed.
262        #[cfg(feature = "fips")]
263        if self.config.transport == TransportSecurity::AllowInsecureHttp {
264            tracing::warn!(
265                target: "toolkit_http::security",
266                "rejecting AllowInsecureHttp under --features fips: returning HttpError::InsecureTransport"
267            );
268            return Err(HttpError::InsecureTransport);
269        }
270
271        let timeout = self.config.request_timeout;
272        let total_timeout = self.config.total_timeout;
273
274        // Build the HTTPS connector (may fail for Native roots if no valid certs)
275        let https = build_https_connector(self.config.tls_roots, self.config.transport)?;
276
277        // Create the base hyper client with HTTP/2 support and connection pool settings
278        let mut client_builder = Client::builder(TokioExecutor::new());
279
280        // Configure connection pool
281        // CRITICAL: pool_timer is required for pool_idle_timeout to work!
282        client_builder
283            .pool_timer(TokioTimer::new())
284            .pool_max_idle_per_host(self.config.pool_max_idle_per_host)
285            .http2_only(false); // Allow both HTTP/1 and HTTP/2 via ALPN
286
287        // Set idle timeout (None = no timeout, connections kept indefinitely)
288        if let Some(idle_timeout) = self.config.pool_idle_timeout {
289            client_builder.pool_idle_timeout(idle_timeout);
290        }
291
292        let hyper_client = client_builder.build::<_, Full<Bytes>>(https);
293
294        // Parse user agent header (may fail)
295        let ua_layer = UserAgentLayer::try_new(&self.config.user_agent)?;
296
297        // =======================================================================
298        // Tower Layer Stack (outer to inner)
299        // =======================================================================
300        //
301        // Request flow (outer → inner):
302        //   Buffer → OtelLayer → LoadShed/Concurrency → [MetricsLayer?] →
303        //   RetryLayer → [AuthLayer?] → ErrorMapping → Timeout → UserAgent →
304        //   Decompression → FollowRedirect → hyper_client
305        //
306        // AuthLayer (if set via with_auth_layer) sits inside the retry
307        // loop so each retry re-acquires credentials (e.g. refreshed
308        // bearer token).
309        //
310        // MetricsLayer (if set via with_metrics_layer) sits outside the
311        // retry loop so it observes one logical request, not per-attempt.
312        //
313        // Response flow (inner → outer):
314        //   hyper_client → FollowRedirect → Decompression → UserAgent →
315        //   Timeout → ErrorMapping → [AuthLayer?] → RetryLayer →
316        //   [MetricsLayer?] → LoadShed/Concurrency → OtelLayer → Buffer
317        //
318        // Key semantics (reqwest-like):
319        //  - send() returns Ok(Response) for ALL HTTP statuses (including 4xx/5xx)
320        //  - send() returns Err only for transport/timeout/TLS errors
321        //  - Non-2xx converted to error ONLY via error_for_status()
322        //  - RetryLayer handles both Err (transport) and Ok(Response) (status)
323        //     retries internally, draining body before retry for connection reuse
324        //  - FollowRedirect handles 3xx responses internally with security protections:
325        //     * Same-origin enforcement (default) - blocks SSRF attacks
326        //     * Sensitive header stripping on cross-origin redirects
327        //     * HTTPS downgrade protection
328        //
329        // =======================================================================
330        //
331        let redirect_policy = SecureRedirectPolicy::new(self.config.redirect.clone());
332
333        // Build the service stack with secure redirect following
334        let service = ServiceBuilder::new()
335            .layer(TimeoutLayer::new(timeout))
336            .layer(ua_layer)
337            .layer(DecompressionLayer::new())
338            .layer(FollowRedirectLayer::with_policy(redirect_policy))
339            .service(hyper_client);
340
341        // Map the decompression body to our boxed ResponseBody type.
342        // This converts Response<DecompressionBody<Incoming>> to Response<ResponseBody>.
343        //
344        // The decompression body's error type is tower_http::BoxError, which we convert
345        // to our boxed error type for consistency with the ResponseBody definition.
346        let service = service.map_response(map_decompression_response);
347
348        // Map errors to HttpError with proper timeout duration
349        let service = service.map_err(move |e: tower::BoxError| map_tower_error(e, timeout));
350
351        // Box the service for type erasure
352        let mut boxed_service = service.boxed_clone();
353
354        // Apply auth layer (between timeout and retry).
355        // Inside retry so each retry attempt re-acquires the token.
356        if let Some(wrap) = self.auth_layer {
357            boxed_service = wrap(boxed_service);
358        }
359
360        // Conditionally wrap with RetryLayer
361        //
362        // RetryLayer handles retries for both:
363        // - Err(HttpError::Transport/Timeout) - transport-level failures
364        // - Ok(Response) with retryable status codes (429, 5xx for GET, etc.)
365        //
366        // When retrying on status codes, RetryLayer drains the response body
367        // (up to configured limit) to allow connection reuse.
368        //
369        // If total_timeout is set, the entire operation (including all retries)
370        // must complete within that duration.
371        if let Some(ref retry_config) = self.config.retry {
372            let retry_layer = RetryLayer::with_total_timeout(retry_config.clone(), total_timeout);
373            let retry_service = ServiceBuilder::new()
374                .layer(retry_layer)
375                .service(boxed_service);
376            boxed_service = retry_service.boxed_clone();
377        }
378
379        // Apply metrics layer (between retry and rate-limit).
380        // Outside the retry loop: observes one logical request, not per-attempt.
381        if let Some(wrap) = self.metrics_layer {
382            boxed_service = wrap(boxed_service);
383        }
384
385        // Conditionally wrap with concurrency limit + load shedding
386        // LoadShedLayer returns error immediately when ConcurrencyLimitLayer is saturated
387        // instead of waiting indefinitely (Poll::Pending)
388        if let Some(rate_limit) = self.config.rate_limit
389            && rate_limit.max_concurrent_requests < usize::MAX
390        {
391            let limited_service = ServiceBuilder::new()
392                .layer(LoadShedLayer::new())
393                .layer(ConcurrencyLimitLayer::new(
394                    rate_limit.max_concurrent_requests,
395                ))
396                .service(boxed_service);
397            // Map load shed errors to HttpError::Overloaded
398            let limited_service = limited_service.map_err(map_load_shed_error);
399            boxed_service = limited_service.boxed_clone();
400        }
401
402        // Conditionally wrap with OTEL tracing layer (outermost layer before buffer)
403        // Applied last so it sees the final request after UserAgent and other modifications.
404        // Creates spans, records status, and injects trace context headers.
405        if self.config.otel {
406            let otel_service = ServiceBuilder::new()
407                .layer(OtelLayer::new())
408                .service(boxed_service);
409            boxed_service = otel_service.boxed_clone();
410        }
411
412        // Wrap in Buffer as the final step for true concurrent access
413        // Buffer spawns a background task that processes requests from a channel,
414        // providing Clone + Send + Sync without any mutex serialization.
415        let buffer_capacity = self.config.buffer_capacity.max(1);
416        let buffered_service: crate::client::BufferedService =
417            Buffer::new(boxed_service, buffer_capacity);
418
419        Ok(crate::HttpClient {
420            service: buffered_service,
421            max_body_size: self.config.max_body_size,
422            transport_security: self.config.transport,
423        })
424    }
425}
426
427#[cfg(test)]
428impl HttpClientBuilder {
429    /// Build an `HttpClient` with a custom inner service replacing the
430    /// hyper connector. The full middleware stack (Retry, Concurrency,
431    /// Buffer, etc.) is applied on top.
432    ///
433    /// The inner service must handle `Request<Full<Bytes>>` and return
434    /// `Response<ResponseBody>`. Use this to inject a fake slow service
435    /// for cancellation testing without needing a real HTTP server.
436    fn build_with_inner_service(self, inner: InnerService) -> crate::HttpClient {
437        let mut boxed_service = inner;
438
439        if let Some(ref retry_config) = self.config.retry {
440            let retry_layer =
441                RetryLayer::with_total_timeout(retry_config.clone(), self.config.total_timeout);
442            let retry_service = ServiceBuilder::new()
443                .layer(retry_layer)
444                .service(boxed_service);
445            boxed_service = retry_service.boxed_clone();
446        }
447
448        if let Some(rate_limit) = self.config.rate_limit
449            && rate_limit.max_concurrent_requests < usize::MAX
450        {
451            let limited_service = ServiceBuilder::new()
452                .layer(LoadShedLayer::new())
453                .layer(ConcurrencyLimitLayer::new(
454                    rate_limit.max_concurrent_requests,
455                ))
456                .service(boxed_service);
457            let limited_service = limited_service.map_err(map_load_shed_error);
458            boxed_service = limited_service.boxed_clone();
459        }
460
461        let buffer_capacity = self.config.buffer_capacity.max(1);
462        let buffered_service: crate::client::BufferedService =
463            Buffer::new(boxed_service, buffer_capacity);
464
465        crate::HttpClient {
466            service: buffered_service,
467            max_body_size: self.config.max_body_size,
468            transport_security: self.config.transport,
469        }
470    }
471}
472
473impl Default for HttpClientBuilder {
474    fn default() -> Self {
475        Self::new()
476    }
477}
478
479/// Map tower errors to `HttpError` with actual timeout duration
480///
481/// Attempts to extract existing `HttpError` from the boxed error before
482/// wrapping as `Transport`. This preserves typed errors like `Overloaded`
483/// and `ServiceClosed` that may have been boxed by tower middleware.
484fn map_tower_error(err: tower::BoxError, timeout: Duration) -> HttpError {
485    if err.is::<tower::timeout::error::Elapsed>() {
486        return HttpError::Timeout(timeout);
487    }
488
489    // Try to extract existing HttpError before wrapping as Transport
490    match err.downcast::<HttpError>() {
491        Ok(http_err) => *http_err,
492        Err(other) => HttpError::Transport(other),
493    }
494}
495
496/// Map load shed errors to `HttpError::Overloaded`
497fn map_load_shed_error(err: tower::BoxError) -> HttpError {
498    if err.is::<tower::load_shed::error::Overloaded>() {
499        HttpError::Overloaded
500    } else {
501        // Pass through other HttpError types (from inner service)
502        match err.downcast::<HttpError>() {
503            Ok(http_err) => *http_err,
504            Err(err) => HttpError::Transport(err),
505        }
506    }
507}
508
509/// Map the decompression response to our boxed response body type.
510///
511/// This converts `Response<DecompressionBody<Incoming>>` to `Response<ResponseBody>`
512/// by boxing the body with appropriate error type mapping.
513fn map_decompression_response<B>(response: Response<B>) -> Response<ResponseBody>
514where
515    B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static,
516    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
517{
518    let (parts, body) = response.into_parts();
519    // Convert the decompression body errors to our boxed error type.
520    // tower-http's DecompressionBody uses tower_http::BoxError which is
521    // compatible with our Box<dyn Error + Send + Sync> via Into.
522    let boxed_body: ResponseBody = body.map_err(Into::into).boxed();
523    Response::from_parts(parts, boxed_body)
524}
525
526/// Build the HTTPS connector with the specified TLS root configuration.
527///
528/// For `TlsRootConfig::Native`, uses cached native root certificates to avoid
529/// repeated OS certificate store lookups on each `build()` call.
530///
531/// HTTP/2 is enabled via `enable_all_versions()` which configures ALPN to
532/// advertise both h2 and http/1.1. Protocol selection happens during TLS
533/// handshake based on server support.
534///
535/// # Errors
536///
537/// Returns `HttpError::Tls` if `TlsRootConfig::Native` is requested but no
538/// valid root certificates are available from the OS certificate store.
539fn build_https_connector(
540    tls_roots: TlsRootConfig,
541    transport: TransportSecurity,
542) -> Result<HttpsConnector<HttpConnector>, HttpError> {
543    let allow_http = transport == TransportSecurity::AllowInsecureHttp;
544
545    // Both branches build a `ClientConfig` ourselves (rather than using
546    // `with_provider_and_webpki_roots`) so we can apply `require_ems = true`
547    // under the `fips` feature — see `tls::build_client_config`. The
548    // functions return `tls::TlsConfigError` (an enum implementing
549    // `std::error::Error`); boxing it into `HttpError::Tls`'s
550    // `Box<dyn Error + Send + Sync>` preserves the source chain via
551    // `TlsConfigError`'s own `Error::source()` impl.
552    let client_config = match tls_roots {
553        TlsRootConfig::WebPki => tls::webpki_roots_client_config(),
554        TlsRootConfig::Native => tls::native_roots_client_config(),
555    }
556    .map_err(|e| HttpError::Tls(Box::new(e)))?;
557
558    let builder = hyper_rustls::HttpsConnectorBuilder::new().with_tls_config(client_config);
559    let connector = if allow_http {
560        builder.https_or_http().enable_all_versions().build()
561    } else {
562        builder.https_only().enable_all_versions().build()
563    };
564    Ok(connector)
565}
566
567#[cfg(test)]
568#[cfg_attr(coverage_nightly, coverage(off))]
569mod tests {
570    use super::*;
571    use crate::config::DEFAULT_USER_AGENT;
572
573    #[test]
574    fn test_builder_default() {
575        let builder = HttpClientBuilder::new();
576        assert_eq!(builder.config.request_timeout, Duration::from_secs(30));
577        assert_eq!(builder.config.user_agent, DEFAULT_USER_AGENT);
578        assert!(builder.config.retry.is_some());
579        assert_eq!(builder.config.buffer_capacity, 1024);
580    }
581
582    #[test]
583    fn test_builder_with_config() {
584        let config = HttpClientConfig::minimal();
585        let builder = HttpClientBuilder::with_config(config);
586        assert_eq!(builder.config.request_timeout, Duration::from_secs(10));
587    }
588
589    #[test]
590    fn test_builder_timeout() {
591        let builder = HttpClientBuilder::new().timeout(Duration::from_mins(1));
592        assert_eq!(builder.config.request_timeout, Duration::from_mins(1));
593    }
594
595    #[test]
596    fn test_builder_user_agent() {
597        let builder = HttpClientBuilder::new().user_agent("custom/1.0");
598        assert_eq!(builder.config.user_agent, "custom/1.0");
599    }
600
601    #[test]
602    fn test_builder_retry() {
603        let builder = HttpClientBuilder::new().retry(None);
604        assert!(builder.config.retry.is_none());
605    }
606
607    #[test]
608    fn test_builder_max_body_size() {
609        let builder = HttpClientBuilder::new().max_body_size(1024);
610        assert_eq!(builder.config.max_body_size, 1024);
611    }
612
613    #[test]
614    fn test_builder_transport_security() {
615        let builder = HttpClientBuilder::new().transport(TransportSecurity::TlsOnly);
616        assert_eq!(builder.config.transport, TransportSecurity::TlsOnly);
617
618        let builder = HttpClientBuilder::new().deny_insecure_http();
619        assert_eq!(builder.config.transport, TransportSecurity::TlsOnly);
620
621        let builder = HttpClientBuilder::new();
622        #[cfg(not(feature = "fips"))]
623        assert_eq!(
624            builder.config.transport,
625            TransportSecurity::AllowInsecureHttp
626        );
627        #[cfg(feature = "fips")]
628        assert_eq!(builder.config.transport, TransportSecurity::TlsOnly);
629    }
630
631    #[test]
632    fn test_builder_otel() {
633        let builder = HttpClientBuilder::new().with_otel();
634        assert!(builder.config.otel);
635    }
636
637    #[test]
638    fn test_builder_buffer_capacity() {
639        let builder = HttpClientBuilder::new().buffer_capacity(512);
640        assert_eq!(builder.config.buffer_capacity, 512);
641    }
642
643    /// Test that `buffer_capacity=0` is clamped to 1 to prevent panic.
644    ///
645    /// Tower's Buffer panics with capacity=0, so we enforce minimum of 1.
646    #[test]
647    fn test_builder_buffer_capacity_zero_clamped() {
648        let builder = HttpClientBuilder::new().buffer_capacity(0);
649        assert_eq!(
650            builder.config.buffer_capacity, 1,
651            "buffer_capacity=0 should be clamped to 1"
652        );
653    }
654
655    /// Test that `buffer_capacity=0` via config is clamped during `build()`.
656    #[tokio::test]
657    async fn test_builder_buffer_capacity_zero_in_config_clamped() {
658        let config = HttpClientConfig {
659            buffer_capacity: 0, // Invalid - should be clamped in build()
660            ..Default::default()
661        };
662        let result = HttpClientBuilder::with_config(config).build();
663        // Should succeed (clamped to 1), not panic
664        assert!(
665            result.is_ok(),
666            "build() should succeed with capacity clamped to 1"
667        );
668    }
669
670    #[tokio::test]
671    async fn test_builder_build_with_otel() {
672        let client = HttpClientBuilder::new().with_otel().build();
673        assert!(client.is_ok());
674    }
675
676    #[tokio::test]
677    async fn test_builder_with_auth_layer() {
678        let client = HttpClientBuilder::new()
679            .with_auth_layer(|svc| svc) // identity transform
680            .build();
681        assert!(client.is_ok());
682    }
683
684    #[tokio::test]
685    async fn test_builder_with_metrics_layer() {
686        let client = HttpClientBuilder::new()
687            .with_metrics_layer(|svc| svc) // identity transform
688            .build();
689        assert!(client.is_ok());
690    }
691
692    #[tokio::test]
693    async fn test_builder_with_metrics_layer_second_call_replaces_first() {
694        use std::sync::Arc;
695        use std::sync::atomic::{AtomicUsize, Ordering};
696
697        let call_count = Arc::new(AtomicUsize::new(0));
698        let call_count2 = call_count.clone();
699
700        // Second call should replace the first; only one layer is applied.
701        let client = HttpClientBuilder::new()
702            .with_metrics_layer(|_svc| {
703                // This closure should NOT be called (replaced by the second).
704                panic!("first metrics layer should have been replaced");
705            })
706            .with_metrics_layer(move |svc| {
707                call_count2.fetch_add(1, Ordering::SeqCst);
708                svc
709            })
710            .build();
711
712        assert!(client.is_ok());
713        assert_eq!(
714            call_count.load(Ordering::SeqCst),
715            1,
716            "second metrics layer must be applied exactly once"
717        );
718    }
719
720    #[tokio::test]
721    async fn test_builder_build() {
722        let client = HttpClientBuilder::new().build();
723        assert!(client.is_ok());
724    }
725
726    #[tokio::test]
727    async fn test_builder_build_with_deny_insecure_http() {
728        let client = HttpClientBuilder::new().deny_insecure_http().build();
729        assert!(client.is_ok());
730    }
731
732    #[tokio::test]
733    async fn test_builder_build_with_sse_config() {
734        use crate::config::HttpClientConfig;
735        let config = HttpClientConfig::sse();
736        let client = HttpClientBuilder::with_config(config).build();
737        assert!(client.is_ok(), "SSE config should build successfully");
738    }
739
740    #[tokio::test]
741    async fn test_builder_build_invalid_user_agent() {
742        let client = HttpClientBuilder::new()
743            .user_agent("invalid\x00agent")
744            .build();
745        assert!(client.is_err());
746    }
747
748    #[tokio::test]
749    async fn test_builder_default_uses_webpki_roots() {
750        let builder = HttpClientBuilder::new();
751        assert_eq!(builder.config.tls_roots, TlsRootConfig::WebPki);
752        // Build should succeed without OS native roots
753        let client = builder.build();
754        assert!(client.is_ok());
755    }
756
757    #[tokio::test]
758    async fn test_builder_native_roots() {
759        let config = HttpClientConfig {
760            tls_roots: TlsRootConfig::Native,
761            ..Default::default()
762        };
763        let result = HttpClientBuilder::with_config(config).build();
764
765        // Native roots may succeed or fail depending on OS certificate availability.
766        // On systems with certs: Ok(_)
767        // On minimal containers without certs: Err(HttpError::Tls(_))
768        match &result {
769            Ok(_) => {
770                // Success on systems with native certs
771            }
772            Err(HttpError::Tls(err)) => {
773                // Expected failure on systems without native certs
774                let msg = err.to_string();
775                assert!(
776                    msg.contains("native root") || msg.contains("certificate"),
777                    "TLS error should mention certificates: {msg}"
778                );
779            }
780            Err(other) => {
781                panic!("Unexpected error type: {other:?}");
782            }
783        }
784    }
785
786    #[tokio::test]
787    async fn test_builder_webpki_roots_https_only() {
788        let config = HttpClientConfig {
789            tls_roots: TlsRootConfig::WebPki,
790            transport: TransportSecurity::TlsOnly,
791            ..Default::default()
792        };
793        let client = HttpClientBuilder::with_config(config).build();
794        assert!(client.is_ok());
795    }
796
797    /// Verify HTTP/2 is enabled for all TLS root configurations.
798    ///
799    /// HTTP/2 support is configured via `enable_all_versions()` on the connector,
800    /// which sets up ALPN to negotiate h2 or http/1.1 during TLS handshake.
801    /// The hyper client uses `http2_only(false)` to allow both protocols.
802    ///
803    /// The `AllowInsecureHttp` sub-cases are skipped under `--features fips`
804    /// because the FIPS guard in `build()` rejects insecure transport.
805    #[tokio::test]
806    async fn test_http2_enabled_for_all_configurations() {
807        // Test WebPki with default transport (AllowInsecureHttp without fips, TlsOnly under fips)
808        let client = HttpClientBuilder::new().build();
809        assert!(
810            client.is_ok(),
811            "WebPki + default transport should build with HTTP/2 enabled"
812        );
813
814        // Test WebPki with TlsOnly (HTTPS only)
815        let client = HttpClientBuilder::new()
816            .transport(TransportSecurity::TlsOnly)
817            .build();
818        assert!(
819            client.is_ok(),
820            "WebPki + TlsOnly should build with HTTP/2 enabled"
821        );
822
823        // Test Native roots with AllowInsecureHttp (non-fips only)
824        #[cfg(not(feature = "fips"))]
825        {
826            let config = HttpClientConfig {
827                tls_roots: TlsRootConfig::Native,
828                transport: TransportSecurity::AllowInsecureHttp,
829                ..Default::default()
830            };
831            let client = HttpClientBuilder::with_config(config).build();
832            assert!(
833                client.is_ok(),
834                "Native + AllowInsecureHttp should build with HTTP/2 enabled"
835            );
836        }
837
838        // Test Native roots with TlsOnly (HTTPS only)
839        let config = HttpClientConfig {
840            tls_roots: TlsRootConfig::Native,
841            transport: TransportSecurity::TlsOnly,
842            ..Default::default()
843        };
844        let client = HttpClientBuilder::with_config(config).build();
845        assert!(
846            client.is_ok(),
847            "Native + TlsOnly should build with HTTP/2 enabled"
848        );
849    }
850
851    /// Test that concurrency limit uses fail-fast behavior (C2).
852    ///
853    /// `LoadShedLayer` + `ConcurrencyLimitLayer` combination returns Overloaded error
854    /// immediately when capacity is exhausted, instead of blocking indefinitely.
855    #[tokio::test]
856    async fn test_load_shedding_returns_overloaded_error() {
857        use bytes::Bytes;
858        use http::{Request, Response};
859        use http_body_util::Full;
860        use std::future::Future;
861        use std::pin::Pin;
862        use std::sync::Arc;
863        use std::sync::atomic::{AtomicUsize, Ordering};
864        use std::task::{Context, Poll};
865        use tower::Service;
866        use tower::ServiceExt;
867
868        // A service that holds a slot forever once called
869        #[derive(Clone)]
870        struct SlotHoldingService {
871            active: Arc<AtomicUsize>,
872        }
873
874        impl Service<Request<Full<Bytes>>> for SlotHoldingService {
875            type Response = Response<Full<Bytes>>;
876            type Error = HttpError;
877            type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
878
879            fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
880                Poll::Ready(Ok(()))
881            }
882
883            fn call(&mut self, _: Request<Full<Bytes>>) -> Self::Future {
884                self.active.fetch_add(1, Ordering::SeqCst);
885                // Never complete - holds the slot
886                Box::pin(std::future::pending())
887            }
888        }
889
890        let active = Arc::new(AtomicUsize::new(0));
891
892        // Build a service with load shedding and concurrency limit of 1
893        let service = tower::ServiceBuilder::new()
894            .layer(LoadShedLayer::new())
895            .layer(ConcurrencyLimitLayer::new(1))
896            .service(SlotHoldingService {
897                active: active.clone(),
898            });
899
900        let service = service.map_err(map_load_shed_error);
901
902        // First request: will occupy the single slot
903        let req1 = Request::builder()
904            .uri("http://test")
905            .body(Full::new(Bytes::new()))
906            .unwrap();
907        let mut svc1 = service.clone();
908
909        let svc1_ready = svc1.ready().await.unwrap();
910        let _pending_fut = svc1_ready.call(req1);
911
912        // Wait for the slot to be occupied
913        tokio::time::sleep(Duration::from_millis(10)).await;
914        assert_eq!(
915            active.load(Ordering::SeqCst),
916            1,
917            "First request should be active"
918        );
919
920        // Second request: LoadShedLayer should reject because ConcurrencyLimit is at capacity
921        let req2 = Request::builder()
922            .uri("http://test")
923            .body(Full::new(Bytes::new()))
924            .unwrap();
925
926        let mut svc2 = service.clone();
927
928        // LoadShedLayer checks poll_ready and returns Overloaded if inner service is not ready
929        let result = tokio::time::timeout(Duration::from_millis(100), async {
930            // poll_ready should return quickly with error (not block)
931            match svc2.ready().await {
932                Ok(ready_svc) => ready_svc.call(req2).await,
933                Err(e) => Err(e),
934            }
935        })
936        .await;
937
938        // Should complete within timeout (not hang) and return Overloaded
939        assert!(result.is_ok(), "Request should not hang");
940        let err = result.unwrap().unwrap_err();
941        assert!(
942            matches!(err, HttpError::Overloaded),
943            "Expected Overloaded error, got: {err:?}"
944        );
945    }
946
947    // ==========================================================================
948    // map_tower_error Tests
949    // ==========================================================================
950
951    /// Test that `map_tower_error` preserves `HttpError::Overloaded` when wrapped in `BoxError`
952    #[test]
953    fn test_map_tower_error_preserves_overloaded() {
954        let http_err = HttpError::Overloaded;
955        let boxed: tower::BoxError = Box::new(http_err);
956        let result = map_tower_error(boxed, Duration::from_secs(30));
957
958        assert!(
959            matches!(result, HttpError::Overloaded),
960            "Should preserve HttpError::Overloaded, got: {result:?}"
961        );
962    }
963
964    /// Test that `map_tower_error` preserves `HttpError::ServiceClosed` when wrapped in `BoxError`
965    #[test]
966    fn test_map_tower_error_preserves_service_closed() {
967        let http_err = HttpError::ServiceClosed;
968        let boxed: tower::BoxError = Box::new(http_err);
969        let result = map_tower_error(boxed, Duration::from_secs(30));
970
971        assert!(
972            matches!(result, HttpError::ServiceClosed),
973            "Should preserve HttpError::ServiceClosed, got: {result:?}"
974        );
975    }
976
977    /// Test that `map_tower_error` preserves `HttpError::Timeout` with original duration
978    #[test]
979    fn test_map_tower_error_preserves_timeout_attempt() {
980        let original_duration = Duration::from_secs(5);
981        let http_err = HttpError::Timeout(original_duration);
982        let boxed: tower::BoxError = Box::new(http_err);
983        // Pass a different timeout to verify original is preserved
984        let result = map_tower_error(boxed, Duration::from_secs(30));
985
986        match result {
987            HttpError::Timeout(d) => {
988                assert_eq!(
989                    d, original_duration,
990                    "Should preserve original timeout duration"
991                );
992            }
993            other => panic!("Should preserve HttpError::Timeout, got: {other:?}"),
994        }
995    }
996
997    /// Test that `map_tower_error` wraps unknown errors as Transport
998    #[test]
999    fn test_map_tower_error_wraps_unknown_as_transport() {
1000        let other_err: tower::BoxError = Box::new(std::io::Error::new(
1001            std::io::ErrorKind::ConnectionRefused,
1002            "connection refused",
1003        ));
1004        let result = map_tower_error(other_err, Duration::from_secs(30));
1005
1006        assert!(
1007            matches!(result, HttpError::Transport(_)),
1008            "Should wrap unknown errors as Transport, got: {result:?}"
1009        );
1010    }
1011
1012    // ==========================================================================
1013    // Cancellation chain test
1014    //
1015    // Proves that dropping the response future from HttpClient cancels the
1016    // inner service future through the toolkit-http middleware stack
1017    // (Buffer → Concurrency → inner service). Retry is disabled to
1018    // isolate the cancellation path.
1019    //
1020    // Uses build_with_inner_service() to inject a fake slow service at the
1021    // bottom of the real tower stack - no HTTP server needed.
1022    // ==========================================================================
1023
1024    /// Dropping the `HttpClient::send()` future must cancel the inner
1025    /// service future through the full middleware stack.
1026    ///
1027    /// Injects a fake service via `build_with_inner_service()` that
1028    /// blocks on a `Notify` (never completes) and signals a second
1029    /// `Notify` from its `Drop` impl. No sleeps - purely notification-based.
1030    #[tokio::test]
1031    async fn test_cancellation_propagates_through_full_stack() {
1032        use crate::response::ResponseBody;
1033        use std::future::Future;
1034        use std::pin::Pin;
1035        use std::sync::Arc;
1036        use std::sync::atomic::{AtomicBool, Ordering};
1037        use std::task::{Context, Poll};
1038        use tower::Service;
1039
1040        #[derive(Clone)]
1041        struct PendingService {
1042            completed: Arc<AtomicBool>,
1043            drop_notifier: Arc<tokio::sync::Notify>,
1044            started_notifier: Arc<tokio::sync::Notify>,
1045        }
1046
1047        struct FutureGuard {
1048            completed: Arc<AtomicBool>,
1049            drop_notifier: Arc<tokio::sync::Notify>,
1050        }
1051
1052        impl Drop for FutureGuard {
1053            fn drop(&mut self) {
1054                if !self.completed.load(Ordering::SeqCst) {
1055                    self.drop_notifier.notify_one();
1056                }
1057            }
1058        }
1059
1060        impl Service<http::Request<Full<Bytes>>> for PendingService {
1061            type Response = http::Response<ResponseBody>;
1062            type Error = HttpError;
1063            type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
1064
1065            fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1066                Poll::Ready(Ok(()))
1067            }
1068
1069            fn call(&mut self, _: http::Request<Full<Bytes>>) -> Self::Future {
1070                let completed = self.completed.clone();
1071                let drop_notifier = self.drop_notifier.clone();
1072                let started_notifier = self.started_notifier.clone();
1073                Box::pin(async move {
1074                    let _guard = FutureGuard {
1075                        completed: completed.clone(),
1076                        drop_notifier,
1077                    };
1078                    // Signal that the request reached the inner service
1079                    started_notifier.notify_one();
1080                    // Block forever - only completes via drop
1081                    std::future::pending::<()>().await;
1082                    completed.store(true, Ordering::SeqCst);
1083                    unreachable!()
1084                })
1085            }
1086        }
1087
1088        let inner_completed = Arc::new(AtomicBool::new(false));
1089        let drop_notifier = Arc::new(tokio::sync::Notify::new());
1090        let started_notifier = Arc::new(tokio::sync::Notify::new());
1091
1092        let inner = PendingService {
1093            completed: inner_completed.clone(),
1094            drop_notifier: drop_notifier.clone(),
1095            started_notifier: started_notifier.clone(),
1096        };
1097
1098        // Build the real HttpClient stack with our fake service at the bottom.
1099        // Retry disabled to isolate cancellation. Tests: Buffer → Concurrency → PendingService
1100        let client = HttpClientBuilder::new()
1101            .timeout(Duration::from_secs(30))
1102            .retry(None)
1103            .build_with_inner_service(inner.boxed_clone());
1104
1105        // Spawn the request so we can drop it explicitly.
1106        // URL uses https:// so the scheme validation in RequestBuilder passes
1107        // under both the non-fips default (AllowInsecureHttp) and the fips
1108        // default (TlsOnly); the connector here is the injected PendingService,
1109        // so no real TLS handshake happens.
1110        let send_handle = tokio::spawn({
1111            let client = client.clone();
1112            async move { client.get("https://fake/slow").send().await }
1113        });
1114
1115        // Wait for the request to reach the inner service
1116        started_notifier.notified().await;
1117
1118        // Drop the in-flight request by aborting the task
1119        send_handle.abort();
1120
1121        // Wait for the drop notification - no sleep, pure notification
1122        tokio::time::timeout(Duration::from_secs(5), drop_notifier.notified())
1123            .await
1124            .expect(
1125                "Inner service future should have been dropped within 5s - \
1126                 the full toolkit-http stack must propagate cancellation",
1127            );
1128
1129        assert!(
1130            !inner_completed.load(Ordering::SeqCst),
1131            "Inner service future should NOT have completed"
1132        );
1133    }
1134}