Skip to main content

aws_smithy_http_client/
client.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6mod dns;
7/// Proxy configuration
8pub mod proxy;
9mod timeout;
10/// TLS connector(s)
11pub mod tls;
12
13pub(crate) mod connect;
14
15use crate::cfg::cfg_tls;
16use crate::tls::TlsContext;
17use aws_smithy_async::future::timeout::TimedOutError;
18use aws_smithy_async::rt::sleep::{default_async_sleep, AsyncSleep, SharedAsyncSleep};
19use aws_smithy_runtime_api::box_error::BoxError;
20use aws_smithy_runtime_api::client::connection::CaptureSmithyConnection;
21use aws_smithy_runtime_api::client::connection::ConnectionMetadata;
22use aws_smithy_runtime_api::client::connector_metadata::ConnectorMetadata;
23use aws_smithy_runtime_api::client::http::{
24    HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpClient,
25    SharedHttpConnector,
26};
27use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, HttpResponse};
28use aws_smithy_runtime_api::client::result::ConnectorError;
29use aws_smithy_runtime_api::client::runtime_components::{
30    RuntimeComponents, RuntimeComponentsBuilder,
31};
32use aws_smithy_runtime_api::shared::IntoShared;
33use aws_smithy_types::body::SdkBody;
34use aws_smithy_types::config_bag::ConfigBag;
35use aws_smithy_types::error::display::DisplayErrorContext;
36use aws_smithy_types::retry::ErrorKind;
37use client::connect::Connection;
38use h2::Reason;
39use http_1x::{Extensions, Uri};
40use hyper::rt::{Read, Write};
41use hyper_util::client::legacy as client;
42use hyper_util::client::legacy::connect::dns::GaiResolver;
43use hyper_util::client::legacy::connect::{
44    capture_connection, CaptureConnection, Connect, HttpConnector as HyperHttpConnector, HttpInfo,
45};
46use hyper_util::client::proxy::matcher::Matcher;
47use hyper_util::rt::{TokioExecutor, TokioTimer};
48use std::borrow::Cow;
49use std::collections::HashMap;
50use std::error::Error;
51use std::fmt;
52use std::sync::RwLock;
53use std::time::Duration;
54
55/// Given `HttpConnectorSettings` and an `SharedAsyncSleep`, create a `SharedHttpConnector` from defaults depending on what cargo features are activated.
56pub fn default_connector(
57    settings: &HttpConnectorSettings,
58    sleep: Option<SharedAsyncSleep>,
59) -> Option<SharedHttpConnector> {
60    #[cfg(feature = "rustls-aws-lc")]
61    {
62        tracing::trace!(settings = ?settings, sleep = ?sleep, "creating a new default connector");
63        let mut conn_builder = Connector::builder().connector_settings(settings.clone());
64
65        if let Some(sleep) = sleep {
66            conn_builder = conn_builder.sleep_impl(sleep);
67        }
68
69        let conn = conn_builder
70            .tls_provider(tls::Provider::Rustls(
71                tls::rustls_provider::CryptoMode::AwsLc,
72            ))
73            .build();
74        Some(SharedHttpConnector::new(conn))
75    }
76    #[cfg(not(feature = "rustls-aws-lc"))]
77    {
78        tracing::trace!(settings = ?settings, sleep = ?sleep, "no default connector available");
79        None
80    }
81}
82
83/// [`HttpConnector`] used to make HTTP requests.
84///
85/// This connector also implements socket connect and read timeouts.
86///
87/// This shouldn't be used directly in most cases.
88/// See the docs on [`Builder`] for examples of how to customize the HTTP client.
89#[derive(Debug)]
90pub struct Connector {
91    adapter: Box<dyn HttpConnector>,
92}
93
94impl Connector {
95    /// Builder for an HTTP connector.
96    pub fn builder() -> ConnectorBuilder {
97        ConnectorBuilder::default()
98    }
99}
100
101impl HttpConnector for Connector {
102    fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
103        self.adapter.call(request)
104    }
105}
106
107/// Builder for [`Connector`].
108#[derive(Debug, Clone)]
109pub struct ConnectorBuilder<Tls = TlsUnset> {
110    connector_settings: Option<HttpConnectorSettings>,
111    sleep_impl: Option<SharedAsyncSleep>,
112    client_builder: Option<hyper_util::client::legacy::Builder>,
113    pool_idle_timeout: Option<Option<Duration>>,
114    enable_tcp_nodelay: bool,
115    interface: Option<String>,
116    proxy_config: Option<proxy::ProxyConfig>,
117    #[allow(unused)]
118    tls: Tls,
119}
120
121impl<Tls: Default> Default for ConnectorBuilder<Tls> {
122    fn default() -> Self {
123        Self {
124            connector_settings: None,
125            sleep_impl: None,
126            client_builder: None,
127            pool_idle_timeout: None,
128            // Curated default: TCP_NODELAY on. Without it, Nagle's algorithm
129            // can hold a small write while earlier data is unacknowledged. On
130            // request shapes emitted as multiple sub-MSS writes, this can add
131            // an ACK wait, often RTT plus delayed-ACK time. Opt out with
132            // `enable_tcp_nodelay(false)`.
133            enable_tcp_nodelay: true,
134            interface: None,
135            proxy_config: None,
136            tls: Tls::default(),
137        }
138    }
139}
140
141/// Initial builder state, `TlsProvider` choice required
142#[derive(Default, Debug, Clone)]
143#[non_exhaustive]
144pub struct TlsUnset {}
145
146/// TLS implementation selected
147#[derive(Debug, Clone)]
148pub struct TlsProviderSelected {
149    #[allow(unused)]
150    provider: tls::Provider,
151    #[allow(unused)]
152    context: TlsContext,
153}
154
155impl ConnectorBuilder<TlsUnset> {
156    /// Set the TLS implementation to use for this connector
157    pub fn tls_provider(self, provider: tls::Provider) -> ConnectorBuilder<TlsProviderSelected> {
158        ConnectorBuilder {
159            connector_settings: self.connector_settings,
160            sleep_impl: self.sleep_impl,
161            client_builder: self.client_builder,
162            enable_tcp_nodelay: self.enable_tcp_nodelay,
163            interface: self.interface,
164            proxy_config: self.proxy_config,
165            pool_idle_timeout: self.pool_idle_timeout,
166            tls: TlsProviderSelected {
167                provider,
168                context: TlsContext::default(),
169            },
170        }
171    }
172
173    /// Build an HTTP connector sans TLS
174    #[doc(hidden)]
175    pub fn build_http(self) -> Connector {
176        if let Some(ref proxy_config) = self.proxy_config {
177            if proxy_config.requires_tls() {
178                tracing::warn!(
179                    "HTTPS proxy configured but no TLS provider set. \
180                     Connections to HTTPS proxy servers will fail. \
181                     Consider configuring a TLS provider to enable TLS support."
182                );
183            }
184        }
185
186        let base = self.base_connector();
187
188        // Wrap with HTTP proxy support if proxy is configured
189        let proxy_config = self
190            .proxy_config
191            .clone()
192            .unwrap_or_else(proxy::ProxyConfig::disabled);
193
194        if !proxy_config.is_disabled() {
195            let http_proxy_connector = connect::HttpProxyConnector::new(base, proxy_config);
196            self.wrap_connector(http_proxy_connector)
197        } else {
198            self.wrap_connector(base)
199        }
200    }
201}
202
203impl<Any> ConnectorBuilder<Any> {
204    /// Create a [`Connector`] from this builder and a given connector.
205    pub(crate) fn wrap_connector<C>(self, tcp_connector: C) -> Connector
206    where
207        C: Send + Sync + 'static,
208        C: Clone,
209        C: tower::Service<Uri>,
210        C::Response: Read + Write + Connection + Send + Sync + Unpin,
211        C: Connect,
212        C::Future: Unpin + Send + 'static,
213        C::Error: Into<BoxError>,
214    {
215        let client_builder = self
216            .client_builder
217            .unwrap_or_else(|| new_tokio_hyper_builder(self.pool_idle_timeout));
218        let sleep_impl = self.sleep_impl.or_else(default_async_sleep);
219        let (connect_timeout, read_timeout) = self
220            .connector_settings
221            .map(|c| (c.connect_timeout(), c.read_timeout()))
222            .unwrap_or((None, None));
223
224        let connector = match connect_timeout {
225            Some(duration) => timeout::ConnectTimeout::new(
226                tcp_connector,
227                sleep_impl
228                    .clone()
229                    .expect("a sleep impl must be provided in order to have a connect timeout"),
230                duration,
231            ),
232            None => timeout::ConnectTimeout::no_timeout(tcp_connector),
233        };
234        let base = client_builder.build(connector);
235        let read_timeout = match read_timeout {
236            Some(duration) => timeout::HttpReadTimeout::new(
237                base,
238                sleep_impl.expect("a sleep impl must be provided in order to have a read timeout"),
239                duration,
240            ),
241            None => timeout::HttpReadTimeout::no_timeout(base),
242        };
243
244        let proxy_matcher = self
245            .proxy_config
246            .as_ref()
247            .map(|config| config.clone().into_hyper_util_matcher());
248
249        Connector {
250            adapter: Box::new(Adapter {
251                client: read_timeout,
252                proxy_matcher,
253            }),
254        }
255    }
256
257    /// Get the base TCP connector by mapping our config to the underlying `HttpConnector` from hyper
258    /// (which is a base TCP connector with no TLS or any wrapping)
259    fn base_connector(&self) -> HyperHttpConnector {
260        self.base_connector_with_resolver(GaiResolver::new())
261    }
262
263    /// Get the base TCP connector by mapping our config to the underlying `HttpConnector` from hyper
264    /// using the given resolver `R`
265    fn base_connector_with_resolver<R>(&self, resolver: R) -> HyperHttpConnector<R> {
266        let mut conn = HyperHttpConnector::new_with_resolver(resolver);
267        conn.set_nodelay(self.enable_tcp_nodelay);
268        #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
269        if let Some(interface) = &self.interface {
270            conn.set_interface(interface);
271        }
272        conn
273    }
274
275    /// Set the async sleep implementation used for timeouts
276    ///
277    /// Calling this is only necessary for testing or to use something other than
278    /// [`default_async_sleep`].
279    pub fn sleep_impl(mut self, sleep_impl: impl AsyncSleep + 'static) -> Self {
280        self.sleep_impl = Some(sleep_impl.into_shared());
281        self
282    }
283
284    /// Set the async sleep implementation used for timeouts
285    ///
286    /// Calling this is only necessary for testing or to use something other than
287    /// [`default_async_sleep`].
288    pub fn set_sleep_impl(&mut self, sleep_impl: Option<SharedAsyncSleep>) -> &mut Self {
289        self.sleep_impl = sleep_impl;
290        self
291    }
292
293    /// Configure the HTTP settings for the `HyperAdapter`
294    pub fn connector_settings(mut self, connector_settings: HttpConnectorSettings) -> Self {
295        self.connector_settings = Some(connector_settings);
296        self
297    }
298
299    /// Configure the HTTP settings for the `HyperAdapter`
300    pub fn set_connector_settings(
301        &mut self,
302        connector_settings: Option<HttpConnectorSettings>,
303    ) -> &mut Self {
304        self.connector_settings = connector_settings;
305        self
306    }
307
308    /// Configure `SO_NODELAY` for all sockets to the supplied value `nodelay`
309    pub fn enable_tcp_nodelay(mut self, nodelay: bool) -> Self {
310        self.enable_tcp_nodelay = nodelay;
311        self
312    }
313
314    /// Configure `SO_NODELAY` for all sockets to the supplied value `nodelay`
315    pub fn set_enable_tcp_nodelay(&mut self, nodelay: bool) -> &mut Self {
316        self.enable_tcp_nodelay = nodelay;
317        self
318    }
319
320    /// Sets the value for the `SO_BINDTODEVICE` option on this socket.
321    ///
322    /// If a socket is bound to an interface, only packets received from that particular
323    /// interface are processed by the socket. Note that this only works for some socket
324    /// types (e.g. `AF_INET` sockets).
325    ///
326    /// On Linux it can be used to specify a [VRF], but the binary needs to either have
327    /// `CAP_NET_RAW` capability set or be run as root.
328    ///
329    /// This function is only available on Android, Fuchsia, and Linux.
330    ///
331    /// [VRF]: https://www.kernel.org/doc/Documentation/networking/vrf.txt
332    #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
333    pub fn set_interface<S: Into<String>>(&mut self, interface: S) -> &mut Self {
334        self.interface = Some(interface.into());
335        self
336    }
337
338    /// Configure proxy settings for this connector
339    ///
340    /// This method allows you to set explicit proxy configuration for the HTTP client.
341    /// The proxy configuration will be used to determine whether requests should be
342    /// routed through a proxy server or connect directly.
343    ///
344    /// # Examples
345    ///
346    /// ```rust
347    /// # #[cfg(feature = "rustls-aws-lc")]
348    /// # {
349    /// use aws_smithy_http_client::{Connector, proxy::ProxyConfig, tls};
350    ///
351    /// let proxy_config = ProxyConfig::http("http://proxy.example.com:8080")?;
352    /// let connector = Connector::builder()
353    ///     .proxy_config(proxy_config)
354    ///     .tls_provider(tls::Provider::Rustls(tls::rustls_provider::CryptoMode::AwsLc))
355    ///     .build();
356    /// # }
357    /// # Ok::<(), Box<dyn std::error::Error>>(())
358    /// ```
359    pub fn proxy_config(mut self, config: proxy::ProxyConfig) -> Self {
360        self.proxy_config = Some(config);
361        self
362    }
363
364    /// Configure proxy settings for this connector
365    ///
366    /// This is the mutable version of [`proxy_config`](Self::proxy_config).
367    pub fn set_proxy_config(&mut self, config: Option<proxy::ProxyConfig>) -> &mut Self {
368        self.proxy_config = config;
369        self
370    }
371
372    /// Set an optional timeout for idle sockets being kept-alive.
373    ///
374    /// Pass `None` to disable timeout.
375    ///
376    /// Defaults to Hyper's default timeout, which is currently 90 seconds - see
377    /// [hyper_util::client::legacy::Builder::pool_idle_timeout],
378    /// but unlike that function, there is no need to call `pool_timer` yourself.
379    ///
380    /// # Examples
381    ///
382    /// ```rust
383    /// # #[cfg(feature = "rustls-aws-lc")]
384    /// # {
385    /// use aws_smithy_http_client::{Connector, tls};
386    /// use std::time::Duration;
387    ///
388    /// let connector = Connector::builder()
389    ///     .pool_idle_timeout(Duration::from_secs(30))
390    ///     .tls_provider(tls::Provider::Rustls(tls::rustls_provider::CryptoMode::AwsLc))
391    ///     .build();
392    /// # }
393    /// # Ok::<(), Box<dyn std::error::Error>>(())
394    /// ```
395    pub fn pool_idle_timeout<D>(mut self, val: D) -> Self
396    where
397        D: Into<Option<Duration>>,
398    {
399        self.pool_idle_timeout = Some(val.into());
400        self
401    }
402
403    /// Set an optional timeout for idle sockets being kept-alive.
404    ///
405    /// Pass `None` to use Hyper's default timeout, `Some(None)` to disable timeouts.
406    ///
407    /// This is the mutable version of [`pool_idle_timeout`](Self::pool_idle_timeout).
408    ///
409    /// # Examples
410    ///
411    /// ```rust
412    /// # #[cfg(feature = "rustls-aws-lc")]
413    /// # {
414    /// use aws_smithy_http_client::{Connector, tls};
415    /// use std::time::Duration;
416    ///
417    /// let mut connector = Connector::builder();
418    /// connector
419    ///     .set_pool_idle_timeout(Some(Some(Duration::from_secs(30))));
420    /// connector
421    ///     .tls_provider(tls::Provider::Rustls(tls::rustls_provider::CryptoMode::AwsLc))
422    ///     .build();
423    /// # }
424    /// # Ok::<(), Box<dyn std::error::Error>>(())
425    /// ```
426    pub fn set_pool_idle_timeout(&mut self, val: Option<Option<Duration>>) -> &mut Self {
427        self.pool_idle_timeout = val;
428        self
429    }
430
431    /// Override the Hyper client [`Builder`](hyper_util::client::legacy::Builder) used to construct this client.
432    ///
433    /// This enables changing settings like forcing HTTP2 and modifying other default client behavior.
434    pub(crate) fn hyper_builder(
435        mut self,
436        hyper_builder: hyper_util::client::legacy::Builder,
437    ) -> Self {
438        self.set_hyper_builder(Some(hyper_builder));
439        self
440    }
441
442    /// Override the Hyper client [`Builder`](hyper_util::client::legacy::Builder) used to construct this client.
443    ///
444    /// This enables changing settings like forcing HTTP2 and modifying other default client behavior.
445    pub(crate) fn set_hyper_builder(
446        &mut self,
447        hyper_builder: Option<hyper_util::client::legacy::Builder>,
448    ) -> &mut Self {
449        self.client_builder = hyper_builder;
450        self
451    }
452}
453
454/// Adapter to use a Hyper 1.0-based Client as an `HttpConnector`
455///
456/// This adapter also enables TCP `CONNECT` and HTTP `READ` timeouts via [`Connector::builder`].
457struct Adapter<C> {
458    client: timeout::HttpReadTimeout<
459        hyper_util::client::legacy::Client<timeout::ConnectTimeout<C>, SdkBody>,
460    >,
461    proxy_matcher: Option<Matcher>,
462}
463
464impl<C> fmt::Debug for Adapter<C> {
465    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
466        f.debug_struct("Adapter")
467            .field("client", &"** hyper client **")
468            .field("proxy_matcher", &self.proxy_matcher.is_some())
469            .finish()
470    }
471}
472
473/// Extract a smithy connection from a hyper CaptureConnection
474fn extract_smithy_connection(capture_conn: &CaptureConnection) -> Option<ConnectionMetadata> {
475    let capture_conn = capture_conn.clone();
476    if let Some(conn) = capture_conn.clone().connection_metadata().as_ref() {
477        let mut extensions = Extensions::new();
478        conn.get_extras(&mut extensions);
479        let http_info = extensions.get::<HttpInfo>();
480        let mut builder = ConnectionMetadata::builder()
481            .proxied(conn.is_proxied())
482            .poison_fn(move || match capture_conn.connection_metadata().as_ref() {
483                Some(conn) => conn.poison(),
484                None => tracing::trace!("no connection existed to poison"),
485            });
486
487        builder
488            .set_local_addr(http_info.map(|info| info.local_addr()))
489            .set_remote_addr(http_info.map(|info| info.remote_addr()));
490
491        let smithy_connection = builder.build();
492
493        Some(smithy_connection)
494    } else {
495        None
496    }
497}
498
499fn new_tokio_hyper_builder(
500    pool_idle_timeout: Option<Option<Duration>>,
501) -> hyper_util::client::legacy::Builder {
502    let mut builder = hyper_util::client::legacy::Builder::new(TokioExecutor::new());
503    // Explicitly setting the pool_timer is required for connection timeouts to work.
504    builder.pool_timer(TokioTimer::new());
505
506    if let Some(pool_idle_timeout) = pool_idle_timeout {
507        builder.pool_idle_timeout(pool_idle_timeout);
508    }
509
510    builder
511}
512
513impl<C> Adapter<C> {
514    /// Add proxy authentication header to the request if needed
515    fn add_proxy_auth_header(&self, request: &mut http_1x::Request<SdkBody>) {
516        // Only add auth for HTTP requests (not HTTPS which uses CONNECT tunneling)
517        if request.uri().scheme() != Some(&http_1x::uri::Scheme::HTTP) {
518            return;
519        }
520
521        // Don't override existing proxy authorization header
522        if request
523            .headers()
524            .contains_key(http_1x::header::PROXY_AUTHORIZATION)
525        {
526            return;
527        }
528
529        if let Some(ref matcher) = self.proxy_matcher {
530            if let Some(intercept) = matcher.intercept(request.uri()) {
531                // Add basic auth header if available
532                if let Some(auth_header) = intercept.basic_auth() {
533                    request
534                        .headers_mut()
535                        .insert(http_1x::header::PROXY_AUTHORIZATION, auth_header.clone());
536                    tracing::debug!("added proxy authentication header for {}", request.uri());
537                }
538            }
539        }
540    }
541}
542
543impl<C> HttpConnector for Adapter<C>
544where
545    C: Clone + Send + Sync + 'static,
546    C: tower::Service<Uri>,
547    C::Response: Connection + Read + Write + Unpin + 'static,
548    timeout::ConnectTimeout<C>: Connect,
549    C::Future: Unpin + Send + 'static,
550    C::Error: Into<BoxError>,
551{
552    fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
553        let mut request = match request.try_into_http1x() {
554            Ok(request) => request,
555            Err(err) => {
556                return HttpConnectorFuture::ready(Err(ConnectorError::user(err.into())));
557            }
558        };
559
560        self.add_proxy_auth_header(&mut request);
561
562        let capture_connection = capture_connection(&mut request);
563        if let Some(capture_smithy_connection) =
564            request.extensions().get::<CaptureSmithyConnection>()
565        {
566            capture_smithy_connection
567                .set_connection_retriever(move || extract_smithy_connection(&capture_connection));
568        }
569        let mut client = self.client.clone();
570        use tower::Service;
571        let fut = client.call(request);
572        HttpConnectorFuture::new(async move {
573            let response = fut
574                .await
575                .map_err(downcast_error)?
576                .map(SdkBody::from_body_1_x);
577            match HttpResponse::try_from(response) {
578                Ok(response) => Ok(response),
579                Err(err) => Err(ConnectorError::other(err.into(), None)),
580            }
581        })
582    }
583}
584
585/// Downcast errors coming out of hyper into an appropriate `ConnectorError`
586fn downcast_error(err: BoxError) -> ConnectorError {
587    // is a `TimedOutError` (from aws_smithy_async::timeout) in the chain? if it is, this is a timeout
588    if find_source::<TimedOutError>(err.as_ref()).is_some() {
589        return ConnectorError::timeout(err);
590    }
591    // is the top of chain error actually already a `ConnectorError`? return that directly
592    let err = match err.downcast::<ConnectorError>() {
593        Ok(connector_error) => return *connector_error,
594        Err(box_error) => box_error,
595    };
596    // generally, the top of chain will probably be a hyper error. Go through a set of hyper specific
597    // error classifications
598    let err = match find_source::<hyper::Error>(err.as_ref()) {
599        Some(hyper_error) => return to_connector_error(hyper_error)(err),
600        None => match find_source::<hyper_util::client::legacy::Error>(err.as_ref()) {
601            Some(hyper_util_err) => {
602                if hyper_util_err.is_connect()
603                    || find_source::<std::io::Error>(hyper_util_err).is_some()
604                {
605                    return ConnectorError::io(err);
606                }
607                err
608            }
609            None => err,
610        },
611    };
612
613    // otherwise, we have no idea!
614    ConnectorError::other(err, None)
615}
616
617/// Convert a [`hyper::Error`] into a [`ConnectorError`]
618fn to_connector_error(err: &hyper::Error) -> fn(BoxError) -> ConnectorError {
619    if err.is_timeout() || find_source::<timeout::HttpTimeoutError>(err).is_some() {
620        return ConnectorError::timeout;
621    }
622    if err.is_user() {
623        return ConnectorError::user;
624    }
625    if err.is_closed() || err.is_canceled() || find_source::<std::io::Error>(err).is_some() {
626        return ConnectorError::io;
627    }
628    // We sometimes receive this from S3: hyper::Error(IncompleteMessage)
629    if err.is_incomplete_message() {
630        return |err: BoxError| ConnectorError::other(err, Some(ErrorKind::TransientError));
631    }
632
633    if let Some(h2_err) = find_source::<h2::Error>(err) {
634        if h2_err.is_go_away()
635            || (h2_err.is_reset() && h2_err.reason() == Some(Reason::REFUSED_STREAM))
636        {
637            return ConnectorError::io;
638        }
639    }
640
641    tracing::warn!(err = %DisplayErrorContext(&err), "unrecognized error from Hyper. If this error should be retried, please file an issue.");
642    |err: BoxError| ConnectorError::other(err, None)
643}
644
645fn find_source<'a, E: Error + 'static>(err: &'a (dyn Error + 'static)) -> Option<&'a E> {
646    let mut next = Some(err);
647    while let Some(err) = next {
648        if let Some(matching_err) = err.downcast_ref::<E>() {
649            return Some(matching_err);
650        }
651        next = err.source();
652    }
653    None
654}
655
656// TODO(https://github.com/awslabs/aws-sdk-rust/issues/1090): CacheKey must also include ptr equality to any
657// runtime components that are used—sleep_impl as a base (unless we prohibit overriding sleep impl)
658// If we decide to put a DnsResolver in RuntimeComponents, then we'll need to handle that as well.
659#[derive(Clone, Debug, Eq, PartialEq, Hash)]
660struct CacheKey {
661    connect_timeout: Option<Duration>,
662    read_timeout: Option<Duration>,
663}
664
665impl From<&HttpConnectorSettings> for CacheKey {
666    fn from(value: &HttpConnectorSettings) -> Self {
667        Self {
668            connect_timeout: value.connect_timeout(),
669            read_timeout: value.read_timeout(),
670        }
671    }
672}
673
674struct HyperClient<F> {
675    connector_cache: RwLock<HashMap<CacheKey, SharedHttpConnector>>,
676    client_builder: hyper_util::client::legacy::Builder,
677    connector_fn: F,
678}
679
680impl<F> fmt::Debug for HyperClient<F> {
681    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
682        f.debug_struct("HyperClient")
683            .field("connector_cache", &self.connector_cache)
684            .field("client_builder", &self.client_builder)
685            .finish()
686    }
687}
688
689impl<F> HttpClient for HyperClient<F>
690where
691    F: Fn(
692            hyper_util::client::legacy::Builder,
693            Option<&HttpConnectorSettings>,
694            Option<&RuntimeComponents>,
695        ) -> Connector
696        + Send
697        + Sync
698        + 'static,
699{
700    fn http_connector(
701        &self,
702        settings: &HttpConnectorSettings,
703        components: &RuntimeComponents,
704    ) -> SharedHttpConnector {
705        let key = CacheKey::from(settings);
706        let mut connector = self.connector_cache.read().unwrap().get(&key).cloned();
707        if connector.is_none() {
708            let mut cache = self.connector_cache.write().unwrap();
709            // Short-circuit if another thread already wrote a connector to the cache for this key
710            if !cache.contains_key(&key) {
711                let start = components.time_source().map(|ts| ts.now());
712                let connector = (self.connector_fn)(
713                    self.client_builder.clone(),
714                    Some(settings),
715                    Some(components),
716                );
717                let end = components.time_source().map(|ts| ts.now());
718                if let (Some(start), Some(end)) = (start, end) {
719                    if let Ok(elapsed) = end.duration_since(start) {
720                        tracing::debug!("new connector created in {:?}", elapsed);
721                    }
722                }
723                let connector = SharedHttpConnector::new(connector);
724                cache.insert(key.clone(), connector);
725            }
726            connector = cache.get(&key).cloned();
727        }
728
729        connector.expect("cache populated above")
730    }
731
732    fn validate_base_client_config(
733        &self,
734        _: &RuntimeComponentsBuilder,
735        _: &ConfigBag,
736    ) -> Result<(), BoxError> {
737        // Initialize the TCP connector at this point so that native certs load
738        // at client initialization time instead of upon first request. We do it
739        // here rather than at construction so that it won't run if this is not
740        // the selected HTTP client for the base config (for example, if this was
741        // the default HTTP client, and it was overridden by a later plugin).
742        let _ = (self.connector_fn)(self.client_builder.clone(), None, None);
743        Ok(())
744    }
745
746    fn connector_metadata(&self) -> Option<ConnectorMetadata> {
747        Some(ConnectorMetadata::new("hyper", Some(Cow::Borrowed("1.x"))))
748    }
749}
750
751/// Builder for a hyper-backed [`HttpClient`] implementation.
752///
753/// This builder can be used to customize the underlying TCP connector used, as well as
754/// hyper client configuration.
755///
756/// # Examples
757///
758/// Construct a Hyper client with the RusTLS TLS implementation.
759/// This can be useful when you want to share a Hyper connector between multiple
760/// generated Smithy clients.
761#[derive(Clone, Default, Debug)]
762pub struct Builder<Tls = TlsUnset> {
763    client_builder: Option<hyper_util::client::legacy::Builder>,
764    pool_idle_timeout: Option<Option<Duration>>,
765    #[allow(unused)]
766    tls_provider: Tls,
767}
768
769cfg_tls! {
770    use aws_smithy_runtime_api::client::dns::ResolveDns;
771
772    impl ConnectorBuilder<TlsProviderSelected> {
773        /// Build a [`Connector`] that will use the default DNS resolver implementation.
774        pub fn build(self) -> Connector {
775            let http_connector = self.base_connector();
776            self.build_https(http_connector)
777        }
778
779        /// Configure the TLS context
780        pub fn tls_context(mut self, ctx: TlsContext) -> Self {
781            self.tls.context = ctx;
782            self
783        }
784
785        /// Configure the TLS context
786        pub fn set_tls_context(&mut self, ctx: TlsContext) -> &mut Self {
787            self.tls.context = ctx;
788            self
789        }
790
791        /// Build a [`Connector`] that will use the given DNS resolver implementation.
792        pub fn build_with_resolver<R: ResolveDns + Clone + 'static>(self, resolver: R) -> Connector {
793            use crate::client::dns::HyperUtilResolver;
794            let http_connector = self.base_connector_with_resolver(HyperUtilResolver { resolver });
795            self.build_https(http_connector)
796        }
797
798        fn build_https<R>(self, http_connector: HyperHttpConnector<R>) -> Connector
799        where
800            R: Clone + Send + Sync + 'static,
801            R: tower::Service<hyper_util::client::legacy::connect::dns::Name>,
802            R::Response: Iterator<Item = std::net::SocketAddr>,
803            R::Future: Send,
804            R::Error: Into<Box<dyn Error + Send + Sync>>,
805        {
806            match &self.tls.provider {
807                // TODO(hyper1) - fix cfg_rustls! to allow matching on patterns so we can re-use it and not duplicate these cfg matches everywhere
808                #[cfg(feature = "__rustls")]
809                tls::Provider::Rustls(crypto_mode) => {
810                    let proxy_config = self.proxy_config.clone()
811                        .unwrap_or_else(proxy::ProxyConfig::disabled);
812
813                    let https_connector = tls::rustls_provider::build_connector::wrap_connector(
814                        http_connector,
815                        crypto_mode.clone(),
816                        &self.tls.context,
817                        proxy_config,
818                    );
819                    self.wrap_connector(https_connector)
820                },
821                #[cfg(feature = "s2n-tls")]
822                tls::Provider::S2nTls  => {
823                    let proxy_config = self.proxy_config.clone()
824                        .unwrap_or_else(proxy::ProxyConfig::disabled);
825
826                    let https_connector = tls::s2n_tls_provider::build_connector::wrap_connector(
827                        http_connector,
828                        &self.tls.context,
829                        proxy_config,
830                    );
831                    self.wrap_connector(https_connector)
832                }
833            }
834        }
835    }
836
837    impl Builder<TlsProviderSelected> {
838        /// Create an HTTPS client with the selected TLS provider.
839        ///
840        /// The trusted certificates will be loaded later when this becomes the selected
841        /// HTTP client for a Smithy client.
842        pub fn build_https(self) -> SharedHttpClient {
843            build_with_conn_fn(
844                self.client_builder,
845                self.pool_idle_timeout,
846                move |client_builder, settings, runtime_components| {
847                    let builder = new_conn_builder(client_builder, settings, runtime_components)
848                        .tls_provider(self.tls_provider.provider.clone())
849                        .tls_context(self.tls_provider.context.clone());
850                    builder.build()
851                },
852            )
853        }
854
855        /// Create an HTTPS client using a custom DNS resolver
856        pub fn build_with_resolver(
857            self,
858            resolver: impl ResolveDns + Clone + 'static,
859        ) -> SharedHttpClient {
860            build_with_conn_fn(
861                self.client_builder,
862                self.pool_idle_timeout,
863                move |client_builder, settings, runtime_components| {
864                    let builder = new_conn_builder(client_builder, settings, runtime_components)
865                        .tls_provider(self.tls_provider.provider.clone())
866                        .tls_context(self.tls_provider.context.clone());
867                    builder.build_with_resolver(resolver.clone())
868                },
869            )
870        }
871
872        /// Configure the TLS context
873        pub fn tls_context(mut self, ctx: TlsContext) -> Self {
874            self.tls_provider.context = ctx;
875            self
876        }
877    }
878}
879
880impl<Any> Builder<Any> {
881    /// Set an optional timeout for idle sockets being kept-alive.
882    ///
883    /// Pass `None` to disable timeout.
884    ///
885    /// Defaults to Hyper's default timeout, which is currently 90 seconds - see
886    /// [hyper_util::client::legacy::Builder::pool_idle_timeout],
887    /// but unlike that function, there is no need to call `pool_timer` yourself.
888    ///
889    /// # Examples
890    ///
891    /// ```rust
892    /// # #[cfg(feature = "rustls-aws-lc")]
893    /// # {
894    /// use aws_smithy_http_client::{Builder, tls};
895    /// use std::time::Duration;
896    ///
897    /// let client = Builder::new()
898    ///     .pool_idle_timeout(Duration::from_secs(30))
899    ///     .tls_provider(tls::Provider::Rustls(tls::rustls_provider::CryptoMode::AwsLc))
900    ///     .build_https();
901    /// # }
902    /// # Ok::<(), Box<dyn std::error::Error>>(())
903    /// ```
904    pub fn pool_idle_timeout<D>(mut self, val: D) -> Self
905    where
906        D: Into<Option<Duration>>,
907    {
908        self.pool_idle_timeout = Some(val.into());
909        self
910    }
911
912    /// Set an optional timeout for idle sockets being kept-alive.
913    ///
914    /// Pass `None` to use Hyper's default timeout, `Some(None)` to disable timeouts.
915    ///
916    /// This is the mutable version of [`pool_idle_timeout`](Self::pool_idle_timeout).
917    ///
918    /// # Examples
919    ///
920    /// ```rust
921    /// # #[cfg(feature = "rustls-aws-lc")]
922    /// # {
923    /// use std::time::Duration;
924    /// use aws_smithy_http_client::{Builder, tls};
925    ///
926    /// let mut client = Builder::new();
927    /// client.set_pool_idle_timeout(Some(Some(Duration::from_secs(30))));
928    /// client
929    ///     .tls_provider(tls::Provider::Rustls(tls::rustls_provider::CryptoMode::AwsLc))
930    ///     .build_https();
931    /// # }
932    /// # Ok::<(), Box<dyn std::error::Error>>(())
933    /// ```
934    pub fn set_pool_idle_timeout(&mut self, val: Option<Option<Duration>>) -> &mut Self {
935        self.pool_idle_timeout = val;
936        self
937    }
938}
939
940impl Builder<TlsUnset> {
941    /// Creates a new builder.
942    pub fn new() -> Self {
943        Self::default()
944    }
945
946    /// Returns a [`SharedHttpClient`] that calls the given `connector` function to select an HTTP(S) connector.
947    #[doc(hidden)]
948    pub fn build_with_connector_fn<F>(self, connector_fn: F) -> SharedHttpClient
949    where
950        F: Fn(Option<&HttpConnectorSettings>, Option<&RuntimeComponents>) -> Connector
951            + Send
952            + Sync
953            + 'static,
954    {
955        build_with_conn_fn(
956            self.client_builder,
957            self.pool_idle_timeout,
958            move |_builder, settings, runtime_components| {
959                connector_fn(settings, runtime_components)
960            },
961        )
962    }
963
964    /// Build a new HTTP client without TLS enabled
965    #[doc(hidden)]
966    pub fn build_http(self) -> SharedHttpClient {
967        build_with_conn_fn(
968            self.client_builder,
969            self.pool_idle_timeout,
970            move |client_builder, settings, runtime_components| {
971                let builder = new_conn_builder(client_builder, settings, runtime_components);
972                builder.build_http()
973            },
974        )
975    }
976
977    /// Set the TLS implementation to use
978    pub fn tls_provider(self, provider: tls::Provider) -> Builder<TlsProviderSelected> {
979        Builder {
980            client_builder: self.client_builder,
981            pool_idle_timeout: self.pool_idle_timeout,
982            tls_provider: TlsProviderSelected {
983                provider,
984                context: TlsContext::default(),
985            },
986        }
987    }
988}
989
990pub(crate) fn build_with_conn_fn<F>(
991    client_builder: Option<hyper_util::client::legacy::Builder>,
992    pool_idle_timeout: Option<Option<Duration>>,
993    connector_fn: F,
994) -> SharedHttpClient
995where
996    F: Fn(
997            hyper_util::client::legacy::Builder,
998            Option<&HttpConnectorSettings>,
999            Option<&RuntimeComponents>,
1000        ) -> Connector
1001        + Send
1002        + Sync
1003        + 'static,
1004{
1005    let client_builder =
1006        client_builder.unwrap_or_else(|| new_tokio_hyper_builder(pool_idle_timeout));
1007    SharedHttpClient::new(HyperClient {
1008        connector_cache: RwLock::new(HashMap::new()),
1009        client_builder,
1010        connector_fn,
1011    })
1012}
1013
1014#[allow(dead_code)]
1015pub(crate) fn build_with_tcp_conn_fn<C, F>(
1016    client_builder: Option<hyper_util::client::legacy::Builder>,
1017    pool_idle_timeout: Option<Option<Duration>>,
1018    tcp_connector_fn: F,
1019) -> SharedHttpClient
1020where
1021    F: Fn() -> C + Send + Sync + 'static,
1022    C: Clone + Send + Sync + 'static,
1023    C: tower::Service<Uri>,
1024    C::Response: Connection + Read + Write + Send + Sync + Unpin + 'static,
1025    C::Future: Unpin + Send + 'static,
1026    C::Error: Into<BoxError>,
1027    C: Connect,
1028{
1029    build_with_conn_fn(
1030        client_builder,
1031        pool_idle_timeout,
1032        move |client_builder, settings, runtime_components| {
1033            let builder = new_conn_builder(client_builder, settings, runtime_components);
1034            builder.wrap_connector(tcp_connector_fn())
1035        },
1036    )
1037}
1038
1039fn new_conn_builder(
1040    client_builder: hyper_util::client::legacy::Builder,
1041    settings: Option<&HttpConnectorSettings>,
1042    runtime_components: Option<&RuntimeComponents>,
1043) -> ConnectorBuilder {
1044    let mut builder = Connector::builder().hyper_builder(client_builder);
1045    builder.set_connector_settings(settings.cloned());
1046    if let Some(components) = runtime_components {
1047        builder.set_sleep_impl(components.sleep_impl());
1048    }
1049    builder
1050}
1051
1052#[cfg(test)]
1053mod test {
1054    use std::io::{Error, ErrorKind};
1055    use std::pin::Pin;
1056    use std::sync::atomic::{AtomicU32, Ordering};
1057    use std::sync::Arc;
1058    use std::task::{Context, Poll};
1059
1060    use crate::client::timeout::test::NeverConnects;
1061    use aws_smithy_async::assert_elapsed;
1062    use aws_smithy_async::rt::sleep::TokioSleep;
1063    use aws_smithy_async::time::SystemTimeSource;
1064    use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
1065    use http_1x::Uri;
1066    use hyper::rt::ReadBufCursor;
1067    use hyper_util::client::legacy::connect::Connected;
1068
1069    use super::*;
1070
1071    #[tokio::test]
1072    async fn connector_selection() {
1073        // Create a client that increments a count every time it creates a new Connector
1074        let creation_count = Arc::new(AtomicU32::new(0));
1075        let http_client = build_with_tcp_conn_fn(None, None, {
1076            let count = creation_count.clone();
1077            move || {
1078                count.fetch_add(1, Ordering::Relaxed);
1079                NeverConnects
1080            }
1081        });
1082
1083        // This configuration should result in 4 separate connectors with different timeout settings
1084        let settings = [
1085            HttpConnectorSettings::builder()
1086                .connect_timeout(Duration::from_secs(3))
1087                .build(),
1088            HttpConnectorSettings::builder()
1089                .read_timeout(Duration::from_secs(3))
1090                .build(),
1091            HttpConnectorSettings::builder()
1092                .connect_timeout(Duration::from_secs(3))
1093                .read_timeout(Duration::from_secs(3))
1094                .build(),
1095            HttpConnectorSettings::builder()
1096                .connect_timeout(Duration::from_secs(5))
1097                .read_timeout(Duration::from_secs(3))
1098                .build(),
1099        ];
1100
1101        // Kick off thousands of parallel tasks that will try to create a connector
1102        let components = RuntimeComponentsBuilder::for_tests()
1103            .with_time_source(Some(SystemTimeSource::new()))
1104            .build()
1105            .unwrap();
1106        let mut handles = Vec::new();
1107        for setting in &settings {
1108            for _ in 0..1000 {
1109                let client = http_client.clone();
1110                handles.push(tokio::spawn({
1111                    let setting = setting.clone();
1112                    let components = components.clone();
1113                    async move {
1114                        let _ = client.http_connector(&setting, &components);
1115                    }
1116                }));
1117            }
1118        }
1119        for handle in handles {
1120            handle.await.unwrap();
1121        }
1122
1123        // Verify only 4 connectors were created amidst the chaos
1124        assert_eq!(4, creation_count.load(Ordering::Relaxed));
1125    }
1126
1127    #[tokio::test]
1128    async fn hyper_io_error() {
1129        let connector = TestConnection {
1130            inner: HangupStream,
1131        };
1132        let adapter = Connector::builder().wrap_connector(connector).adapter;
1133        let err = adapter
1134            .call(HttpRequest::get("https://socket-hangup.com").unwrap())
1135            .await
1136            .expect_err("socket hangup");
1137        assert!(err.is_io(), "unexpected error type: {:?}", err);
1138    }
1139
1140    // ---- machinery to make a Hyper connector that responds with an IO Error
1141    #[derive(Clone)]
1142    struct HangupStream;
1143
1144    impl Connection for HangupStream {
1145        fn connected(&self) -> Connected {
1146            Connected::new()
1147        }
1148    }
1149
1150    impl Read for HangupStream {
1151        fn poll_read(
1152            self: Pin<&mut Self>,
1153            _cx: &mut Context<'_>,
1154            _buf: ReadBufCursor<'_>,
1155        ) -> Poll<std::io::Result<()>> {
1156            Poll::Ready(Err(Error::new(
1157                ErrorKind::ConnectionReset,
1158                "connection reset",
1159            )))
1160        }
1161    }
1162
1163    impl Write for HangupStream {
1164        fn poll_write(
1165            self: Pin<&mut Self>,
1166            _cx: &mut Context<'_>,
1167            _buf: &[u8],
1168        ) -> Poll<Result<usize, Error>> {
1169            Poll::Pending
1170        }
1171
1172        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
1173            Poll::Pending
1174        }
1175
1176        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
1177            Poll::Pending
1178        }
1179    }
1180
1181    #[derive(Clone)]
1182    struct TestConnection<T> {
1183        inner: T,
1184    }
1185
1186    impl<T> tower::Service<Uri> for TestConnection<T>
1187    where
1188        T: Clone + Connection,
1189    {
1190        type Response = T;
1191        type Error = BoxError;
1192        type Future = std::future::Ready<Result<Self::Response, Self::Error>>;
1193
1194        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1195            Poll::Ready(Ok(()))
1196        }
1197
1198        fn call(&mut self, _req: Uri) -> Self::Future {
1199            std::future::ready(Ok(self.inner.clone()))
1200        }
1201    }
1202
1203    #[tokio::test]
1204    async fn http_connect_timeout_works() {
1205        let tcp_connector = NeverConnects::default();
1206        let connector_settings = HttpConnectorSettings::builder()
1207            .connect_timeout(Duration::from_secs(1))
1208            .build();
1209        let hyper = Connector::builder()
1210            .connector_settings(connector_settings)
1211            .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
1212            .wrap_connector(tcp_connector)
1213            .adapter;
1214        let now = tokio::time::Instant::now();
1215        tokio::time::pause();
1216        let resp = hyper
1217            .call(HttpRequest::get("https://static-uri.com").unwrap())
1218            .await
1219            .unwrap_err();
1220        assert!(
1221            resp.is_timeout(),
1222            "expected resp.is_timeout() to be true but it was false, resp == {:?}",
1223            resp
1224        );
1225        let message = DisplayErrorContext(&resp).to_string();
1226        let expected = "timeout: client error (Connect): HTTP connect timeout occurred after 1s";
1227        assert!(
1228            message.contains(expected),
1229            "expected '{message}' to contain '{expected}'"
1230        );
1231        assert_elapsed!(now, Duration::from_secs(1));
1232    }
1233
1234    #[tokio::test]
1235    async fn http_read_timeout_works() {
1236        let tcp_connector = crate::client::timeout::test::NeverReplies;
1237        let connector_settings = HttpConnectorSettings::builder()
1238            .connect_timeout(Duration::from_secs(1))
1239            .read_timeout(Duration::from_secs(2))
1240            .build();
1241        let hyper = Connector::builder()
1242            .connector_settings(connector_settings)
1243            .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
1244            .wrap_connector(tcp_connector)
1245            .adapter;
1246        let now = tokio::time::Instant::now();
1247        tokio::time::pause();
1248        let err = hyper
1249            .call(HttpRequest::get("https://fake-uri.com").unwrap())
1250            .await
1251            .unwrap_err();
1252        assert!(
1253            err.is_timeout(),
1254            "expected err.is_timeout() to be true but it was false, err == {err:?}",
1255        );
1256        let message = format!("{}", DisplayErrorContext(&err));
1257        let expected = "timeout: HTTP read timeout occurred after 2s";
1258        assert!(
1259            message.contains(expected),
1260            "expected '{message}' to contain '{expected}'"
1261        );
1262        assert_elapsed!(now, Duration::from_secs(2));
1263    }
1264
1265    #[cfg(not(windows))]
1266    #[tokio::test]
1267    async fn connection_refused_works() {
1268        use crate::client::dns::HyperUtilResolver;
1269        use aws_smithy_runtime_api::client::dns::{DnsFuture, ResolveDns};
1270        use std::net::{IpAddr, Ipv4Addr};
1271
1272        #[derive(Debug, Clone, Default)]
1273        struct TestResolver;
1274        impl ResolveDns for TestResolver {
1275            fn resolve_dns<'a>(&'a self, _name: &'a str) -> DnsFuture<'a> {
1276                let localhost_v4 = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
1277                DnsFuture::ready(Ok(vec![localhost_v4]))
1278            }
1279        }
1280
1281        let connector_settings = HttpConnectorSettings::builder()
1282            .connect_timeout(Duration::from_secs(20))
1283            .build();
1284
1285        let resolver = HyperUtilResolver {
1286            resolver: TestResolver,
1287        };
1288        let connector = Connector::builder().base_connector_with_resolver(resolver);
1289
1290        let hyper = Connector::builder()
1291            .connector_settings(connector_settings)
1292            .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
1293            .wrap_connector(connector)
1294            .adapter;
1295
1296        let resp = hyper
1297            .call(HttpRequest::get("http://static-uri:50227.com").unwrap())
1298            .await
1299            .unwrap_err();
1300        assert!(
1301            resp.is_io(),
1302            "expected resp.is_io() to be true but it was false, resp == {:?}",
1303            resp
1304        );
1305        let message = DisplayErrorContext(&resp).to_string();
1306        let expected = "Connection refused";
1307        assert!(
1308            message.contains(expected),
1309            "expected '{message}' to contain '{expected}'"
1310        );
1311    }
1312
1313    #[cfg(feature = "s2n-tls")]
1314    #[tokio::test]
1315    async fn s2n_tls_provider() {
1316        // Create an HttpConnector with the s2n-tls provider.
1317        let client = Builder::new()
1318            .tls_provider(tls::Provider::S2nTls)
1319            .build_https();
1320        let connector_settings = HttpConnectorSettings::builder().build();
1321
1322        // HyperClient::http_connector invokes TimeSource::now to determine how long it takes to
1323        // create new HttpConnectors. As such, a real time source must be provided.
1324        let runtime_components = RuntimeComponentsBuilder::for_tests()
1325            .with_time_source(Some(SystemTimeSource::new()))
1326            .build()
1327            .unwrap();
1328
1329        let connector = client.http_connector(&connector_settings, &runtime_components);
1330
1331        // Ensure that s2n-tls is used as the underlying TLS provider when selected.
1332        //
1333        // s2n-tls-hyper will error when given an invalid scheme. Ensure that this error is produced
1334        // from s2n-tls-hyper, and not another TLS provider.
1335        let error = connector
1336            .call(HttpRequest::get("notascheme://amazon.com").unwrap())
1337            .await
1338            .unwrap_err();
1339        let error = error.into_source();
1340        let s2n_error = error
1341            .source()
1342            .unwrap()
1343            .downcast_ref::<s2n_tls_hyper::error::Error>()
1344            .unwrap();
1345        assert!(matches!(
1346            s2n_error,
1347            s2n_tls_hyper::error::Error::InvalidScheme
1348        ));
1349    }
1350}