1mod dns;
7pub mod proxy;
9mod timeout;
10pub mod tls;
12
13pub(crate) mod connect;
14
15use crate::cfg::cfg_tls;
16use crate::tls::TlsContext;
17use aws_smithy_async::future::timeout::TimedOutError;
18use aws_smithy_async::rt::sleep::{default_async_sleep, AsyncSleep, SharedAsyncSleep};
19use aws_smithy_runtime_api::box_error::BoxError;
20use aws_smithy_runtime_api::client::connection::CaptureSmithyConnection;
21use aws_smithy_runtime_api::client::connection::ConnectionMetadata;
22use aws_smithy_runtime_api::client::connector_metadata::ConnectorMetadata;
23use aws_smithy_runtime_api::client::http::{
24 HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpClient,
25 SharedHttpConnector,
26};
27use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, HttpResponse};
28use aws_smithy_runtime_api::client::result::ConnectorError;
29use aws_smithy_runtime_api::client::runtime_components::{
30 RuntimeComponents, RuntimeComponentsBuilder,
31};
32use aws_smithy_runtime_api::shared::IntoShared;
33use aws_smithy_types::body::SdkBody;
34use aws_smithy_types::config_bag::ConfigBag;
35use aws_smithy_types::error::display::DisplayErrorContext;
36use aws_smithy_types::retry::ErrorKind;
37use client::connect::Connection;
38use h2::Reason;
39use http_1x::{Extensions, Uri};
40use hyper::rt::{Read, Write};
41use hyper_util::client::legacy as client;
42use hyper_util::client::legacy::connect::dns::GaiResolver;
43use hyper_util::client::legacy::connect::{
44 capture_connection, CaptureConnection, Connect, HttpConnector as HyperHttpConnector, HttpInfo,
45};
46use hyper_util::client::proxy::matcher::Matcher;
47use hyper_util::rt::{TokioExecutor, TokioTimer};
48use std::borrow::Cow;
49use std::collections::HashMap;
50use std::error::Error;
51use std::fmt;
52use std::sync::RwLock;
53use std::time::Duration;
54
55pub fn default_connector(
57 settings: &HttpConnectorSettings,
58 sleep: Option<SharedAsyncSleep>,
59) -> Option<SharedHttpConnector> {
60 #[cfg(feature = "rustls-aws-lc")]
61 {
62 tracing::trace!(settings = ?settings, sleep = ?sleep, "creating a new default connector");
63 let mut conn_builder = Connector::builder().connector_settings(settings.clone());
64
65 if let Some(sleep) = sleep {
66 conn_builder = conn_builder.sleep_impl(sleep);
67 }
68
69 let conn = conn_builder
70 .tls_provider(tls::Provider::Rustls(
71 tls::rustls_provider::CryptoMode::AwsLc,
72 ))
73 .build();
74 Some(SharedHttpConnector::new(conn))
75 }
76 #[cfg(not(feature = "rustls-aws-lc"))]
77 {
78 tracing::trace!(settings = ?settings, sleep = ?sleep, "no default connector available");
79 None
80 }
81}
82
/// An HTTP connector that forwards every request to a boxed inner
/// [`HttpConnector`] implementation.
///
/// Instances are created through [`Connector::builder`].
#[derive(Debug)]
pub struct Connector {
    /// The type-erased connector that actually performs requests.
    adapter: Box<dyn HttpConnector>,
}
93
94impl Connector {
95 pub fn builder() -> ConnectorBuilder {
97 ConnectorBuilder::default()
98 }
99}
100
101impl HttpConnector for Connector {
102 fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
103 self.adapter.call(request)
104 }
105}
106
107#[derive(Debug, Clone)]
109pub struct ConnectorBuilder<Tls = TlsUnset> {
110 connector_settings: Option<HttpConnectorSettings>,
111 sleep_impl: Option<SharedAsyncSleep>,
112 client_builder: Option<hyper_util::client::legacy::Builder>,
113 pool_idle_timeout: Option<Option<Duration>>,
114 enable_tcp_nodelay: bool,
115 interface: Option<String>,
116 proxy_config: Option<proxy::ProxyConfig>,
117 #[allow(unused)]
118 tls: Tls,
119}
120
121impl<Tls: Default> Default for ConnectorBuilder<Tls> {
122 fn default() -> Self {
123 Self {
124 connector_settings: None,
125 sleep_impl: None,
126 client_builder: None,
127 pool_idle_timeout: None,
128 enable_tcp_nodelay: true,
134 interface: None,
135 proxy_config: None,
136 tls: Tls::default(),
137 }
138 }
139}
140
/// Type-state marker for a builder on which no TLS provider has been selected.
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub struct TlsUnset {}
145
146#[derive(Debug, Clone)]
148pub struct TlsProviderSelected {
149 #[allow(unused)]
150 provider: tls::Provider,
151 #[allow(unused)]
152 context: TlsContext,
153}
154
155impl ConnectorBuilder<TlsUnset> {
156 pub fn tls_provider(self, provider: tls::Provider) -> ConnectorBuilder<TlsProviderSelected> {
158 ConnectorBuilder {
159 connector_settings: self.connector_settings,
160 sleep_impl: self.sleep_impl,
161 client_builder: self.client_builder,
162 enable_tcp_nodelay: self.enable_tcp_nodelay,
163 interface: self.interface,
164 proxy_config: self.proxy_config,
165 pool_idle_timeout: self.pool_idle_timeout,
166 tls: TlsProviderSelected {
167 provider,
168 context: TlsContext::default(),
169 },
170 }
171 }
172
173 #[doc(hidden)]
175 pub fn build_http(self) -> Connector {
176 if let Some(ref proxy_config) = self.proxy_config {
177 if proxy_config.requires_tls() {
178 tracing::warn!(
179 "HTTPS proxy configured but no TLS provider set. \
180 Connections to HTTPS proxy servers will fail. \
181 Consider configuring a TLS provider to enable TLS support."
182 );
183 }
184 }
185
186 let base = self.base_connector();
187
188 let proxy_config = self
190 .proxy_config
191 .clone()
192 .unwrap_or_else(proxy::ProxyConfig::disabled);
193
194 if !proxy_config.is_disabled() {
195 let http_proxy_connector = connect::HttpProxyConnector::new(base, proxy_config);
196 self.wrap_connector(http_proxy_connector)
197 } else {
198 self.wrap_connector(base)
199 }
200 }
201}
202
203impl<Any> ConnectorBuilder<Any> {
204 pub(crate) fn wrap_connector<C>(self, tcp_connector: C) -> Connector
206 where
207 C: Send + Sync + 'static,
208 C: Clone,
209 C: tower::Service<Uri>,
210 C::Response: Read + Write + Connection + Send + Sync + Unpin,
211 C: Connect,
212 C::Future: Unpin + Send + 'static,
213 C::Error: Into<BoxError>,
214 {
215 let client_builder = self
216 .client_builder
217 .unwrap_or_else(|| new_tokio_hyper_builder(self.pool_idle_timeout));
218 let sleep_impl = self.sleep_impl.or_else(default_async_sleep);
219 let (connect_timeout, read_timeout) = self
220 .connector_settings
221 .map(|c| (c.connect_timeout(), c.read_timeout()))
222 .unwrap_or((None, None));
223
224 let connector = match connect_timeout {
225 Some(duration) => timeout::ConnectTimeout::new(
226 tcp_connector,
227 sleep_impl
228 .clone()
229 .expect("a sleep impl must be provided in order to have a connect timeout"),
230 duration,
231 ),
232 None => timeout::ConnectTimeout::no_timeout(tcp_connector),
233 };
234 let base = client_builder.build(connector);
235 let read_timeout = match read_timeout {
236 Some(duration) => timeout::HttpReadTimeout::new(
237 base,
238 sleep_impl.expect("a sleep impl must be provided in order to have a read timeout"),
239 duration,
240 ),
241 None => timeout::HttpReadTimeout::no_timeout(base),
242 };
243
244 let proxy_matcher = self
245 .proxy_config
246 .as_ref()
247 .map(|config| config.clone().into_hyper_util_matcher());
248
249 Connector {
250 adapter: Box::new(Adapter {
251 client: read_timeout,
252 proxy_matcher,
253 }),
254 }
255 }
256
257 fn base_connector(&self) -> HyperHttpConnector {
260 self.base_connector_with_resolver(GaiResolver::new())
261 }
262
263 fn base_connector_with_resolver<R>(&self, resolver: R) -> HyperHttpConnector<R> {
266 let mut conn = HyperHttpConnector::new_with_resolver(resolver);
267 conn.set_nodelay(self.enable_tcp_nodelay);
268 #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
269 if let Some(interface) = &self.interface {
270 conn.set_interface(interface);
271 }
272 conn
273 }
274
275 pub fn sleep_impl(mut self, sleep_impl: impl AsyncSleep + 'static) -> Self {
280 self.sleep_impl = Some(sleep_impl.into_shared());
281 self
282 }
283
284 pub fn set_sleep_impl(&mut self, sleep_impl: Option<SharedAsyncSleep>) -> &mut Self {
289 self.sleep_impl = sleep_impl;
290 self
291 }
292
293 pub fn connector_settings(mut self, connector_settings: HttpConnectorSettings) -> Self {
295 self.connector_settings = Some(connector_settings);
296 self
297 }
298
299 pub fn set_connector_settings(
301 &mut self,
302 connector_settings: Option<HttpConnectorSettings>,
303 ) -> &mut Self {
304 self.connector_settings = connector_settings;
305 self
306 }
307
308 pub fn enable_tcp_nodelay(mut self, nodelay: bool) -> Self {
310 self.enable_tcp_nodelay = nodelay;
311 self
312 }
313
314 pub fn set_enable_tcp_nodelay(&mut self, nodelay: bool) -> &mut Self {
316 self.enable_tcp_nodelay = nodelay;
317 self
318 }
319
320 #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
333 pub fn set_interface<S: Into<String>>(&mut self, interface: S) -> &mut Self {
334 self.interface = Some(interface.into());
335 self
336 }
337
338 pub fn proxy_config(mut self, config: proxy::ProxyConfig) -> Self {
360 self.proxy_config = Some(config);
361 self
362 }
363
364 pub fn set_proxy_config(&mut self, config: Option<proxy::ProxyConfig>) -> &mut Self {
368 self.proxy_config = config;
369 self
370 }
371
372 pub fn pool_idle_timeout<D>(mut self, val: D) -> Self
396 where
397 D: Into<Option<Duration>>,
398 {
399 self.pool_idle_timeout = Some(val.into());
400 self
401 }
402
403 pub fn set_pool_idle_timeout(&mut self, val: Option<Option<Duration>>) -> &mut Self {
427 self.pool_idle_timeout = val;
428 self
429 }
430
431 pub(crate) fn hyper_builder(
435 mut self,
436 hyper_builder: hyper_util::client::legacy::Builder,
437 ) -> Self {
438 self.set_hyper_builder(Some(hyper_builder));
439 self
440 }
441
442 pub(crate) fn set_hyper_builder(
446 &mut self,
447 hyper_builder: Option<hyper_util::client::legacy::Builder>,
448 ) -> &mut Self {
449 self.client_builder = hyper_builder;
450 self
451 }
452}
453
454struct Adapter<C> {
458 client: timeout::HttpReadTimeout<
459 hyper_util::client::legacy::Client<timeout::ConnectTimeout<C>, SdkBody>,
460 >,
461 proxy_matcher: Option<Matcher>,
462}
463
464impl<C> fmt::Debug for Adapter<C> {
465 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
466 f.debug_struct("Adapter")
467 .field("client", &"** hyper client **")
468 .field("proxy_matcher", &self.proxy_matcher.is_some())
469 .finish()
470 }
471}
472
473fn extract_smithy_connection(capture_conn: &CaptureConnection) -> Option<ConnectionMetadata> {
475 let capture_conn = capture_conn.clone();
476 if let Some(conn) = capture_conn.clone().connection_metadata().as_ref() {
477 let mut extensions = Extensions::new();
478 conn.get_extras(&mut extensions);
479 let http_info = extensions.get::<HttpInfo>();
480 let mut builder = ConnectionMetadata::builder()
481 .proxied(conn.is_proxied())
482 .poison_fn(move || match capture_conn.connection_metadata().as_ref() {
483 Some(conn) => conn.poison(),
484 None => tracing::trace!("no connection existed to poison"),
485 });
486
487 builder
488 .set_local_addr(http_info.map(|info| info.local_addr()))
489 .set_remote_addr(http_info.map(|info| info.remote_addr()));
490
491 let smithy_connection = builder.build();
492
493 Some(smithy_connection)
494 } else {
495 None
496 }
497}
498
499fn new_tokio_hyper_builder(
500 pool_idle_timeout: Option<Option<Duration>>,
501) -> hyper_util::client::legacy::Builder {
502 let mut builder = hyper_util::client::legacy::Builder::new(TokioExecutor::new());
503 builder.pool_timer(TokioTimer::new());
505
506 if let Some(pool_idle_timeout) = pool_idle_timeout {
507 builder.pool_idle_timeout(pool_idle_timeout);
508 }
509
510 builder
511}
512
513impl<C> Adapter<C> {
514 fn add_proxy_auth_header(&self, request: &mut http_1x::Request<SdkBody>) {
516 if request.uri().scheme() != Some(&http_1x::uri::Scheme::HTTP) {
518 return;
519 }
520
521 if request
523 .headers()
524 .contains_key(http_1x::header::PROXY_AUTHORIZATION)
525 {
526 return;
527 }
528
529 if let Some(ref matcher) = self.proxy_matcher {
530 if let Some(intercept) = matcher.intercept(request.uri()) {
531 if let Some(auth_header) = intercept.basic_auth() {
533 request
534 .headers_mut()
535 .insert(http_1x::header::PROXY_AUTHORIZATION, auth_header.clone());
536 tracing::debug!("added proxy authentication header for {}", request.uri());
537 }
538 }
539 }
540 }
541}
542
543impl<C> HttpConnector for Adapter<C>
544where
545 C: Clone + Send + Sync + 'static,
546 C: tower::Service<Uri>,
547 C::Response: Connection + Read + Write + Unpin + 'static,
548 timeout::ConnectTimeout<C>: Connect,
549 C::Future: Unpin + Send + 'static,
550 C::Error: Into<BoxError>,
551{
552 fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
553 let mut request = match request.try_into_http1x() {
554 Ok(request) => request,
555 Err(err) => {
556 return HttpConnectorFuture::ready(Err(ConnectorError::user(err.into())));
557 }
558 };
559
560 self.add_proxy_auth_header(&mut request);
561
562 let capture_connection = capture_connection(&mut request);
563 if let Some(capture_smithy_connection) =
564 request.extensions().get::<CaptureSmithyConnection>()
565 {
566 capture_smithy_connection
567 .set_connection_retriever(move || extract_smithy_connection(&capture_connection));
568 }
569 let mut client = self.client.clone();
570 use tower::Service;
571 let fut = client.call(request);
572 HttpConnectorFuture::new(async move {
573 let response = fut
574 .await
575 .map_err(downcast_error)?
576 .map(SdkBody::from_body_1_x);
577 match HttpResponse::try_from(response) {
578 Ok(response) => Ok(response),
579 Err(err) => Err(ConnectorError::other(err.into(), None)),
580 }
581 })
582 }
583}
584
585fn downcast_error(err: BoxError) -> ConnectorError {
587 if find_source::<TimedOutError>(err.as_ref()).is_some() {
589 return ConnectorError::timeout(err);
590 }
591 let err = match err.downcast::<ConnectorError>() {
593 Ok(connector_error) => return *connector_error,
594 Err(box_error) => box_error,
595 };
596 let err = match find_source::<hyper::Error>(err.as_ref()) {
599 Some(hyper_error) => return to_connector_error(hyper_error)(err),
600 None => match find_source::<hyper_util::client::legacy::Error>(err.as_ref()) {
601 Some(hyper_util_err) => {
602 if hyper_util_err.is_connect()
603 || find_source::<std::io::Error>(hyper_util_err).is_some()
604 {
605 return ConnectorError::io(err);
606 }
607 err
608 }
609 None => err,
610 },
611 };
612
613 ConnectorError::other(err, None)
615}
616
617fn to_connector_error(err: &hyper::Error) -> fn(BoxError) -> ConnectorError {
619 if err.is_timeout() || find_source::<timeout::HttpTimeoutError>(err).is_some() {
620 return ConnectorError::timeout;
621 }
622 if err.is_user() {
623 return ConnectorError::user;
624 }
625 if err.is_closed() || err.is_canceled() || find_source::<std::io::Error>(err).is_some() {
626 return ConnectorError::io;
627 }
628 if err.is_incomplete_message() {
630 return |err: BoxError| ConnectorError::other(err, Some(ErrorKind::TransientError));
631 }
632
633 if let Some(h2_err) = find_source::<h2::Error>(err) {
634 if h2_err.is_go_away()
635 || (h2_err.is_reset() && h2_err.reason() == Some(Reason::REFUSED_STREAM))
636 {
637 return ConnectorError::io;
638 }
639 }
640
641 tracing::warn!(err = %DisplayErrorContext(&err), "unrecognized error from Hyper. If this error should be retried, please file an issue.");
642 |err: BoxError| ConnectorError::other(err, None)
643}
644
/// Walks the `source()` chain starting at `err` (inclusive) and returns the
/// first error that downcasts to `E`, or `None` if the chain has no such error.
fn find_source<'a, E: Error + 'static>(err: &'a (dyn Error + 'static)) -> Option<&'a E> {
    let mut current = err;
    loop {
        if let Some(found) = current.downcast_ref::<E>() {
            return Some(found);
        }
        // End of the chain: `?` returns `None`.
        current = current.source()?;
    }
}
655
/// Key under which built connectors are cached: the pair of timeouts taken
/// from the connector settings.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct CacheKey {
    /// Connect timeout from the settings, if any.
    connect_timeout: Option<Duration>,
    /// Read timeout from the settings, if any.
    read_timeout: Option<Duration>,
}
664
665impl From<&HttpConnectorSettings> for CacheKey {
666 fn from(value: &HttpConnectorSettings) -> Self {
667 Self {
668 connect_timeout: value.connect_timeout(),
669 read_timeout: value.read_timeout(),
670 }
671 }
672}
673
674struct HyperClient<F> {
675 connector_cache: RwLock<HashMap<CacheKey, SharedHttpConnector>>,
676 client_builder: hyper_util::client::legacy::Builder,
677 connector_fn: F,
678}
679
680impl<F> fmt::Debug for HyperClient<F> {
681 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
682 f.debug_struct("HyperClient")
683 .field("connector_cache", &self.connector_cache)
684 .field("client_builder", &self.client_builder)
685 .finish()
686 }
687}
688
689impl<F> HttpClient for HyperClient<F>
690where
691 F: Fn(
692 hyper_util::client::legacy::Builder,
693 Option<&HttpConnectorSettings>,
694 Option<&RuntimeComponents>,
695 ) -> Connector
696 + Send
697 + Sync
698 + 'static,
699{
700 fn http_connector(
701 &self,
702 settings: &HttpConnectorSettings,
703 components: &RuntimeComponents,
704 ) -> SharedHttpConnector {
705 let key = CacheKey::from(settings);
706 let mut connector = self.connector_cache.read().unwrap().get(&key).cloned();
707 if connector.is_none() {
708 let mut cache = self.connector_cache.write().unwrap();
709 if !cache.contains_key(&key) {
711 let start = components.time_source().map(|ts| ts.now());
712 let connector = (self.connector_fn)(
713 self.client_builder.clone(),
714 Some(settings),
715 Some(components),
716 );
717 let end = components.time_source().map(|ts| ts.now());
718 if let (Some(start), Some(end)) = (start, end) {
719 if let Ok(elapsed) = end.duration_since(start) {
720 tracing::debug!("new connector created in {:?}", elapsed);
721 }
722 }
723 let connector = SharedHttpConnector::new(connector);
724 cache.insert(key.clone(), connector);
725 }
726 connector = cache.get(&key).cloned();
727 }
728
729 connector.expect("cache populated above")
730 }
731
732 fn validate_base_client_config(
733 &self,
734 _: &RuntimeComponentsBuilder,
735 _: &ConfigBag,
736 ) -> Result<(), BoxError> {
737 let _ = (self.connector_fn)(self.client_builder.clone(), None, None);
743 Ok(())
744 }
745
746 fn connector_metadata(&self) -> Option<ConnectorMetadata> {
747 Some(ConnectorMetadata::new("hyper", Some(Cow::Borrowed("1.x"))))
748 }
749}
750
751#[derive(Clone, Default, Debug)]
762pub struct Builder<Tls = TlsUnset> {
763 client_builder: Option<hyper_util::client::legacy::Builder>,
764 pool_idle_timeout: Option<Option<Duration>>,
765 #[allow(unused)]
766 tls_provider: Tls,
767}
768
769cfg_tls! {
770 use aws_smithy_runtime_api::client::dns::ResolveDns;
771
772 impl ConnectorBuilder<TlsProviderSelected> {
773 pub fn build(self) -> Connector {
775 let http_connector = self.base_connector();
776 self.build_https(http_connector)
777 }
778
779 pub fn tls_context(mut self, ctx: TlsContext) -> Self {
781 self.tls.context = ctx;
782 self
783 }
784
785 pub fn set_tls_context(&mut self, ctx: TlsContext) -> &mut Self {
787 self.tls.context = ctx;
788 self
789 }
790
791 pub fn build_with_resolver<R: ResolveDns + Clone + 'static>(self, resolver: R) -> Connector {
793 use crate::client::dns::HyperUtilResolver;
794 let http_connector = self.base_connector_with_resolver(HyperUtilResolver { resolver });
795 self.build_https(http_connector)
796 }
797
798 fn build_https<R>(self, http_connector: HyperHttpConnector<R>) -> Connector
799 where
800 R: Clone + Send + Sync + 'static,
801 R: tower::Service<hyper_util::client::legacy::connect::dns::Name>,
802 R::Response: Iterator<Item = std::net::SocketAddr>,
803 R::Future: Send,
804 R::Error: Into<Box<dyn Error + Send + Sync>>,
805 {
806 match &self.tls.provider {
807 #[cfg(feature = "__rustls")]
809 tls::Provider::Rustls(crypto_mode) => {
810 let proxy_config = self.proxy_config.clone()
811 .unwrap_or_else(proxy::ProxyConfig::disabled);
812
813 let https_connector = tls::rustls_provider::build_connector::wrap_connector(
814 http_connector,
815 crypto_mode.clone(),
816 &self.tls.context,
817 proxy_config,
818 );
819 self.wrap_connector(https_connector)
820 },
821 #[cfg(feature = "s2n-tls")]
822 tls::Provider::S2nTls => {
823 let proxy_config = self.proxy_config.clone()
824 .unwrap_or_else(proxy::ProxyConfig::disabled);
825
826 let https_connector = tls::s2n_tls_provider::build_connector::wrap_connector(
827 http_connector,
828 &self.tls.context,
829 proxy_config,
830 );
831 self.wrap_connector(https_connector)
832 }
833 }
834 }
835 }
836
837 impl Builder<TlsProviderSelected> {
838 pub fn build_https(self) -> SharedHttpClient {
843 build_with_conn_fn(
844 self.client_builder,
845 self.pool_idle_timeout,
846 move |client_builder, settings, runtime_components| {
847 let builder = new_conn_builder(client_builder, settings, runtime_components)
848 .tls_provider(self.tls_provider.provider.clone())
849 .tls_context(self.tls_provider.context.clone());
850 builder.build()
851 },
852 )
853 }
854
855 pub fn build_with_resolver(
857 self,
858 resolver: impl ResolveDns + Clone + 'static,
859 ) -> SharedHttpClient {
860 build_with_conn_fn(
861 self.client_builder,
862 self.pool_idle_timeout,
863 move |client_builder, settings, runtime_components| {
864 let builder = new_conn_builder(client_builder, settings, runtime_components)
865 .tls_provider(self.tls_provider.provider.clone())
866 .tls_context(self.tls_provider.context.clone());
867 builder.build_with_resolver(resolver.clone())
868 },
869 )
870 }
871
872 pub fn tls_context(mut self, ctx: TlsContext) -> Self {
874 self.tls_provider.context = ctx;
875 self
876 }
877 }
878}
879
880impl<Any> Builder<Any> {
881 pub fn pool_idle_timeout<D>(mut self, val: D) -> Self
905 where
906 D: Into<Option<Duration>>,
907 {
908 self.pool_idle_timeout = Some(val.into());
909 self
910 }
911
912 pub fn set_pool_idle_timeout(&mut self, val: Option<Option<Duration>>) -> &mut Self {
935 self.pool_idle_timeout = val;
936 self
937 }
938}
939
940impl Builder<TlsUnset> {
941 pub fn new() -> Self {
943 Self::default()
944 }
945
946 #[doc(hidden)]
948 pub fn build_with_connector_fn<F>(self, connector_fn: F) -> SharedHttpClient
949 where
950 F: Fn(Option<&HttpConnectorSettings>, Option<&RuntimeComponents>) -> Connector
951 + Send
952 + Sync
953 + 'static,
954 {
955 build_with_conn_fn(
956 self.client_builder,
957 self.pool_idle_timeout,
958 move |_builder, settings, runtime_components| {
959 connector_fn(settings, runtime_components)
960 },
961 )
962 }
963
964 #[doc(hidden)]
966 pub fn build_http(self) -> SharedHttpClient {
967 build_with_conn_fn(
968 self.client_builder,
969 self.pool_idle_timeout,
970 move |client_builder, settings, runtime_components| {
971 let builder = new_conn_builder(client_builder, settings, runtime_components);
972 builder.build_http()
973 },
974 )
975 }
976
977 pub fn tls_provider(self, provider: tls::Provider) -> Builder<TlsProviderSelected> {
979 Builder {
980 client_builder: self.client_builder,
981 pool_idle_timeout: self.pool_idle_timeout,
982 tls_provider: TlsProviderSelected {
983 provider,
984 context: TlsContext::default(),
985 },
986 }
987 }
988}
989
990pub(crate) fn build_with_conn_fn<F>(
991 client_builder: Option<hyper_util::client::legacy::Builder>,
992 pool_idle_timeout: Option<Option<Duration>>,
993 connector_fn: F,
994) -> SharedHttpClient
995where
996 F: Fn(
997 hyper_util::client::legacy::Builder,
998 Option<&HttpConnectorSettings>,
999 Option<&RuntimeComponents>,
1000 ) -> Connector
1001 + Send
1002 + Sync
1003 + 'static,
1004{
1005 let client_builder =
1006 client_builder.unwrap_or_else(|| new_tokio_hyper_builder(pool_idle_timeout));
1007 SharedHttpClient::new(HyperClient {
1008 connector_cache: RwLock::new(HashMap::new()),
1009 client_builder,
1010 connector_fn,
1011 })
1012}
1013
1014#[allow(dead_code)]
1015pub(crate) fn build_with_tcp_conn_fn<C, F>(
1016 client_builder: Option<hyper_util::client::legacy::Builder>,
1017 pool_idle_timeout: Option<Option<Duration>>,
1018 tcp_connector_fn: F,
1019) -> SharedHttpClient
1020where
1021 F: Fn() -> C + Send + Sync + 'static,
1022 C: Clone + Send + Sync + 'static,
1023 C: tower::Service<Uri>,
1024 C::Response: Connection + Read + Write + Send + Sync + Unpin + 'static,
1025 C::Future: Unpin + Send + 'static,
1026 C::Error: Into<BoxError>,
1027 C: Connect,
1028{
1029 build_with_conn_fn(
1030 client_builder,
1031 pool_idle_timeout,
1032 move |client_builder, settings, runtime_components| {
1033 let builder = new_conn_builder(client_builder, settings, runtime_components);
1034 builder.wrap_connector(tcp_connector_fn())
1035 },
1036 )
1037}
1038
1039fn new_conn_builder(
1040 client_builder: hyper_util::client::legacy::Builder,
1041 settings: Option<&HttpConnectorSettings>,
1042 runtime_components: Option<&RuntimeComponents>,
1043) -> ConnectorBuilder {
1044 let mut builder = Connector::builder().hyper_builder(client_builder);
1045 builder.set_connector_settings(settings.cloned());
1046 if let Some(components) = runtime_components {
1047 builder.set_sleep_impl(components.sleep_impl());
1048 }
1049 builder
1050}
1051
#[cfg(test)]
mod test {
    use std::io::{Error, ErrorKind};
    use std::pin::Pin;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;
    use std::task::{Context, Poll};

