1#[cfg(feature = "__tls")]
2use http::header::HeaderValue;
3#[cfg(feature = "__tls")]
4use http::uri::Scheme;
5use http::Uri;
6use hyper::rt::{Read, ReadBufCursor, Write};
7use hyper_util::client::legacy::connect::{Connected, Connection};
8#[cfg(any(feature = "socks", feature = "__tls", unix, target_os = "windows"))]
9use hyper_util::rt::TokioIo;
10#[cfg(feature = "__native-tls")]
11use native_tls_crate::{TlsConnector, TlsConnectorBuilder};
12use pin_project_lite::pin_project;
13use tower::util::{BoxCloneSyncServiceLayer, MapRequestLayer};
14use tower::{timeout::TimeoutLayer, util::BoxCloneSyncService, ServiceBuilder};
15use tower_service::Service;
16
17use std::future::Future;
18use std::io::{self, IoSlice};
19use std::net::IpAddr;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::{Context, Poll};
23use std::time::Duration;
24
25#[cfg(feature = "__native-tls")]
26use self::native_tls_conn::NativeTlsConn;
27#[cfg(feature = "__rustls")]
28use self::rustls_tls_conn::RustlsTlsConn;
29use crate::dns::DynResolver;
30use crate::error::{cast_to_internal_error, BoxError};
31use crate::proxy::{Intercepted, Matcher as ProxyMatcher};
32use sealed::{Conn, Unnameable};
33
34pub(crate) type HttpConnector = hyper_util::client::legacy::connect::HttpConnector<DynResolver>;
35
36#[derive(Clone)]
37pub(crate) enum Connector {
38 Simple(ConnectorService),
40 WithLayers(BoxCloneSyncService<Unnameable, Conn, BoxError>),
43}
44
45impl Service<Uri> for Connector {
46 type Response = Conn;
47 type Error = BoxError;
48 type Future = Connecting;
49
50 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
51 match self {
52 Connector::Simple(service) => service.poll_ready(cx),
53 Connector::WithLayers(service) => service.poll_ready(cx),
54 }
55 }
56
57 fn call(&mut self, dst: Uri) -> Self::Future {
58 match self {
59 Connector::Simple(service) => service.call(dst),
60 Connector::WithLayers(service) => service.call(Unnameable(dst)),
61 }
62 }
63}
64
65pub(crate) type BoxedConnectorService = BoxCloneSyncService<Unnameable, Conn, BoxError>;
66
67pub(crate) type BoxedConnectorLayer =
68 BoxCloneSyncServiceLayer<BoxedConnectorService, Unnameable, Conn, BoxError>;
69
70pub(crate) struct ConnectorBuilder {
71 inner: Inner,
72 proxies: Arc<Vec<ProxyMatcher>>,
73 verbose: verbose::Wrapper,
74 timeout: Option<Duration>,
75 #[cfg(feature = "__tls")]
76 nodelay: bool,
77 #[cfg(feature = "__tls")]
78 tls_info: bool,
79 #[cfg(feature = "__tls")]
80 user_agent: Option<HeaderValue>,
81 #[cfg(feature = "socks")]
82 resolver: Option<DynResolver>,
83 #[cfg(unix)]
84 unix_socket: Option<Arc<std::path::Path>>,
85 #[cfg(target_os = "windows")]
86 windows_named_pipe: Option<Arc<std::ffi::OsStr>>,
87}
88
89impl ConnectorBuilder {
90 pub(crate) fn build(self, layers: Vec<BoxedConnectorLayer>) -> Connector
91where {
92 let mut base_service = ConnectorService {
94 inner: self.inner,
95 proxies: self.proxies,
96 verbose: self.verbose,
97 #[cfg(feature = "__tls")]
98 nodelay: self.nodelay,
99 #[cfg(feature = "__tls")]
100 tls_info: self.tls_info,
101 #[cfg(feature = "__tls")]
102 user_agent: self.user_agent,
103 simple_timeout: None,
104 #[cfg(feature = "socks")]
105 resolver: self.resolver.unwrap_or_else(DynResolver::gai),
106 #[cfg(unix)]
107 unix_socket: self.unix_socket,
108 #[cfg(target_os = "windows")]
109 windows_named_pipe: self.windows_named_pipe,
110 };
111
112 #[cfg(unix)]
113 if base_service.unix_socket.is_some() && !base_service.proxies.is_empty() {
114 base_service.proxies = Default::default();
115 log::trace!("unix_socket() set, proxies are ignored");
116 }
117 #[cfg(target_os = "windows")]
118 if base_service.windows_named_pipe.is_some() && !base_service.proxies.is_empty() {
119 base_service.proxies = Default::default();
120 log::trace!("windows_named_pipe() set, proxies are ignored");
121 }
122
123 if layers.is_empty() {
124 base_service.simple_timeout = self.timeout;
126 return Connector::Simple(base_service);
127 }
128
129 let unnameable_service = ServiceBuilder::new()
133 .layer(MapRequestLayer::new(|request: Unnameable| request.0))
134 .service(base_service);
135 let mut service = BoxCloneSyncService::new(unnameable_service);
136
137 for layer in layers {
138 service = ServiceBuilder::new().layer(layer).service(service);
139 }
140
141 match self.timeout {
145 Some(timeout) => {
146 let service = ServiceBuilder::new()
147 .layer(TimeoutLayer::new(timeout))
148 .service(service);
149 let service = ServiceBuilder::new()
150 .map_err(|error: BoxError| cast_to_internal_error(error))
151 .service(service);
152 let service = BoxCloneSyncService::new(service);
153
154 Connector::WithLayers(service)
155 }
156 None => {
157 let service = ServiceBuilder::new().service(service);
161 let service = ServiceBuilder::new()
162 .map_err(|error: BoxError| cast_to_internal_error(error))
163 .service(service);
164 let service = BoxCloneSyncService::new(service);
165 Connector::WithLayers(service)
166 }
167 }
168 }
169
170 #[cfg(not(feature = "__tls"))]
171 pub(crate) fn new<T>(
172 mut http: HttpConnector,
173 proxies: Arc<Vec<ProxyMatcher>>,
174 local_addr: T,
175 #[cfg(any(
176 target_os = "android",
177 target_os = "fuchsia",
178 target_os = "illumos",
179 target_os = "ios",
180 target_os = "linux",
181 target_os = "macos",
182 target_os = "solaris",
183 target_os = "tvos",
184 target_os = "visionos",
185 target_os = "watchos",
186 ))]
187 interface: Option<&str>,
188 nodelay: bool,
189 ) -> ConnectorBuilder
190 where
191 T: Into<Option<IpAddr>>,
192 {
193 http.set_local_address(local_addr.into());
194 #[cfg(any(
195 target_os = "android",
196 target_os = "fuchsia",
197 target_os = "illumos",
198 target_os = "ios",
199 target_os = "linux",
200 target_os = "macos",
201 target_os = "solaris",
202 target_os = "tvos",
203 target_os = "visionos",
204 target_os = "watchos",
205 ))]
206 if let Some(interface) = interface {
207 http.set_interface(interface.to_owned());
208 }
209 http.set_nodelay(nodelay);
210
211 ConnectorBuilder {
212 inner: Inner::Http(http),
213 proxies,
214 verbose: verbose::OFF,
215 timeout: None,
216 #[cfg(feature = "socks")]
217 resolver: None,
218 #[cfg(unix)]
219 unix_socket: None,
220 #[cfg(target_os = "windows")]
221 windows_named_pipe: None,
222 }
223 }
224
225 #[cfg(feature = "__native-tls")]
226 pub(crate) fn new_native_tls<T>(
227 http: HttpConnector,
228 tls: TlsConnectorBuilder,
229 proxies: Arc<Vec<ProxyMatcher>>,
230 user_agent: Option<HeaderValue>,
231 local_addr: T,
232 #[cfg(any(
233 target_os = "android",
234 target_os = "fuchsia",
235 target_os = "illumos",
236 target_os = "ios",
237 target_os = "linux",
238 target_os = "macos",
239 target_os = "solaris",
240 target_os = "tvos",
241 target_os = "visionos",
242 target_os = "watchos",
243 ))]
244 interface: Option<&str>,
245 nodelay: bool,
246 tls_info: bool,
247 ) -> crate::Result<ConnectorBuilder>
248 where
249 T: Into<Option<IpAddr>>,
250 {
251 let tls = tls.build().map_err(crate::error::builder)?;
252 Ok(Self::from_built_native_tls(
253 http,
254 tls,
255 proxies,
256 user_agent,
257 local_addr,
258 #[cfg(any(
259 target_os = "android",
260 target_os = "fuchsia",
261 target_os = "illumos",
262 target_os = "ios",
263 target_os = "linux",
264 target_os = "macos",
265 target_os = "solaris",
266 target_os = "tvos",
267 target_os = "visionos",
268 target_os = "watchos",
269 ))]
270 interface,
271 nodelay,
272 tls_info,
273 ))
274 }
275
276 #[cfg(feature = "__native-tls")]
277 pub(crate) fn from_built_native_tls<T>(
278 mut http: HttpConnector,
279 tls: TlsConnector,
280 proxies: Arc<Vec<ProxyMatcher>>,
281 user_agent: Option<HeaderValue>,
282 local_addr: T,
283 #[cfg(any(
284 target_os = "android",
285 target_os = "fuchsia",
286 target_os = "illumos",
287 target_os = "ios",
288 target_os = "linux",
289 target_os = "macos",
290 target_os = "solaris",
291 target_os = "tvos",
292 target_os = "visionos",
293 target_os = "watchos",
294 ))]
295 interface: Option<&str>,
296 nodelay: bool,
297 tls_info: bool,
298 ) -> ConnectorBuilder
299 where
300 T: Into<Option<IpAddr>>,
301 {
302 http.set_local_address(local_addr.into());
303 #[cfg(any(
304 target_os = "android",
305 target_os = "fuchsia",
306 target_os = "illumos",
307 target_os = "ios",
308 target_os = "linux",
309 target_os = "macos",
310 target_os = "solaris",
311 target_os = "tvos",
312 target_os = "visionos",
313 target_os = "watchos",
314 ))]
315 if let Some(interface) = interface {
316 http.set_interface(interface);
317 }
318 http.set_nodelay(nodelay);
319 http.enforce_http(false);
320
321 ConnectorBuilder {
322 inner: Inner::NativeTls(http, tls),
323 proxies,
324 verbose: verbose::OFF,
325 nodelay,
326 tls_info,
327 user_agent,
328 timeout: None,
329 #[cfg(feature = "socks")]
330 resolver: None,
331 #[cfg(unix)]
332 unix_socket: None,
333 #[cfg(target_os = "windows")]
334 windows_named_pipe: None,
335 }
336 }
337
338 #[cfg(feature = "__rustls")]
339 #[allow(clippy::too_many_arguments)]
340 pub(crate) fn new_rustls_tls<T>(
341 mut http: HttpConnector,
342 tls: rustls::ClientConfig,
343 proxies: Arc<Vec<ProxyMatcher>>,
344 user_agent: Option<HeaderValue>,
345 local_addr: T,
346 #[cfg(any(
347 target_os = "android",
348 target_os = "fuchsia",
349 target_os = "illumos",
350 target_os = "ios",
351 target_os = "linux",
352 target_os = "macos",
353 target_os = "solaris",
354 target_os = "tvos",
355 target_os = "visionos",
356 target_os = "watchos",
357 ))]
358 interface: Option<&str>,
359 nodelay: bool,
360 tls_info: bool,
361 ) -> ConnectorBuilder
362 where
363 T: Into<Option<IpAddr>>,
364 {
365 http.set_local_address(local_addr.into());
366 #[cfg(any(
367 target_os = "android",
368 target_os = "fuchsia",
369 target_os = "illumos",
370 target_os = "ios",
371 target_os = "linux",
372 target_os = "macos",
373 target_os = "solaris",
374 target_os = "tvos",
375 target_os = "visionos",
376 target_os = "watchos",
377 ))]
378 if let Some(interface) = interface {
379 http.set_interface(interface.to_owned());
380 }
381 http.set_nodelay(nodelay);
382 http.enforce_http(false);
383
384 let (tls, tls_proxy) = if proxies.is_empty() {
385 let tls = Arc::new(tls);
386 (tls.clone(), tls)
387 } else {
388 let mut tls_proxy = tls.clone();
389 tls_proxy.alpn_protocols.clear();
390 (Arc::new(tls), Arc::new(tls_proxy))
391 };
392
393 ConnectorBuilder {
394 inner: Inner::RustlsTls {
395 http,
396 tls,
397 tls_proxy,
398 },
399 proxies,
400 verbose: verbose::OFF,
401 nodelay,
402 tls_info,
403 user_agent,
404 timeout: None,
405 #[cfg(feature = "socks")]
406 resolver: None,
407 #[cfg(unix)]
408 unix_socket: None,
409 #[cfg(target_os = "windows")]
410 windows_named_pipe: None,
411 }
412 }
413
414 pub(crate) fn set_timeout(&mut self, timeout: Option<Duration>) {
415 self.timeout = timeout;
416 }
417
418 pub(crate) fn set_verbose(&mut self, enabled: bool) {
419 self.verbose.0 = enabled;
420 }
421
422 pub(crate) fn set_keepalive(&mut self, dur: Option<Duration>) {
423 match &mut self.inner {
424 #[cfg(feature = "__native-tls")]
425 Inner::NativeTls(http, _tls) => http.set_keepalive(dur),
426 #[cfg(feature = "__rustls")]
427 Inner::RustlsTls { http, .. } => http.set_keepalive(dur),
428 #[cfg(not(feature = "__tls"))]
429 Inner::Http(http) => http.set_keepalive(dur),
430 }
431 }
432
433 pub(crate) fn set_keepalive_interval(&mut self, dur: Option<Duration>) {
434 match &mut self.inner {
435 #[cfg(feature = "__native-tls")]
436 Inner::NativeTls(http, _tls) => http.set_keepalive_interval(dur),
437 #[cfg(feature = "__rustls")]
438 Inner::RustlsTls { http, .. } => http.set_keepalive_interval(dur),
439 #[cfg(not(feature = "__tls"))]
440 Inner::Http(http) => http.set_keepalive_interval(dur),
441 }
442 }
443
444 pub(crate) fn set_keepalive_retries(&mut self, retries: Option<u32>) {
445 match &mut self.inner {
446 #[cfg(feature = "__native-tls")]
447 Inner::NativeTls(http, _tls) => http.set_keepalive_retries(retries),
448 #[cfg(feature = "__rustls")]
449 Inner::RustlsTls { http, .. } => http.set_keepalive_retries(retries),
450 #[cfg(not(feature = "__tls"))]
451 Inner::Http(http) => http.set_keepalive_retries(retries),
452 }
453 }
454
455 #[cfg(feature = "socks")]
456 pub(crate) fn set_socks_resolver(&mut self, resolver: DynResolver) {
457 self.resolver = Some(resolver);
458 }
459
460 #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
461 pub(crate) fn set_tcp_user_timeout(&mut self, dur: Option<Duration>) {
462 match &mut self.inner {
463 #[cfg(feature = "__native-tls")]
464 Inner::NativeTls(http, _tls) => http.set_tcp_user_timeout(dur),
465 #[cfg(feature = "__rustls")]
466 Inner::RustlsTls { http, .. } => http.set_tcp_user_timeout(dur),
467 #[cfg(not(feature = "__tls"))]
468 Inner::Http(http) => http.set_tcp_user_timeout(dur),
469 }
470 }
471
472 #[cfg(unix)]
473 pub(crate) fn set_unix_socket(&mut self, path: Option<Arc<std::path::Path>>) {
474 self.unix_socket = path;
475 }
476
477 #[cfg(target_os = "windows")]
478 pub(crate) fn set_windows_named_pipe(&mut self, pipe: Option<Arc<std::ffi::OsStr>>) {
479 self.windows_named_pipe = pipe;
480 }
481}
482
483#[allow(missing_debug_implementations)]
484#[derive(Clone)]
485pub(crate) struct ConnectorService {
486 inner: Inner,
487 proxies: Arc<Vec<ProxyMatcher>>,
488 verbose: verbose::Wrapper,
489 simple_timeout: Option<Duration>,
494 #[cfg(feature = "__tls")]
495 nodelay: bool,
496 #[cfg(feature = "__tls")]
497 tls_info: bool,
498 #[cfg(feature = "__tls")]
499 user_agent: Option<HeaderValue>,
500 #[cfg(feature = "socks")]
501 resolver: DynResolver,
502 #[cfg(unix)]
504 unix_socket: Option<Arc<std::path::Path>>,
505 #[cfg(target_os = "windows")]
506 windows_named_pipe: Option<Arc<std::ffi::OsStr>>,
507}
508
509#[derive(Clone)]
510enum Inner {
511 #[cfg(not(feature = "__tls"))]
512 Http(HttpConnector),
513 #[cfg(feature = "__native-tls")]
514 NativeTls(HttpConnector, TlsConnector),
515 #[cfg(feature = "__rustls")]
516 RustlsTls {
517 http: HttpConnector,
518 tls: Arc<rustls::ClientConfig>,
519 tls_proxy: Arc<rustls::ClientConfig>,
520 },
521}
522
523impl Inner {
524 #[cfg(feature = "socks")]
525 fn get_http_connector(&mut self) -> &mut crate::connect::HttpConnector {
526 match self {
527 #[cfg(feature = "__native-tls")]
528 Inner::NativeTls(http, _) => http,
529 #[cfg(feature = "__rustls")]
530 Inner::RustlsTls { http, .. } => http,
531 #[cfg(not(feature = "__tls"))]
532 Inner::Http(http) => http,
533 }
534 }
535}
536
537impl ConnectorService {
538 #[cfg(feature = "socks")]
539 async fn connect_socks(mut self, dst: Uri, proxy: Intercepted) -> Result<Conn, BoxError> {
540 let dns = match proxy.uri().scheme_str() {
541 Some("socks4") | Some("socks5") => socks::DnsResolve::Local,
542 Some("socks4a") | Some("socks5h") => socks::DnsResolve::Proxy,
543 _ => {
544 unreachable!("connect_socks is only called for socks proxies");
545 }
546 };
547
548 match &mut self.inner {
549 #[cfg(feature = "__native-tls")]
550 Inner::NativeTls(http, tls) => {
551 if dst.scheme() == Some(&Scheme::HTTPS) {
552 let host = dst.host().ok_or("no host in url")?.to_string();
553 let conn = socks::connect(proxy, dst, dns, &self.resolver, http).await?;
554 let conn = TokioIo::new(conn);
555 let conn = TokioIo::new(conn);
556 let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
557 let io = tls_connector.connect(&host, conn).await?;
558 let io = TokioIo::new(io);
559 return Ok(Conn {
560 inner: self.verbose.wrap(NativeTlsConn { inner: io }),
561 is_proxy: false,
562 tls_info: self.tls_info,
563 });
564 }
565 }
566 #[cfg(feature = "__rustls")]
567 Inner::RustlsTls { http, tls, .. } => {
568 if dst.scheme() == Some(&Scheme::HTTPS) {
569 use std::convert::TryFrom;
570 use tokio_rustls::TlsConnector as RustlsConnector;
571
572 let tls = tls.clone();
573 let host = dst.host().ok_or("no host in url")?.to_string();
574 let conn = socks::connect(proxy, dst, dns, &self.resolver, http).await?;
575 let conn = TokioIo::new(conn);
576 let conn = TokioIo::new(conn);
577 let server_name =
578 rustls_pki_types::ServerName::try_from(host.as_str().to_owned())
579 .map_err(|_| "Invalid Server Name")?;
580 let io = RustlsConnector::from(tls)
581 .connect(server_name, conn)
582 .await?;
583 let io = TokioIo::new(io);
584 return Ok(Conn {
585 inner: self.verbose.wrap(RustlsTlsConn { inner: io }),
586 is_proxy: false,
587 tls_info: self.tls_info,
588 });
589 }
590 }
591 #[cfg(not(feature = "__tls"))]
592 Inner::Http(http) => {
593 let conn = socks::connect(proxy, dst, dns, &self.resolver, http).await?;
594 return Ok(Conn {
595 inner: self.verbose.wrap(TokioIo::new(conn)),
596 is_proxy: false,
597 tls_info: false,
598 });
599 }
600 }
601
602 let resolver = &self.resolver;
603 let http = self.inner.get_http_connector();
604 socks::connect(proxy, dst, dns, resolver, http)
605 .await
606 .map(|tcp| Conn {
607 inner: self.verbose.wrap(TokioIo::new(tcp)),
608 is_proxy: false,
609 tls_info: false,
610 })
611 .map_err(Into::into)
612 }
613
614 async fn connect_with_maybe_proxy(self, dst: Uri, is_proxy: bool) -> Result<Conn, BoxError> {
615 match self.inner {
616 #[cfg(not(feature = "__tls"))]
617 Inner::Http(mut http) => {
618 let io = http.call(dst).await?;
619 Ok(Conn {
620 inner: self.verbose.wrap(io),
621 is_proxy,
622 tls_info: false,
623 })
624 }
625 #[cfg(feature = "__native-tls")]
626 Inner::NativeTls(http, tls) => {
627 let mut http = http.clone();
628
629 if !self.nodelay && (dst.scheme() == Some(&Scheme::HTTPS)) {
633 http.set_nodelay(true);
634 }
635
636 let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
637 let mut http = hyper_tls::HttpsConnector::from((http, tls_connector));
638 let io = http.call(dst).await?;
639
640 if let hyper_tls::MaybeHttpsStream::Https(stream) = io {
641 if !self.nodelay {
642 stream
643 .inner()
644 .get_ref()
645 .get_ref()
646 .get_ref()
647 .inner()
648 .inner()
649 .set_nodelay(false)?;
650 }
651 Ok(Conn {
652 inner: self.verbose.wrap(NativeTlsConn { inner: stream }),
653 is_proxy,
654 tls_info: self.tls_info,
655 })
656 } else {
657 Ok(Conn {
658 inner: self.verbose.wrap(io),
659 is_proxy,
660 tls_info: false,
661 })
662 }
663 }
664 #[cfg(feature = "__rustls")]
665 Inner::RustlsTls { http, tls, .. } => {
666 let mut http = http.clone();
667
668 if !self.nodelay && (dst.scheme() == Some(&Scheme::HTTPS)) {
672 http.set_nodelay(true);
673 }
674
675 let mut http = hyper_rustls::HttpsConnector::from((http, tls.clone()));
676 let io = http.call(dst).await?;
677
678 if let hyper_rustls::MaybeHttpsStream::Https(stream) = io {
679 if !self.nodelay {
680 let (io, _) = stream.inner().get_ref();
681 io.inner().inner().set_nodelay(false)?;
682 }
683 Ok(Conn {
684 inner: self.verbose.wrap(RustlsTlsConn { inner: stream }),
685 is_proxy,
686 tls_info: self.tls_info,
687 })
688 } else {
689 Ok(Conn {
690 inner: self.verbose.wrap(io),
691 is_proxy,
692 tls_info: false,
693 })
694 }
695 }
696 }
697 }
698
699 #[cfg(any(unix, target_os = "windows"))]
701 async fn connect_local_transport(self, dst: Uri) -> Result<Conn, BoxError> {
702 #[cfg(unix)]
703 let svc = {
704 let path = self
705 .unix_socket
706 .as_ref()
707 .expect("connect local must have socket path")
708 .clone();
709 tower::service_fn(move |_| {
710 let fut = tokio::net::UnixStream::connect(path.clone());
711 async move {
712 let io = fut.await?;
713 Ok::<_, std::io::Error>(TokioIo::new(io))
714 }
715 })
716 };
717 #[cfg(target_os = "windows")]
718 let svc = {
719 use tokio::net::windows::named_pipe::ClientOptions;
720 let pipe = self
721 .windows_named_pipe
722 .as_ref()
723 .expect("connect local must have pipe path")
724 .clone();
725 tower::service_fn(move |_| {
726 let pipe = pipe.clone();
727 async move { ClientOptions::new().open(pipe).map(TokioIo::new) }
728 })
729 };
730 let is_proxy = false;
731 match self.inner {
732 #[cfg(not(feature = "__tls"))]
733 Inner::Http(..) => {
734 let mut svc = svc;
735 let io = svc.call(dst).await?;
736 Ok(Conn {
737 inner: self.verbose.wrap(io),
738 is_proxy,
739 tls_info: false,
740 })
741 }
742 #[cfg(feature = "__native-tls")]
743 Inner::NativeTls(_, tls) => {
744 let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
745 let mut http = hyper_tls::HttpsConnector::from((svc, tls_connector));
746 let io = http.call(dst).await?;
747
748 if let hyper_tls::MaybeHttpsStream::Https(stream) = io {
749 Ok(Conn {
750 inner: self.verbose.wrap(NativeTlsConn { inner: stream }),
751 is_proxy,
752 tls_info: self.tls_info,
753 })
754 } else {
755 Ok(Conn {
756 inner: self.verbose.wrap(io),
757 is_proxy,
758 tls_info: false,
759 })
760 }
761 }
762 #[cfg(feature = "__rustls")]
763 Inner::RustlsTls { tls, .. } => {
764 let mut http = hyper_rustls::HttpsConnector::from((svc, tls.clone()));
765 let io = http.call(dst).await?;
766
767 if let hyper_rustls::MaybeHttpsStream::Https(stream) = io {
768 Ok(Conn {
769 inner: self.verbose.wrap(RustlsTlsConn { inner: stream }),
770 is_proxy,
771 tls_info: self.tls_info,
772 })
773 } else {
774 Ok(Conn {
775 inner: self.verbose.wrap(io),
776 is_proxy,
777 tls_info: false,
778 })
779 }
780 }
781 }
782 }
783
784 async fn connect_via_proxy(self, dst: Uri, proxy: Intercepted) -> Result<Conn, BoxError> {
785 log::debug!("proxy({proxy:?}) intercepts '{:?}'", dst.host());
786
787 #[cfg(feature = "socks")]
788 match proxy.uri().scheme_str().ok_or("proxy scheme expected")? {
789 "socks4" | "socks4a" | "socks5" | "socks5h" => {
790 return self.connect_socks(dst, proxy).await
791 }
792 _ => (),
793 }
794
795 let proxy_dst = proxy.uri().clone();
796 #[cfg(feature = "__tls")]
797 let auth = proxy.basic_auth().cloned();
798
799 #[cfg(feature = "__tls")]
800 let misc = proxy.custom_headers();
801
802 match &self.inner {
803 #[cfg(feature = "__native-tls")]
804 Inner::NativeTls(http, tls) => {
805 if dst.scheme() == Some(&Scheme::HTTPS) {
806 log::trace!("tunneling HTTPS over proxy");
807 let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
808 let inner =
809 hyper_tls::HttpsConnector::from((http.clone(), tls_connector.clone()));
810 let mut tunnel =
812 hyper_util::client::legacy::connect::proxy::Tunnel::new(proxy_dst, inner);
813 if let Some(auth) = auth {
814 tunnel = tunnel.with_auth(auth);
815 }
816 if let Some(ua) = self.user_agent {
817 let mut headers = http::HeaderMap::new();
818 headers.insert(http::header::USER_AGENT, ua);
819 tunnel = tunnel.with_headers(headers);
820 }
821 if let Some(custom_headers) = misc {
823 tunnel = tunnel.with_headers(custom_headers.clone());
824 }
825 let tunneled = tunnel.call(dst.clone()).await?;
828 let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
829 let io = tls_connector
830 .connect(dst.host().ok_or("no host in url")?, TokioIo::new(tunneled))
831 .await?;
832 return Ok(Conn {
833 inner: self.verbose.wrap(NativeTlsConn {
834 inner: TokioIo::new(io),
835 }),
836 is_proxy: false,
837 tls_info: self.tls_info,
838 });
839 }
840 }
841 #[cfg(feature = "__rustls")]
842 Inner::RustlsTls {
843 http,
844 tls,
845 tls_proxy,
846 } => {
847 if dst.scheme() == Some(&Scheme::HTTPS) {
848 use rustls_pki_types::ServerName;
849 use std::convert::TryFrom;
850 use tokio_rustls::TlsConnector as RustlsConnector;
851
852 log::trace!("tunneling HTTPS over proxy");
853 let http = http.clone();
854 let inner = hyper_rustls::HttpsConnector::from((http, tls_proxy.clone()));
855 let mut tunnel =
857 hyper_util::client::legacy::connect::proxy::Tunnel::new(proxy_dst, inner);
858 if let Some(auth) = auth {
859 tunnel = tunnel.with_auth(auth);
860 }
861 if let Some(custom_headers) = misc {
862 tunnel = tunnel.with_headers(custom_headers.clone());
863 }
864 if let Some(ua) = self.user_agent {
865 let mut headers = http::HeaderMap::new();
866 headers.insert(http::header::USER_AGENT, ua);
867 tunnel = tunnel.with_headers(headers);
868 }
869 let tunneled = tunnel.call(dst.clone()).await?;
872 let host = dst.host().ok_or("no host in url")?.to_string();
873 let server_name = ServerName::try_from(host.as_str().to_owned())
874 .map_err(|_| "Invalid Server Name")?;
875 let io = RustlsConnector::from(tls.clone())
876 .connect(server_name, TokioIo::new(tunneled))
877 .await?;
878
879 return Ok(Conn {
880 inner: self.verbose.wrap(RustlsTlsConn {
881 inner: TokioIo::new(io),
882 }),
883 is_proxy: false,
884 tls_info: self.tls_info,
885 });
886 }
887 }
888 #[cfg(not(feature = "__tls"))]
889 Inner::Http(_) => (),
890 }
891
892 self.connect_with_maybe_proxy(proxy_dst, true).await
893 }
894
895 #[cfg(any(unix, target_os = "windows"))]
896 fn should_use_local_transport(&self) -> bool {
897 #[cfg(unix)]
898 return self.unix_socket.is_some();
899
900 #[cfg(target_os = "windows")]
901 return self.windows_named_pipe.is_some();
902 }
903}
904
905async fn with_timeout<T, F>(f: F, timeout: Option<Duration>) -> Result<T, BoxError>
906where
907 F: Future<Output = Result<T, BoxError>>,
908{
909 if let Some(to) = timeout {
910 match tokio::time::timeout(to, f).await {
911 Err(_elapsed) => Err(Box::new(crate::error::TimedOut) as BoxError),
912 Ok(Ok(try_res)) => Ok(try_res),
913 Ok(Err(e)) => Err(e),
914 }
915 } else {
916 f.await
917 }
918}
919
920impl Service<Uri> for ConnectorService {
921 type Response = Conn;
922 type Error = BoxError;
923 type Future = Connecting;
924
925 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
926 Poll::Ready(Ok(()))
927 }
928
929 fn call(&mut self, dst: Uri) -> Self::Future {
930 log::debug!("starting new connection '{:?}'", dst.host());
931 let timeout = self.simple_timeout;
932
933 #[cfg(any(unix, target_os = "windows"))]
935 if self.should_use_local_transport() {
936 return Box::pin(with_timeout(
937 self.clone().connect_local_transport(dst),
938 timeout,
939 ));
940 }
941
942 for prox in self.proxies.iter() {
943 if let Some(intercepted) = prox.intercept(&dst) {
944 return Box::pin(with_timeout(
945 self.clone().connect_via_proxy(dst, intercepted),
946 timeout,
947 ));
948 }
949 }
950
951 Box::pin(with_timeout(
952 self.clone().connect_with_maybe_proxy(dst, false),
953 timeout,
954 ))
955 }
956}
957
958#[cfg(feature = "__tls")]
959trait TlsInfoFactory {
960 fn tls_info(&self) -> Option<crate::tls::TlsInfo>;
961}
962
963#[cfg(feature = "__tls")]
964impl<T: TlsInfoFactory> TlsInfoFactory for TokioIo<T> {
965 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
966 self.inner().tls_info()
967 }
968}
969
970#[cfg(feature = "__tls")]
973impl TlsInfoFactory for tokio::net::TcpStream {
974 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
975 None
976 }
977}
978
979#[cfg(feature = "__native-tls")]
980impl TlsInfoFactory for tokio_native_tls::TlsStream<TokioIo<TokioIo<tokio::net::TcpStream>>> {
981 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
982 let peer_certificate = self
983 .get_ref()
984 .peer_certificate()
985 .ok()
986 .flatten()
987 .and_then(|c| c.to_der().ok());
988 Some(crate::tls::TlsInfo { peer_certificate })
989 }
990}
991
992#[cfg(feature = "__native-tls")]
993impl TlsInfoFactory
994 for tokio_native_tls::TlsStream<
995 TokioIo<hyper_tls::MaybeHttpsStream<TokioIo<tokio::net::TcpStream>>>,
996 >
997{
998 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
999 let peer_certificate = self
1000 .get_ref()
1001 .peer_certificate()
1002 .ok()
1003 .flatten()
1004 .and_then(|c| c.to_der().ok());
1005 Some(crate::tls::TlsInfo { peer_certificate })
1006 }
1007}
1008
1009#[cfg(feature = "__native-tls")]
1010impl TlsInfoFactory for hyper_tls::MaybeHttpsStream<TokioIo<tokio::net::TcpStream>> {
1011 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1012 match self {
1013 hyper_tls::MaybeHttpsStream::Https(tls) => tls.tls_info(),
1014 hyper_tls::MaybeHttpsStream::Http(_) => None,
1015 }
1016 }
1017}
1018
1019#[cfg(feature = "__rustls")]
1020impl TlsInfoFactory for tokio_rustls::client::TlsStream<TokioIo<TokioIo<tokio::net::TcpStream>>> {
1021 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1022 let peer_certificate = self
1023 .get_ref()
1024 .1
1025 .peer_certificates()
1026 .and_then(|certs| certs.first())
1027 .map(|c| c.to_vec());
1028 Some(crate::tls::TlsInfo { peer_certificate })
1029 }
1030}
1031
1032#[cfg(feature = "__rustls")]
1033impl TlsInfoFactory
1034 for tokio_rustls::client::TlsStream<
1035 TokioIo<hyper_rustls::MaybeHttpsStream<TokioIo<tokio::net::TcpStream>>>,
1036 >
1037{
1038 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1039 let peer_certificate = self
1040 .get_ref()
1041 .1
1042 .peer_certificates()
1043 .and_then(|certs| certs.first())
1044 .map(|c| c.to_vec());
1045 Some(crate::tls::TlsInfo { peer_certificate })
1046 }
1047}
1048
1049#[cfg(feature = "__rustls")]
1050impl TlsInfoFactory for hyper_rustls::MaybeHttpsStream<TokioIo<tokio::net::TcpStream>> {
1051 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1052 match self {
1053 hyper_rustls::MaybeHttpsStream::Https(tls) => tls.tls_info(),
1054 hyper_rustls::MaybeHttpsStream::Http(_) => None,
1055 }
1056 }
1057}
1058
1059#[cfg(feature = "__tls")]
1062impl TlsInfoFactory for Box<dyn AsyncConnWithInfo> {
1063 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1064 (**self).tls_info()
1065 }
1066}
1067
1068#[cfg(feature = "__tls")]
1071#[cfg(unix)]
1072impl TlsInfoFactory for tokio::net::UnixStream {
1073 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1074 None
1075 }
1076}
1077
1078#[cfg(feature = "__native-tls")]
1079#[cfg(unix)]
1080impl TlsInfoFactory for tokio_native_tls::TlsStream<TokioIo<TokioIo<tokio::net::UnixStream>>> {
1081 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1082 let peer_certificate = self
1083 .get_ref()
1084 .peer_certificate()
1085 .ok()
1086 .flatten()
1087 .and_then(|c| c.to_der().ok());
1088 Some(crate::tls::TlsInfo { peer_certificate })
1089 }
1090}
1091
1092#[cfg(feature = "__native-tls")]
1093#[cfg(unix)]
1094impl TlsInfoFactory
1095 for tokio_native_tls::TlsStream<
1096 TokioIo<hyper_tls::MaybeHttpsStream<TokioIo<tokio::net::UnixStream>>>,
1097 >
1098{
1099 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1100 let peer_certificate = self
1101 .get_ref()
1102 .peer_certificate()
1103 .ok()
1104 .flatten()
1105 .and_then(|c| c.to_der().ok());
1106 Some(crate::tls::TlsInfo { peer_certificate })
1107 }
1108}
1109
1110#[cfg(feature = "__native-tls")]
1111#[cfg(unix)]
1112impl TlsInfoFactory for hyper_tls::MaybeHttpsStream<TokioIo<tokio::net::UnixStream>> {
1113 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1114 match self {
1115 hyper_tls::MaybeHttpsStream::Https(tls) => tls.tls_info(),
1116 hyper_tls::MaybeHttpsStream::Http(_) => None,
1117 }
1118 }
1119}
1120
1121#[cfg(feature = "__rustls")]
1122#[cfg(unix)]
1123impl TlsInfoFactory for tokio_rustls::client::TlsStream<TokioIo<TokioIo<tokio::net::UnixStream>>> {
1124 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1125 let peer_certificate = self
1126 .get_ref()
1127 .1
1128 .peer_certificates()
1129 .and_then(|certs| certs.first())
1130 .map(|c| c.to_vec());
1131 Some(crate::tls::TlsInfo { peer_certificate })
1132 }
1133}
1134
1135#[cfg(feature = "__rustls")]
1136#[cfg(unix)]
1137impl TlsInfoFactory
1138 for tokio_rustls::client::TlsStream<
1139 TokioIo<hyper_rustls::MaybeHttpsStream<TokioIo<tokio::net::UnixStream>>>,
1140 >
1141{
1142 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1143 let peer_certificate = self
1144 .get_ref()
1145 .1
1146 .peer_certificates()
1147 .and_then(|certs| certs.first())
1148 .map(|c| c.to_vec());
1149 Some(crate::tls::TlsInfo { peer_certificate })
1150 }
1151}
1152
1153#[cfg(feature = "__rustls")]
1154#[cfg(unix)]
1155impl TlsInfoFactory for hyper_rustls::MaybeHttpsStream<TokioIo<tokio::net::UnixStream>> {
1156 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1157 match self {
1158 hyper_rustls::MaybeHttpsStream::Https(tls) => tls.tls_info(),
1159 hyper_rustls::MaybeHttpsStream::Http(_) => None,
1160 }
1161 }
1162}
1163
1164#[cfg(feature = "__tls")]
1167#[cfg(target_os = "windows")]
1168impl TlsInfoFactory for tokio::net::windows::named_pipe::NamedPipeClient {
1169 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1170 None
1171 }
1172}
1173
1174#[cfg(feature = "__native-tls")]
1175#[cfg(target_os = "windows")]
1176impl TlsInfoFactory
1177 for tokio_native_tls::TlsStream<
1178 TokioIo<TokioIo<tokio::net::windows::named_pipe::NamedPipeClient>>,
1179 >
1180{
1181 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1182 let peer_certificate = self
1183 .get_ref()
1184 .peer_certificate()
1185 .ok()
1186 .flatten()
1187 .and_then(|c| c.to_der().ok());
1188 Some(crate::tls::TlsInfo { peer_certificate })
1189 }
1190}
1191
1192#[cfg(feature = "__native-tls")]
1193#[cfg(target_os = "windows")]
1194impl TlsInfoFactory
1195 for tokio_native_tls::TlsStream<
1196 TokioIo<
1197 hyper_tls::MaybeHttpsStream<TokioIo<tokio::net::windows::named_pipe::NamedPipeClient>>,
1198 >,
1199 >
1200{
1201 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1202 let peer_certificate = self
1203 .get_ref()
1204 .peer_certificate()
1205 .ok()
1206 .flatten()
1207 .and_then(|c| c.to_der().ok());
1208 Some(crate::tls::TlsInfo { peer_certificate })
1209 }
1210}
1211
1212#[cfg(feature = "__native-tls")]
1213#[cfg(target_os = "windows")]
1214impl TlsInfoFactory
1215 for hyper_tls::MaybeHttpsStream<TokioIo<tokio::net::windows::named_pipe::NamedPipeClient>>
1216{
1217 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1218 match self {
1219 hyper_tls::MaybeHttpsStream::Https(tls) => tls.tls_info(),
1220 hyper_tls::MaybeHttpsStream::Http(_) => None,
1221 }
1222 }
1223}
1224
1225#[cfg(feature = "__rustls")]
1226#[cfg(target_os = "windows")]
1227impl TlsInfoFactory
1228 for tokio_rustls::client::TlsStream<
1229 TokioIo<TokioIo<tokio::net::windows::named_pipe::NamedPipeClient>>,
1230 >
1231{
1232 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1233 let peer_certificate = self
1234 .get_ref()
1235 .1
1236 .peer_certificates()
1237 .and_then(|certs| certs.first())
1238 .map(|c| c.to_vec());
1239 Some(crate::tls::TlsInfo { peer_certificate })
1240 }
1241}
1242
1243#[cfg(feature = "__rustls")]
1244#[cfg(target_os = "windows")]
1245impl TlsInfoFactory
1246 for tokio_rustls::client::TlsStream<
1247 TokioIo<
1248 hyper_rustls::MaybeHttpsStream<
1249 TokioIo<tokio::net::windows::named_pipe::NamedPipeClient>,
1250 >,
1251 >,
1252 >
1253{
1254 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1255 let peer_certificate = self
1256 .get_ref()
1257 .1
1258 .peer_certificates()
1259 .and_then(|certs| certs.first())
1260 .map(|c| c.to_vec());
1261 Some(crate::tls::TlsInfo { peer_certificate })
1262 }
1263}
1264
1265#[cfg(feature = "__rustls")]
1266#[cfg(target_os = "windows")]
1267impl TlsInfoFactory
1268 for hyper_rustls::MaybeHttpsStream<TokioIo<tokio::net::windows::named_pipe::NamedPipeClient>>
1269{
1270 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1271 match self {
1272 hyper_rustls::MaybeHttpsStream::Https(tls) => tls.tls_info(),
1273 hyper_rustls::MaybeHttpsStream::Http(_) => None,
1274 }
1275 }
1276}
1277
1278pub(crate) trait AsyncConn:
1279 Read + Write + Connection + Send + Sync + Unpin + 'static
1280{
1281}
1282
1283impl<T: Read + Write + Connection + Send + Sync + Unpin + 'static> AsyncConn for T {}
1284
1285#[cfg(feature = "__tls")]
1286trait AsyncConnWithInfo: AsyncConn + TlsInfoFactory {}
1287#[cfg(not(feature = "__tls"))]
1288trait AsyncConnWithInfo: AsyncConn {}
1289
1290#[cfg(feature = "__tls")]
1291impl<T: AsyncConn + TlsInfoFactory> AsyncConnWithInfo for T {}
1292#[cfg(not(feature = "__tls"))]
1293impl<T: AsyncConn> AsyncConnWithInfo for T {}
1294
1295type BoxConn = Box<dyn AsyncConnWithInfo>;
1296
1297pub(crate) mod sealed {
1298 use super::*;
1299 #[derive(Debug)]
1300 pub struct Unnameable(pub(super) Uri);
1301
1302 pin_project! {
1303 #[allow(missing_debug_implementations)]
1308 pub struct Conn {
1309 #[pin]
1310 pub(super)inner: BoxConn,
1311 pub(super) is_proxy: bool,
1312 pub(super) tls_info: bool,
1314 }
1315 }
1316
1317 impl Connection for Conn {
1318 fn connected(&self) -> Connected {
1319 let connected = self.inner.connected().proxy(self.is_proxy);
1320 #[cfg(feature = "__tls")]
1321 if self.tls_info {
1322 let tls_info = self.inner.tls_info();
1323 if let Some(tls_info) = tls_info {
1324 connected.extra(tls_info)
1325 } else {
1326 connected
1327 }
1328 } else {
1329 connected
1330 }
1331 #[cfg(not(feature = "__tls"))]
1332 connected
1333 }
1334 }
1335
1336 impl Read for Conn {
1337 fn poll_read(
1338 self: Pin<&mut Self>,
1339 cx: &mut Context,
1340 buf: ReadBufCursor<'_>,
1341 ) -> Poll<io::Result<()>> {
1342 let this = self.project();
1343 Read::poll_read(this.inner, cx, buf)
1344 }
1345 }
1346
1347 impl Write for Conn {
1348 fn poll_write(
1349 self: Pin<&mut Self>,
1350 cx: &mut Context,
1351 buf: &[u8],
1352 ) -> Poll<Result<usize, io::Error>> {
1353 let this = self.project();
1354 Write::poll_write(this.inner, cx, buf)
1355 }
1356
1357 fn poll_write_vectored(
1358 self: Pin<&mut Self>,
1359 cx: &mut Context<'_>,
1360 bufs: &[IoSlice<'_>],
1361 ) -> Poll<Result<usize, io::Error>> {
1362 let this = self.project();
1363 Write::poll_write_vectored(this.inner, cx, bufs)
1364 }
1365
1366 fn is_write_vectored(&self) -> bool {
1367 self.inner.is_write_vectored()
1368 }
1369
1370 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
1371 let this = self.project();
1372 Write::poll_flush(this.inner, cx)
1373 }
1374
1375 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
1376 let this = self.project();
1377 Write::poll_shutdown(this.inner, cx)
1378 }
1379 }
1380}
1381
1382#[cfg(unix)]
1384pub(crate) mod uds {
1385 use std::path::Path;
1386
1387 #[cfg(unix)]
1394 pub trait UnixSocketProvider {
1395 #[doc(hidden)]
1396 fn reqwest_uds_path(&self, _: Internal) -> &Path;
1397 }
1398
1399 #[allow(missing_debug_implementations)]
1400 pub struct Internal;
1401
1402 macro_rules! as_path {
1403 ($($t:ty,)+) => {
1404 $(
1405 impl UnixSocketProvider for $t {
1406 #[doc(hidden)]
1407 fn reqwest_uds_path(&self, _: Internal) -> &Path {
1408 self.as_ref()
1409 }
1410 }
1411 )+
1412 }
1413 }
1414
1415 as_path![
1416 String,
1417 &'_ str,
1418 &'_ Path,
1419 std::path::PathBuf,
1420 std::sync::Arc<Path>,
1421 ];
1422}
1423
1424#[cfg(target_os = "windows")]
1426pub(crate) mod windows_named_pipe {
1427 use std::ffi::OsStr;
1428 #[cfg(target_os = "windows")]
1433 pub trait WindowsNamedPipeProvider {
1434 #[doc(hidden)]
1435 fn reqwest_windows_named_pipe_path(&self, _: Internal) -> &OsStr;
1436 }
1437
1438 #[allow(missing_debug_implementations)]
1439 pub struct Internal;
1440
1441 macro_rules! as_os_str {
1442 ($($t:ty,)+) => {
1443 $(
1444 impl WindowsNamedPipeProvider for $t {
1445 #[doc(hidden)]
1446 fn reqwest_windows_named_pipe_path(&self, _: Internal) -> &OsStr {
1447 self.as_ref()
1448 }
1449 }
1450 )+
1451 }
1452 }
1453
1454 as_os_str![
1455 String,
1456 &'_ str,
1457 std::path::PathBuf,
1458 &'_ std::path::Path,
1459 std::ffi::OsString,
1460 &'_ OsStr,
1461 ];
1462}
1463
1464pub(crate) type Connecting = Pin<Box<dyn Future<Output = Result<Conn, BoxError>> + Send>>;
1465
1466#[cfg(feature = "__native-tls")]
1467mod native_tls_conn {
1468 use super::TlsInfoFactory;
1469 use hyper::rt::{Read, ReadBufCursor, Write};
1470 use hyper_tls::MaybeHttpsStream;
1471 use hyper_util::client::legacy::connect::{Connected, Connection};
1472 use hyper_util::rt::TokioIo;
1473 use pin_project_lite::pin_project;
1474 use std::{
1475 io::{self, IoSlice},
1476 pin::Pin,
1477 task::{Context, Poll},
1478 };
1479 use tokio::io::{AsyncRead, AsyncWrite};
1480 use tokio::net::TcpStream;
1481 use tokio_native_tls::TlsStream;
1482
1483 pin_project! {
1484 pub(super) struct NativeTlsConn<T> {
1485 #[pin] pub(super) inner: TokioIo<TlsStream<T>>,
1486 }
1487 }
1488
1489 impl Connection for NativeTlsConn<TokioIo<TokioIo<TcpStream>>> {
1490 fn connected(&self) -> Connected {
1491 let connected = self
1492 .inner
1493 .inner()
1494 .get_ref()
1495 .get_ref()
1496 .get_ref()
1497 .inner()
1498 .connected();
1499 #[cfg(feature = "__native-tls-alpn")]
1500 match self.inner.inner().get_ref().negotiated_alpn().ok() {
1501 Some(Some(alpn_protocol)) if alpn_protocol == b"h2" => connected.negotiated_h2(),
1502 _ => connected,
1503 }
1504 #[cfg(not(feature = "__native-tls-alpn"))]
1505 connected
1506 }
1507 }
1508
1509 impl Connection for NativeTlsConn<TokioIo<MaybeHttpsStream<TokioIo<TcpStream>>>> {
1510 fn connected(&self) -> Connected {
1511 let connected = self
1512 .inner
1513 .inner()
1514 .get_ref()
1515 .get_ref()
1516 .get_ref()
1517 .inner()
1518 .connected();
1519 #[cfg(feature = "__native-tls-alpn")]
1520 match self.inner.inner().get_ref().negotiated_alpn().ok() {
1521 Some(Some(alpn_protocol)) if alpn_protocol == b"h2" => connected.negotiated_h2(),
1522 _ => connected,
1523 }
1524 #[cfg(not(feature = "__native-tls-alpn"))]
1525 connected
1526 }
1527 }
1528
1529 #[cfg(unix)]
1530 impl Connection for NativeTlsConn<TokioIo<TokioIo<tokio::net::UnixStream>>> {
1531 fn connected(&self) -> Connected {
1532 let connected = Connected::new();
1533 #[cfg(feature = "__native-tls-alpn")]
1534 match self.inner.inner().get_ref().negotiated_alpn().ok() {
1535 Some(Some(alpn_protocol)) if alpn_protocol == b"h2" => connected.negotiated_h2(),
1536 _ => connected,
1537 }
1538 #[cfg(not(feature = "__native-tls-alpn"))]
1539 connected
1540 }
1541 }
1542
1543 #[cfg(unix)]
1544 impl Connection for NativeTlsConn<TokioIo<MaybeHttpsStream<TokioIo<tokio::net::UnixStream>>>> {
1545 fn connected(&self) -> Connected {
1546 let connected = Connected::new();
1547 #[cfg(feature = "__native-tls-alpn")]
1548 match self.inner.inner().get_ref().negotiated_alpn().ok() {
1549 Some(Some(alpn_protocol)) if alpn_protocol == b"h2" => connected.negotiated_h2(),
1550 _ => connected,
1551 }
1552 #[cfg(not(feature = "__native-tls-alpn"))]
1553 connected
1554 }
1555 }
1556
1557 #[cfg(target_os = "windows")]
1558 impl Connection
1559 for NativeTlsConn<TokioIo<TokioIo<tokio::net::windows::named_pipe::NamedPipeClient>>>
1560 {
1561 fn connected(&self) -> Connected {
1562 let connected = Connected::new();
1563 #[cfg(feature = "__native-tls-alpn")]
1564 match self.inner.inner().get_ref().negotiated_alpn().ok() {
1565 Some(Some(alpn_protocol)) if alpn_protocol == b"h2" => connected.negotiated_h2(),
1566 _ => connected,
1567 }
1568 #[cfg(not(feature = "__native-tls-alpn"))]
1569 connected
1570 }
1571 }
1572
1573 #[cfg(target_os = "windows")]
1574 impl Connection
1575 for NativeTlsConn<
1576 TokioIo<MaybeHttpsStream<TokioIo<tokio::net::windows::named_pipe::NamedPipeClient>>>,
1577 >
1578 {
1579 fn connected(&self) -> Connected {
1580 let connected = Connected::new();
1581 #[cfg(feature = "__native-tls-alpn")]
1582 match self.inner.inner().get_ref().negotiated_alpn().ok() {
1583 Some(Some(alpn_protocol)) if alpn_protocol == b"h2" => connected.negotiated_h2(),
1584 _ => connected,
1585 }
1586 #[cfg(not(feature = "__native-tls-alpn"))]
1587 connected
1588 }
1589 }
1590
1591 impl<T: AsyncRead + AsyncWrite + Unpin> Read for NativeTlsConn<T> {
1592 fn poll_read(
1593 self: Pin<&mut Self>,
1594 cx: &mut Context,
1595 buf: ReadBufCursor<'_>,
1596 ) -> Poll<tokio::io::Result<()>> {
1597 let this = self.project();
1598 Read::poll_read(this.inner, cx, buf)
1599 }
1600 }
1601
1602 impl<T: AsyncRead + AsyncWrite + Unpin> Write for NativeTlsConn<T> {
1603 fn poll_write(
1604 self: Pin<&mut Self>,
1605 cx: &mut Context,
1606 buf: &[u8],
1607 ) -> Poll<Result<usize, tokio::io::Error>> {
1608 let this = self.project();
1609 Write::poll_write(this.inner, cx, buf)
1610 }
1611
1612 fn poll_write_vectored(
1613 self: Pin<&mut Self>,
1614 cx: &mut Context<'_>,
1615 bufs: &[IoSlice<'_>],
1616 ) -> Poll<Result<usize, io::Error>> {
1617 let this = self.project();
1618 Write::poll_write_vectored(this.inner, cx, bufs)
1619 }
1620
1621 fn is_write_vectored(&self) -> bool {
1622 self.inner.is_write_vectored()
1623 }
1624
1625 fn poll_flush(
1626 self: Pin<&mut Self>,
1627 cx: &mut Context,
1628 ) -> Poll<Result<(), tokio::io::Error>> {
1629 let this = self.project();
1630 Write::poll_flush(this.inner, cx)
1631 }
1632
1633 fn poll_shutdown(
1634 self: Pin<&mut Self>,
1635 cx: &mut Context,
1636 ) -> Poll<Result<(), tokio::io::Error>> {
1637 let this = self.project();
1638 Write::poll_shutdown(this.inner, cx)
1639 }
1640 }
1641
1642 impl<T> TlsInfoFactory for NativeTlsConn<T>
1643 where
1644 TokioIo<TlsStream<T>>: TlsInfoFactory,
1645 {
1646 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1647 self.inner.tls_info()
1648 }
1649 }
1650}
1651
1652#[cfg(feature = "__rustls")]
1653mod rustls_tls_conn {
1654 use super::TlsInfoFactory;
1655 use hyper::rt::{Read, ReadBufCursor, Write};
1656 use hyper_rustls::MaybeHttpsStream;
1657 use hyper_util::client::legacy::connect::{Connected, Connection};
1658 use hyper_util::rt::TokioIo;
1659 use pin_project_lite::pin_project;
1660 use std::{
1661 io::{self, IoSlice},
1662 pin::Pin,
1663 task::{Context, Poll},
1664 };
1665 use tokio::io::{AsyncRead, AsyncWrite};
1666 use tokio::net::TcpStream;
1667 use tokio_rustls::client::TlsStream;
1668
1669 pin_project! {
1670 pub(super) struct RustlsTlsConn<T> {
1671 #[pin] pub(super) inner: TokioIo<TlsStream<T>>,
1672 }
1673 }
1674
1675 impl Connection for RustlsTlsConn<TokioIo<TokioIo<TcpStream>>> {
1676 fn connected(&self) -> Connected {
1677 if self.inner.inner().get_ref().1.alpn_protocol() == Some(b"h2") {
1678 self.inner
1679 .inner()
1680 .get_ref()
1681 .0
1682 .inner()
1683 .connected()
1684 .negotiated_h2()
1685 } else {
1686 self.inner.inner().get_ref().0.inner().connected()
1687 }
1688 }
1689 }
1690 impl Connection for RustlsTlsConn<TokioIo<MaybeHttpsStream<TokioIo<TcpStream>>>> {
1691 fn connected(&self) -> Connected {
1692 if self.inner.inner().get_ref().1.alpn_protocol() == Some(b"h2") {
1693 self.inner
1694 .inner()
1695 .get_ref()
1696 .0
1697 .inner()
1698 .connected()
1699 .negotiated_h2()
1700 } else {
1701 self.inner.inner().get_ref().0.inner().connected()
1702 }
1703 }
1704 }
1705
1706 #[cfg(unix)]
1707 impl Connection for RustlsTlsConn<TokioIo<TokioIo<tokio::net::UnixStream>>> {
1708 fn connected(&self) -> Connected {
1709 if self.inner.inner().get_ref().1.alpn_protocol() == Some(b"h2") {
1710 self.inner
1711 .inner()
1712 .get_ref()
1713 .0
1714 .inner()
1715 .connected()
1716 .negotiated_h2()
1717 } else {
1718 self.inner.inner().get_ref().0.inner().connected()
1719 }
1720 }
1721 }
1722
1723 #[cfg(unix)]
1724 impl Connection for RustlsTlsConn<TokioIo<MaybeHttpsStream<TokioIo<tokio::net::UnixStream>>>> {
1725 fn connected(&self) -> Connected {
1726 if self.inner.inner().get_ref().1.alpn_protocol() == Some(b"h2") {
1727 self.inner
1728 .inner()
1729 .get_ref()
1730 .0
1731 .inner()
1732 .connected()
1733 .negotiated_h2()
1734 } else {
1735 self.inner.inner().get_ref().0.inner().connected()
1736 }
1737 }
1738 }
1739
1740 #[cfg(target_os = "windows")]
1741 impl Connection
1742 for RustlsTlsConn<TokioIo<TokioIo<tokio::net::windows::named_pipe::NamedPipeClient>>>
1743 {
1744 fn connected(&self) -> Connected {
1745 if self.inner.inner().get_ref().1.alpn_protocol() == Some(b"h2") {
1746 self.inner
1747 .inner()
1748 .get_ref()
1749 .0
1750 .inner()
1751 .connected()
1752 .negotiated_h2()
1753 } else {
1754 self.inner.inner().get_ref().0.inner().connected()
1755 }
1756 }
1757 }
1758
1759 #[cfg(target_os = "windows")]
1760 impl Connection
1761 for RustlsTlsConn<
1762 TokioIo<MaybeHttpsStream<TokioIo<tokio::net::windows::named_pipe::NamedPipeClient>>>,
1763 >
1764 {
1765 fn connected(&self) -> Connected {
1766 if self.inner.inner().get_ref().1.alpn_protocol() == Some(b"h2") {
1767 self.inner
1768 .inner()
1769 .get_ref()
1770 .0
1771 .inner()
1772 .connected()
1773 .negotiated_h2()
1774 } else {
1775 self.inner.inner().get_ref().0.inner().connected()
1776 }
1777 }
1778 }
1779
1780 impl<T: AsyncRead + AsyncWrite + Unpin> Read for RustlsTlsConn<T> {
1781 fn poll_read(
1782 self: Pin<&mut Self>,
1783 cx: &mut Context,
1784 buf: ReadBufCursor<'_>,
1785 ) -> Poll<tokio::io::Result<()>> {
1786 let this = self.project();
1787 Read::poll_read(this.inner, cx, buf)
1788 }
1789 }
1790
1791 impl<T: AsyncRead + AsyncWrite + Unpin> Write for RustlsTlsConn<T> {
1792 fn poll_write(
1793 self: Pin<&mut Self>,
1794 cx: &mut Context,
1795 buf: &[u8],
1796 ) -> Poll<Result<usize, tokio::io::Error>> {
1797 let this = self.project();
1798 Write::poll_write(this.inner, cx, buf)
1799 }
1800
1801 fn poll_write_vectored(
1802 self: Pin<&mut Self>,
1803 cx: &mut Context<'_>,
1804 bufs: &[IoSlice<'_>],
1805 ) -> Poll<Result<usize, io::Error>> {
1806 let this = self.project();
1807 Write::poll_write_vectored(this.inner, cx, bufs)
1808 }
1809
1810 fn is_write_vectored(&self) -> bool {
1811 self.inner.is_write_vectored()
1812 }
1813
1814 fn poll_flush(
1815 self: Pin<&mut Self>,
1816 cx: &mut Context,
1817 ) -> Poll<Result<(), tokio::io::Error>> {
1818 let this = self.project();
1819 Write::poll_flush(this.inner, cx)
1820 }
1821
1822 fn poll_shutdown(
1823 self: Pin<&mut Self>,
1824 cx: &mut Context,
1825 ) -> Poll<Result<(), tokio::io::Error>> {
1826 let this = self.project();
1827 Write::poll_shutdown(this.inner, cx)
1828 }
1829 }
1830 impl<T> TlsInfoFactory for RustlsTlsConn<T>
1831 where
1832 TokioIo<TlsStream<T>>: TlsInfoFactory,
1833 {
1834 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1835 self.inner.tls_info()
1836 }
1837 }
1838}
1839
1840#[cfg(feature = "socks")]
1841mod socks {
1842 use tower_service::Service;
1843
1844 use http::uri::Scheme;
1845 use http::Uri;
1846 use hyper_util::client::legacy::connect::proxy::{SocksV4, SocksV5};
1847 use tokio::net::TcpStream;
1848
1849 use super::BoxError;
1850 use crate::proxy::Intercepted;
1851
1852 pub(super) enum DnsResolve {
1853 Local,
1854 Proxy,
1855 }
1856
1857 #[derive(Debug)]
1858 #[allow(clippy::enum_variant_names)]
1859 pub(super) enum SocksProxyError {
1860 SocksNoHostInUrl,
1861 SocksLocalResolve(BoxError),
1862 SocksConnect(BoxError),
1863 }
1864
1865 pub(super) async fn connect(
1866 proxy: Intercepted,
1867 dst: Uri,
1868 dns_mode: DnsResolve,
1869 resolver: &crate::dns::DynResolver,
1870 http_connector: &mut crate::connect::HttpConnector,
1871 ) -> Result<TcpStream, SocksProxyError> {
1872 let https = dst.scheme() == Some(&Scheme::HTTPS);
1873 let original_host = dst.host().ok_or(SocksProxyError::SocksNoHostInUrl)?;
1874 let mut host = original_host.to_owned();
1875 let port = match dst.port() {
1876 Some(p) => p.as_u16(),
1877 None if https => 443u16,
1878 _ => 80u16,
1879 };
1880
1881 if let DnsResolve::Local = dns_mode {
1882 let maybe_new_target = resolver
1883 .http_resolve(&dst)
1884 .await
1885 .map_err(SocksProxyError::SocksLocalResolve)?
1886 .next();
1887 if let Some(new_target) = maybe_new_target {
1888 log::trace!("socks local dns resolved {new_target:?}");
1889 let ip = new_target.ip();
1891 if ip.is_ipv6() {
1892 host = format!("[{}]", ip);
1893 } else {
1894 host = ip.to_string();
1895 }
1896 }
1897 }
1898
1899 let proxy_uri = proxy.uri().clone();
1900 let dst_uri = format!(
1902 "{}://{}:{}",
1903 if https { "https" } else { "http" },
1904 host,
1905 port
1906 )
1907 .parse::<Uri>()
1908 .map_err(|e| SocksProxyError::SocksConnect(e.into()))?;
1909
1910 match proxy.uri().scheme_str() {
1912 Some("socks4") | Some("socks4a") => {
1913 let mut svc = SocksV4::new(proxy_uri, http_connector);
1914 let stream = Service::call(&mut svc, dst_uri)
1915 .await
1916 .map_err(|e| SocksProxyError::SocksConnect(e.into()))?;
1917 Ok(stream.into_inner())
1918 }
1919 Some("socks5") | Some("socks5h") => {
1920 let mut svc = if let Some((username, password)) = proxy.raw_auth() {
1921 SocksV5::new(proxy_uri, http_connector)
1922 .with_auth(username.to_string(), password.to_string())
1923 } else {
1924 SocksV5::new(proxy_uri, http_connector)
1925 };
1926 let stream = Service::call(&mut svc, dst_uri)
1927 .await
1928 .map_err(|e| SocksProxyError::SocksConnect(e.into()))?;
1929 Ok(stream.into_inner())
1930 }
1931 _ => unreachable!(),
1932 }
1933 }
1934
1935 impl std::fmt::Display for SocksProxyError {
1936 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1937 match self {
1938 Self::SocksNoHostInUrl => f.write_str("socks proxy destination has no host"),
1939 Self::SocksLocalResolve(_) => f.write_str("error resolving for socks proxy"),
1940 Self::SocksConnect(_) => f.write_str("error connecting to socks proxy"),
1941 }
1942 }
1943 }
1944
1945 impl std::error::Error for SocksProxyError {
1946 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
1947 match self {
1948 Self::SocksNoHostInUrl => None,
1949 Self::SocksLocalResolve(ref e) => Some(&**e),
1950 Self::SocksConnect(ref e) => Some(&**e),
1951 }
1952 }
1953 }
1954}
1955
1956mod verbose {
1957 use crate::util::Escape;
1958 use hyper::rt::{Read, ReadBufCursor, Write};
1959 use hyper_util::client::legacy::connect::{Connected, Connection};
1960 use std::cmp::min;
1961 use std::fmt;
1962 use std::io::{self, IoSlice};
1963 use std::pin::Pin;
1964 use std::task::{Context, Poll};
1965
1966 pub(super) const OFF: Wrapper = Wrapper(false);
1967
1968 #[derive(Clone, Copy)]
1969 pub(super) struct Wrapper(pub(super) bool);
1970
1971 impl Wrapper {
1972 pub(super) fn wrap<T: super::AsyncConnWithInfo>(&self, conn: T) -> super::BoxConn {
1973 if self.0 && log::log_enabled!(log::Level::Trace) {
1974 Box::new(Verbose {
1975 id: crate::util::fast_random() as u32,
1977 inner: conn,
1978 })
1979 } else {
1980 Box::new(conn)
1981 }
1982 }
1983 }
1984
1985 struct Verbose<T> {
1986 id: u32,
1987 inner: T,
1988 }
1989
1990 impl<T: Connection + Read + Write + Unpin> Connection for Verbose<T> {
1991 fn connected(&self) -> Connected {
1992 self.inner.connected()
1993 }
1994 }
1995
1996 impl<T: Read + Write + Unpin> Read for Verbose<T> {
1997 fn poll_read(
1998 mut self: Pin<&mut Self>,
1999 cx: &mut Context,
2000 mut buf: ReadBufCursor<'_>,
2001 ) -> Poll<std::io::Result<()>> {
2002 let mut vbuf = hyper::rt::ReadBuf::uninit(unsafe { buf.as_mut() });
2006 match Pin::new(&mut self.inner).poll_read(cx, vbuf.unfilled()) {
2007 Poll::Ready(Ok(())) => {
2008 log::trace!("{:08x} read: {:?}", self.id, Escape::new(vbuf.filled()));
2009 let len = vbuf.filled().len();
2010 unsafe {
2013 buf.advance(len);
2014 }
2015 Poll::Ready(Ok(()))
2016 }
2017 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
2018 Poll::Pending => Poll::Pending,
2019 }
2020 }
2021 }
2022
2023 impl<T: Read + Write + Unpin> Write for Verbose<T> {
2024 fn poll_write(
2025 mut self: Pin<&mut Self>,
2026 cx: &mut Context,
2027 buf: &[u8],
2028 ) -> Poll<Result<usize, std::io::Error>> {
2029 match Pin::new(&mut self.inner).poll_write(cx, buf) {
2030 Poll::Ready(Ok(n)) => {
2031 log::trace!("{:08x} write: {:?}", self.id, Escape::new(&buf[..n]));
2032 Poll::Ready(Ok(n))
2033 }
2034 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
2035 Poll::Pending => Poll::Pending,
2036 }
2037 }
2038
2039 fn poll_write_vectored(
2040 mut self: Pin<&mut Self>,
2041 cx: &mut Context<'_>,
2042 bufs: &[IoSlice<'_>],
2043 ) -> Poll<Result<usize, io::Error>> {
2044 match Pin::new(&mut self.inner).poll_write_vectored(cx, bufs) {
2045 Poll::Ready(Ok(nwritten)) => {
2046 log::trace!(
2047 "{:08x} write (vectored): {:?}",
2048 self.id,
2049 Vectored { bufs, nwritten }
2050 );
2051 Poll::Ready(Ok(nwritten))
2052 }
2053 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
2054 Poll::Pending => Poll::Pending,
2055 }
2056 }
2057
2058 fn is_write_vectored(&self) -> bool {
2059 self.inner.is_write_vectored()
2060 }
2061
2062 fn poll_flush(
2063 mut self: Pin<&mut Self>,
2064 cx: &mut Context,
2065 ) -> Poll<Result<(), std::io::Error>> {
2066 Pin::new(&mut self.inner).poll_flush(cx)
2067 }
2068
2069 fn poll_shutdown(
2070 mut self: Pin<&mut Self>,
2071 cx: &mut Context,
2072 ) -> Poll<Result<(), std::io::Error>> {
2073 Pin::new(&mut self.inner).poll_shutdown(cx)
2074 }
2075 }
2076
2077 #[cfg(feature = "__tls")]
2078 impl<T: super::TlsInfoFactory> super::TlsInfoFactory for Verbose<T> {
2079 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
2080 self.inner.tls_info()
2081 }
2082 }
2083
2084 struct Vectored<'a, 'b> {
2085 bufs: &'a [IoSlice<'b>],
2086 nwritten: usize,
2087 }
2088
2089 impl fmt::Debug for Vectored<'_, '_> {
2090 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2091 let mut left = self.nwritten;
2092 for buf in self.bufs.iter() {
2093 if left == 0 {
2094 break;
2095 }
2096 let n = min(left, buf.len());
2097 Escape::new(&buf[..n]).fmt(f)?;
2098 left -= n;
2099 }
2100 Ok(())
2101 }
2102 }
2103}