    use crate::client::timeout::test::NeverConnects;
    use aws_smithy_async::assert_elapsed;
    use aws_smithy_async::rt::sleep::TokioSleep;
    use aws_smithy_async::time::SystemTimeSource;
    use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
    use http_1x::Uri;
    use hyper::rt::ReadBufCursor;
    use hyper_util::client::legacy::connect::Connected;

    use super::*;

    /// Requesting connectors concurrently for four distinct timeout
    /// combinations must create exactly four connectors.
    #[tokio::test]
    async fn connector_selection() {
        let creations = Arc::new(AtomicU32::new(0));
        // Count every invocation of the transport factory.
        let http_client = build_with_tcp_conn_fn(None, None, {
            let creations = Arc::clone(&creations);
            move || {
                creations.fetch_add(1, Ordering::Relaxed);
                NeverConnects
            }
        });

        let three_secs = Duration::from_secs(3);
        let five_secs = Duration::from_secs(5);
        // Four settings that all produce different cache keys.
        let settings = [
            HttpConnectorSettings::builder()
                .connect_timeout(three_secs)
                .build(),
            HttpConnectorSettings::builder()
                .read_timeout(three_secs)
                .build(),
            HttpConnectorSettings::builder()
                .connect_timeout(three_secs)
                .read_timeout(three_secs)
                .build(),
            HttpConnectorSettings::builder()
                .connect_timeout(five_secs)
                .read_timeout(three_secs)
                .build(),
        ];

        let components = RuntimeComponentsBuilder::for_tests()
            .with_time_source(Some(SystemTimeSource::new()))
            .build()
            .unwrap();

        let mut tasks = Vec::new();
        for setting in &settings {
            for _ in 0..1000 {
                let client = http_client.clone();
                let setting = setting.clone();
                let components = components.clone();
                tasks.push(tokio::spawn(async move {
                    let _ = client.http_connector(&setting, &components);
                }));
            }
        }
        for task in tasks {
            task.await.unwrap();
        }

        assert_eq!(4, creations.load(Ordering::Relaxed));
    }

    /// A transport whose reads fail with an I/O error must surface as an I/O
    /// connector error.
    #[tokio::test]
    async fn hyper_io_error() {
        let transport = TestConnection {
            inner: HangupStream,
        };
        let adapter = Connector::builder().wrap_connector(transport).adapter;
        let request = HttpRequest::get("https://socket-hangup.com").unwrap();
        let err = adapter.call(request).await.expect_err("socket hangup");
        assert!(err.is_io(), "unexpected error type: {:?}", err);
    }

    /// Stream whose reads always fail with `ConnectionReset` and whose writes
    /// never complete.
    #[derive(Clone)]
    struct HangupStream;

    impl Connection for HangupStream {
        fn connected(&self) -> Connected {
            Connected::new()
        }
    }

    impl Read for HangupStream {
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: ReadBufCursor<'_>,
        ) -> Poll<std::io::Result<()>> {
            let reset = Error::new(ErrorKind::ConnectionReset, "connection reset");
            Poll::Ready(Err(reset))
        }
    }

    impl Write for HangupStream {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &[u8],
        ) -> Poll<Result<usize, Error>> {
            Poll::Pending
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
            Poll::Pending
        }

        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
            Poll::Pending
        }
    }

    /// Connector service that is always ready and hands out clones of `inner`.
    #[derive(Clone)]
    struct TestConnection<T> {
        inner: T,
    }

    impl<T> tower::Service<Uri> for TestConnection<T>
    where
        T: Clone + Connection,
    {
        type Response = T;
        type Error = BoxError;
        type Future = std::future::Ready<Result<Self::Response, Self::Error>>;

        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            Poll::Ready(Ok(()))
        }

        fn call(&mut self, _req: Uri) -> Self::Future {
            let stream = self.inner.clone();
            std::future::ready(Ok(stream))
        }
    }

    /// A transport that never connects must fail with a timeout after the
    /// configured connect timeout.
    #[tokio::test]
    async fn http_connect_timeout_works() {
        let transport = NeverConnects::default();
        let one_sec = Duration::from_secs(1);
        let connector_settings = HttpConnectorSettings::builder()
            .connect_timeout(one_sec)
            .build();
        let hyper = Connector::builder()
            .connector_settings(connector_settings)
            .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
            .wrap_connector(transport)
            .adapter;
        // Capture the start instant before pausing the Tokio clock.
        let started = tokio::time::Instant::now();
        tokio::time::pause();
        let resp = hyper
            .call(HttpRequest::get("https://static-uri.com").unwrap())
            .await
            .unwrap_err();
        assert!(
            resp.is_timeout(),
            "expected resp.is_timeout() to be true but it was false, resp == {:?}",
            resp
        );
        let message = format!("{}", DisplayErrorContext(&resp));
        let expected = "timeout: client error (Connect): HTTP connect timeout occurred after 1s";
        assert!(
            message.contains(expected),
            "expected '{message}' to contain '{expected}'"
        );
        assert_elapsed!(started, one_sec);
    }

    /// A transport that connects but never replies must fail with a timeout
    /// after the configured read timeout.
    #[tokio::test]
    async fn http_read_timeout_works() {
        let transport = crate::client::timeout::test::NeverReplies;
        let two_secs = Duration::from_secs(2);
        let connector_settings = HttpConnectorSettings::builder()
            .connect_timeout(Duration::from_secs(1))
            .read_timeout(two_secs)
            .build();
        let hyper = Connector::builder()
            .connector_settings(connector_settings)
            .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
            .wrap_connector(transport)
            .adapter;
        // Capture the start instant before pausing the Tokio clock.
        let started = tokio::time::Instant::now();
        tokio::time::pause();
        let err = hyper
            .call(HttpRequest::get("https://fake-uri.com").unwrap())
            .await
            .unwrap_err();
        assert!(
            err.is_timeout(),
            "expected err.is_timeout() to be true but it was false, err == {err:?}",
        );
        let message = DisplayErrorContext(&err).to_string();
        let expected = "timeout: HTTP read timeout occurred after 2s";
        assert!(
            message.contains(expected),
            "expected '{message}' to contain '{expected}'"
        );
        assert_elapsed!(started, two_secs);
    }

    /// Connecting to a local port with nothing listening must surface as an
    /// I/O error mentioning "Connection refused".
    #[cfg(not(windows))]
    #[tokio::test]
    async fn connection_refused_works() {
        use crate::client::dns::HyperUtilResolver;
        use aws_smithy_runtime_api::client::dns::{DnsFuture, ResolveDns};
        use std::net::{IpAddr, Ipv4Addr};

        /// Resolves every name to 127.0.0.1.
        #[derive(Debug, Clone, Default)]
        struct TestResolver;
        impl ResolveDns for TestResolver {
            fn resolve_dns<'a>(&'a self, _name: &'a str) -> DnsFuture<'a> {
                let loopback = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
                DnsFuture::ready(Ok(vec![loopback]))
            }
        }

        let connector_settings = HttpConnectorSettings::builder()
            .connect_timeout(Duration::from_secs(20))
            .build();

        let tcp_connector = Connector::builder().base_connector_with_resolver(HyperUtilResolver {
            resolver: TestResolver,
        });

        let hyper = Connector::builder()
            .connector_settings(connector_settings)
            .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
            .wrap_connector(tcp_connector)
            .adapter;

        let resp = hyper
            .call(HttpRequest::get("http://static-uri:50227.com").unwrap())
            .await
            .unwrap_err();
        assert!(
            resp.is_io(),
            "expected resp.is_io() to be true but it was false, resp == {:?}",
            resp
        );
        let message = format!("{}", DisplayErrorContext(&resp));
        let expected = "Connection refused";
        assert!(
            message.contains(expected),
            "expected '{message}' to contain '{expected}'"
        );
    }

    /// With the s2n-tls provider selected, a request with an unsupported
    /// scheme must fail with s2n-tls-hyper's `InvalidScheme` error, showing
    /// that the s2n-tls connector is the one in use.
    #[cfg(feature = "s2n-tls")]
    #[tokio::test]
    async fn s2n_tls_provider() {
        let client = Builder::new()
            .tls_provider(tls::Provider::S2nTls)
            .build_https();
        let connector_settings = HttpConnectorSettings::builder().build();

        let runtime_components = RuntimeComponentsBuilder::for_tests()
            .with_time_source(Some(SystemTimeSource::new()))
            .build()
            .unwrap();

        let connector = client.http_connector(&connector_settings, &runtime_components);

        let request = HttpRequest::get("notascheme://amazon.com").unwrap();
        let source = connector.call(request).await.unwrap_err().into_source();
        let s2n_error = source
            .source()
            .unwrap()
            .downcast_ref::<s2n_tls_hyper::error::Error>()
            .unwrap();
        assert!(matches!(
            s2n_error,
            s2n_tls_hyper::error::Error::InvalidScheme
        ));
    }
}