1use std::{
2 sync::{
3 Arc,
4 },
5 time::{
6 Duration,
7 },
8 net::{
9 IpAddr,
10 Ipv4Addr,
11 Ipv6Addr,
12 },
13};
14
15use http::{
16 uri::{
17 Uri,
18 },
19};
20
21use hyper_util::{
22 client::{
23 legacy,
24 },
25};
26
27use tokio::{
28 net::{
29 TcpStream,
30 },
31};
32
33
34use crate::{
35 resolver,
36 Io,
37 Protocols,
38 IoStream,
39 IoKind,
40 IoTcp,
41 IoTcpTls,
42 TokioIo,
43};
44
45#[derive(Debug)]
46pub enum Error {
47 UriMissingScheme {
48 uri: Uri,
49 },
50 UriMissingHost {
51 uri: Uri,
52 },
53 UriUnsupportedHttpsScheme {
54 uri: Uri,
55 scheme: http::uri::Scheme,
56 },
57 ResolverBuild(hickory_resolver::error::ResolveError),
58 TlsNonEmptyAlpnProtocols,
59 TlsNativeCertsLoad(std::io::Error),
60 TlsNativeCertAdd(rustls::Error),
61 TlsInvalidDnsName {
62 hostname: String,
63 error: rustls::pki_types::InvalidDnsNameError,
64 },
65 Connection(Box<dyn std::error::Error + Send + Sync + 'static>),
66 ConnectionToSocks5(Box<dyn std::error::Error + Send + Sync + 'static>),
67 ConnectionViaSocks5(async_socks5::Error),
68 ConnectionTls(std::io::Error),
69}
70
71pub struct ResolverBuilder {
72 resolver_kind: resolver::ResolverKind,
73}
74
75impl ResolverBuilder {
76 pub(super) fn new() -> Self {
77 Self {
78 resolver_kind: resolver::ResolverKind::System,
79 }
80 }
81
82 pub fn system(mut self) -> Self {
87 self.resolver_kind = resolver::ResolverKind::System;
88 self
89 }
90
91 pub fn google(mut self) -> Self {
95 self.resolver_kind = resolver::ResolverKind::Google;
96 self
97 }
98
99 pub fn google_tls(mut self) -> Self {
104 self.resolver_kind = resolver::ResolverKind::GoogleTls;
105 self
106 }
107
108 pub fn google_https(mut self) -> Self {
113 self.resolver_kind = resolver::ResolverKind::GoogleHttps;
114 self
115 }
116
117 pub fn connection_setup(self, uri: Uri) -> Result<ConnectionBuilder, Error> {
119 let uri_scheme = uri.scheme()
120 .ok_or_else(|| Error::UriMissingScheme { uri: uri.clone(), })?
121 .clone();
122 let uri_host = uri.host()
123 .ok_or_else(|| Error::UriMissingHost { uri: uri.clone(), })?
124 .to_string();
125 let resolver = resolver::HickoryResolver::new(self.resolver_kind)
126 .map_err(Error::ResolverBuild)?;
127 Ok(ConnectionBuilder::new(resolver, uri, uri_host, uri_scheme))
128 }
129}
130
131pub struct ConnectionBuilder {
132 uri: Uri,
133 uri_host: String,
134 uri_scheme: http::uri::Scheme,
135 resolver: resolver::HickoryResolver,
136 http_connector_params: HttpConnectorParams,
137}
138
139#[derive(Default)]
140struct HttpConnectorParams {
141 keepalive_time: Option<Option<Duration>>,
142 keepalive_interval_interval: Option<Option<Duration>>,
143 keepalive_retries_retries: Option<Option<u32>>,
144 nodelay_nodelay: Option<bool>,
145 send_buffer_size_size: Option<Option<usize>>,
146 recv_buffer_size_size: Option<Option<usize>>,
147 local_address_addr: Option<Option<IpAddr>>,
148 local_addresses_addrs: Option<(Ipv4Addr, Ipv6Addr)>,
149 connect_timeout_dur: Option<Option<Duration>>,
150 happy_eyeballs_timeout_dur: Option<Option<Duration>>,
151 reuse_address_reuse_address: Option<bool>,
152 #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
153 interface_interface: Option<String>,
154}
155
156impl HttpConnectorParams {
157 fn apply_resolver(
158 &self,
159 resolver: &mut resolver::HickoryResolver,
160 )
161 {
162 if let Some(maybe_ip) = &self.local_address_addr {
163 if let Some(ip) = maybe_ip {
164 if ip.is_ipv4() {
165 resolver.force_ip4();
166 } else if ip.is_ipv6() {
167 resolver.force_ip6();
168 }
169 } else {
170 resolver.force_none();
171 }
172 }
173 }
174
175 fn apply(
176 self,
177 http_connector: &mut legacy::connect::HttpConnector<resolver::HickoryResolver>,
178 )
179 {
180 if let Some(value) = self.keepalive_time {
181 http_connector.set_keepalive(value);
182 }
183 if let Some(value) = self.keepalive_interval_interval {
184 http_connector.set_keepalive_interval(value);
185 }
186 if let Some(value) = self.keepalive_retries_retries {
187 http_connector.set_keepalive_retries(value);
188 }
189 if let Some(value) = self.nodelay_nodelay {
190 http_connector.set_nodelay(value);
191 }
192 if let Some(value) = self.send_buffer_size_size {
193 http_connector.set_send_buffer_size(value);
194 }
195 if let Some(value) = self.recv_buffer_size_size {
196 http_connector.set_recv_buffer_size(value);
197 }
198 if let Some(maybe_ip) = self.local_address_addr {
199 http_connector.set_local_address(maybe_ip);
200 }
201 if let Some((ip4, ip6)) = self.local_addresses_addrs {
202 http_connector.set_local_addresses(ip4, ip6);
203 }
204 if let Some(value) = self.connect_timeout_dur {
205 http_connector.set_connect_timeout(value);
206 }
207 if let Some(value) = self.happy_eyeballs_timeout_dur {
208 http_connector.set_happy_eyeballs_timeout(value);
209 }
210 if let Some(value) = self.reuse_address_reuse_address {
211 http_connector.set_reuse_address(value);
212 }
213 #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
214 if let Some(value) = self.interface_interface {
215 http_connector.set_interface(value);
216 }
217 }
218}
219
220impl ConnectionBuilder {
221 fn new(
222 resolver: resolver::HickoryResolver,
223 uri: Uri,
224 uri_host: String,
225 uri_scheme: http::uri::Scheme,
226 )
227 -> Self
228 {
229 Self {
230 uri,
231 uri_host,
232 uri_scheme,
233 resolver,
234 http_connector_params: HttpConnectorParams::default(),
235 }
236 }
237
238 pub fn keepalive(mut self, time: Option<Duration>) -> Self {
245 self.http_connector_params.keepalive_time = Some(time);
246 self
247 }
248
249 pub fn keepalive_interval(mut self, interval: Option<Duration>) -> Self {
252 self.http_connector_params.keepalive_interval_interval = Some(interval);
253 self
254 }
255
256 pub fn keepalive_retries(mut self, retries: Option<u32>) -> Self {
258 self.http_connector_params.keepalive_retries_retries = Some(retries);
259 self
260 }
261
262 pub fn nodelay(mut self, nodelay: bool) -> Self {
266 self.http_connector_params.nodelay_nodelay = Some(nodelay);
267 self
268 }
269
270 pub fn send_buffer_size(mut self, size: Option<usize>) -> Self {
272 self.http_connector_params.send_buffer_size_size = Some(size);
273 self
274 }
275
276 pub fn recv_buffer_size(mut self, size: Option<usize>) -> Self {
278 self.http_connector_params.recv_buffer_size_size = Some(size);
279 self
280 }
281
282 pub fn local_address(mut self, addr: Option<IpAddr>) -> Self {
288 self.http_connector_params.local_address_addr = Some(addr);
289 self
290 }
291
292 pub fn local_addresses(mut self, addr_ipv4: Ipv4Addr, addr_ipv6: Ipv6Addr) -> Self {
295 self.http_connector_params.local_addresses_addrs = Some((addr_ipv4, addr_ipv6));
296 self
297 }
298
299 pub fn connect_timeout(mut self, dur: Option<Duration>) -> Self {
306 self.http_connector_params.connect_timeout_dur = Some(dur);
307 self
308 }
309
310 pub fn happy_eyeballs_timeout(mut self, dur: Option<Duration>) -> Self {
323 self.http_connector_params.happy_eyeballs_timeout_dur = Some(dur);
324 self
325 }
326
327 pub fn reuse_address(mut self, reuse_address: bool) -> Self {
331 self.http_connector_params.reuse_address_reuse_address = Some(reuse_address);
332 self
333 }
334
335 #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
348 pub fn interface<S: Into<String>>(mut self, interface: S) -> Self {
349 self.http_connector_params.interface_interface = Some(interface.into());
350 self
351 }
352
353 pub async fn establish(mut self) -> Result<Io, Error> {
355 self.http_connector_params.apply_resolver(
356 &mut self.resolver,
357 );
358 let mut http_connector =
359 legacy::connect::HttpConnector::new_with_resolver(
360 self.resolver,
361 );
362 self.http_connector_params.apply(
363 &mut http_connector,
364 );
365 let stream = connection_establish_tcp(http_connector, self.uri).await?;
366 Ok(Io {
367 protocols: Protocols {
368 http1_support: true,
369 http2_support: false,
370 },
371 uri_host: self.uri_host,
372 stream: IoStream {
373 kind: IoKind::Tcp(
374 IoTcp {
375 stream,
376 },
377 ),
378 },
379 })
380 }
381
382 pub async fn tls_setup(mut self) -> Result<TlsBuilder, Error> {
384 self.http_connector_params.apply_resolver(
385 &mut self.resolver,
386 );
387 let mut http_connector =
388 legacy::connect::HttpConnector::new_with_resolver(
389 self.resolver,
390 );
391 self.http_connector_params.apply(
392 &mut http_connector,
393 );
394 http_connector
395 .enforce_http(false);
396 let stream = connection_establish_tcp(http_connector, self.uri.clone()).await?;
397 Ok(TlsBuilder::new(
398 stream,
399 self.uri,
400 self.uri_host,
401 self.uri_scheme,
402 ))
403 }
404
405 pub fn socks5_proxy_setup(mut self, proxy_addr: Uri) -> Socks5ProxyBuilder {
407 self.http_connector_params.apply_resolver(
408 &mut self.resolver,
409 );
410 let mut http_connector =
411 legacy::connect::HttpConnector::new_with_resolver(
412 self.resolver,
413 );
414 self.http_connector_params.apply(
415 &mut http_connector,
416 );
417 Socks5ProxyBuilder::new(
418 proxy_addr,
419 http_connector,
420 self.uri,
421 self.uri_host,
422 self.uri_scheme,
423 )
424 }
425}
426
427#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
428pub struct Socks5Auth {
429 pub username: String,
430 pub password: String,
431}
432
433impl From<async_socks5::Auth> for Socks5Auth {
434 fn from(auth: async_socks5::Auth) -> Self {
435 Self {
436 username: auth.username,
437 password: auth.password,
438 }
439 }
440}
441
442impl From<Socks5Auth> for async_socks5::Auth {
443 fn from(auth: Socks5Auth) -> Self {
444 async_socks5::Auth::new(auth.username, auth.password)
445 }
446}
447
448pub struct Socks5ProxyBuilder {
449 uri: Uri,
450 uri_host: String,
451 uri_scheme: http::uri::Scheme,
452 http_connector: legacy::connect::HttpConnector<resolver::HickoryResolver>,
453 proxy_addr: Uri,
454 proxy_auth: Option<Socks5Auth>,
455}
456
457impl Socks5ProxyBuilder {
458 fn new(
459 proxy_addr: Uri,
460 http_connector: legacy::connect::HttpConnector<resolver::HickoryResolver>,
461 uri: Uri,
462 uri_host: String,
463 uri_scheme: http::uri::Scheme,
464 )
465 -> Self
466 {
467 Self {
468 uri,
469 uri_host,
470 uri_scheme,
471 http_connector,
472 proxy_addr,
473 proxy_auth: None,
474 }
475 }
476
477 pub fn auth(mut self, proxy_auth: Option<Socks5Auth>) -> Self {
483 self.proxy_auth = proxy_auth;
484 self
485 }
486
487 pub async fn establish(self) -> Result<Io, Error> {
489 let stream =
490 connection_establish_proxy(
491 self.proxy_addr,
492 self.proxy_auth,
493 self.http_connector,
494 self.uri,
495 self.uri_host.clone(),
496 )
497 .await?;
498 Ok(Io {
499 protocols: Protocols {
500 http1_support: true,
501 http2_support: false,
502 },
503 uri_host: self.uri_host,
504 stream: IoStream {
505 kind: IoKind::Tcp(
506 IoTcp {
507 stream,
508 },
509 ),
510 },
511 })
512 }
513
514 pub async fn tls_setup(mut self) -> Result<TlsBuilder, Error> {
516 self.http_connector
517 .enforce_http(false);
518 let stream =
519 connection_establish_proxy(
520 self.proxy_addr,
521 self.proxy_auth,
522 self.http_connector,
523 self.uri.clone(),
524 self.uri_host.clone(),
525 )
526 .await?;
527
528 Ok(TlsBuilder::new(
529 stream,
530 self.uri,
531 self.uri_host,
532 self.uri_scheme,
533 ))
534 }
535}
536
537pub struct TlsBuilder {
538 uri: Uri,
539 uri_host: String,
540 uri_scheme: http::uri::Scheme,
541 stream: TokioIo<TcpStream>,
542}
543
544impl TlsBuilder {
545 fn new(
546 stream: TokioIo<TcpStream>,
547 uri: Uri,
548 uri_host: String,
549 uri_scheme: http::uri::Scheme,
550 )
551 -> Self
552 {
553 Self {
554 uri,
555 uri_host,
556 uri_scheme,
557 stream,
558 }
559 }
560
561 pub fn tls_config(self, config: rustls::ClientConfig) -> Result<TlsBuilderConfig, Error> {
570 if !config.alpn_protocols.is_empty() {
571 Err(Error::TlsNonEmptyAlpnProtocols)
572 } else {
573 Ok(TlsBuilderConfig::new(
574 config,
575 self.stream,
576 self.uri,
577 self.uri_host,
578 self.uri_scheme,
579 ))
580 }
581 }
582
583 pub fn native_roots(self) -> Result<TlsBuilderConfig, Error> {
585 let mut root_store = rustls::RootCertStore::empty();
586 let native_certs_iter = rustls_native_certs::load_native_certs()
587 .map_err(Error::TlsNativeCertsLoad)?;
588 for cert in native_certs_iter {
589 root_store.add(cert)
590 .map_err(Error::TlsNativeCertAdd)?;
591 }
592 Ok(TlsBuilderConfig::new(
593 rustls::ClientConfig::builder()
594 .with_root_certificates(root_store)
595 .with_no_client_auth(),
596 self.stream,
597 self.uri,
598 self.uri_host,
599 self.uri_scheme,
600 ))
601 }
602
603 pub fn webpki_roots(self) -> Result<TlsBuilderConfig, Error> {
605 let mut root_store = rustls::RootCertStore::empty();
606 root_store.extend(
607 webpki_roots::TLS_SERVER_ROOTS
608 .iter()
609 .cloned(),
610 );
611 Ok(TlsBuilderConfig::new(
612 rustls::ClientConfig::builder()
613 .with_root_certificates(root_store)
614 .with_no_client_auth(),
615 self.stream,
616 self.uri,
617 self.uri_host,
618 self.uri_scheme,
619 ))
620 }
621}
622
623pub struct TlsBuilderConfig {
624 uri: Uri,
625 uri_host: String,
626 uri_scheme: http::uri::Scheme,
627 stream: TokioIo<TcpStream>,
628 tls_config: rustls::ClientConfig,
629 https_only: bool,
630 http1_enabled: bool,
631 http2_enabled: bool,
632 override_server_name: Option<String>,
633}
634
635impl TlsBuilderConfig {
636 fn new(
637 tls_config: rustls::ClientConfig,
638 stream: TokioIo<TcpStream>,
639 uri: Uri,
640 uri_host: String,
641 uri_scheme: http::uri::Scheme,
642 )
643 -> Self
644 {
645 Self {
646 uri,
647 uri_host,
648 uri_scheme,
649 stream,
650 tls_config,
651 https_only: false,
652 http1_enabled: false,
653 http2_enabled: false,
654 override_server_name: None,
655 }
656 }
657
658 pub fn https_only(mut self) -> Self {
664 self.https_only = true;
665 self
666 }
667
668 pub fn https_or_http(mut self) -> Self {
675 self.https_only = false;
676 self
677 }
678
679 pub fn enable_http1(mut self) -> Self {
683 self.http1_enabled = true;
684 self.http2_enabled = false;
685 self
686 }
687
688 pub fn enable_http2(mut self) -> Self {
692 self.http1_enabled = false;
693 self.http2_enabled = true;
694 self
695 }
696
697 pub fn enable_all_versions(mut self) -> Self {
702 self.http1_enabled = true;
703 self.http2_enabled = true;
704 self
705 }
706
707 pub fn with_server_name(mut self, override_server_name: String) -> Self {
717 self.override_server_name = Some(override_server_name);
718 self
719 }
720
721 pub async fn establish(mut self) -> Result<Io, Error> {
723 let mut alpn_protocols = Vec::new();
724 if self.http2_enabled {
725 alpn_protocols.push(b"h2".to_vec());
726 }
727 if self.http1_enabled {
728 alpn_protocols.push(b"http/1.1".to_vec());
729 }
730 self.tls_config.alpn_protocols = alpn_protocols;
731
732 if self.uri_scheme == http::uri::Scheme::HTTP && !self.https_only {
733 return Ok(Io {
734 protocols: Protocols {
735 http1_support: true,
736 http2_support: false,
737 },
738 uri_host: self.uri_host,
739 stream: IoStream {
740 kind: IoKind::Tcp(
741 IoTcp {
742 stream: self.stream,
743 },
744 ),
745 },
746 });
747 }
748 if self.uri_scheme != http::uri::Scheme::HTTPS {
749 return Err(Error::UriUnsupportedHttpsScheme {
750 uri: self.uri,
751 scheme: self.uri_scheme,
752 });
753 }
754
755 let mut hostname =
756 match self.override_server_name.as_deref() {
757 Some(server_name) =>
758 server_name,
759 None =>
760 &self.uri_host,
761 };
762
763 hostname = hostname
764 .trim_start_matches('[')
765 .trim_end_matches(']');
766
767 log::debug!("TlsBuilderConfig::establish: getting server_name from hostname = {hostname:?}");
768 let server_name = rustls::pki_types::ServerName::try_from(hostname)
769 .map_err(|error| Error::TlsInvalidDnsName {
770 hostname: hostname.to_string(),
771 error,
772 })?
773 .to_owned();
774
775 let connector =
776 tokio_rustls::TlsConnector::from(Arc::new(self.tls_config));
777 log::debug!("TlsBuilderConfig::establish: performing connect to server_name = {server_name:?}");
778 let tls = connector
779 .connect(server_name, self.stream.into_inner())
780 .await
781 .map_err(Error::ConnectionTls)?;
782
783 #[allow(clippy::match_like_matches_macro)]
784 let h2_announced = if let Some(b"h2") = tls.get_ref().1.alpn_protocol() {
785 true
786 } else {
787 false
788 };
789
790 Ok(Io {
791 protocols: Protocols {
792 http1_support: self.http1_enabled,
793 http2_support: self.http2_enabled && h2_announced,
794 },
795 uri_host: self.uri_host,
796 stream: IoStream {
797 kind: IoKind::TcpTls(
798 IoTcpTls {
799 stream: TokioIo::new(tls),
800 },
801 ),
802 },
803 })
804 }
805}
806
807async fn connection_establish_tcp(
808 mut http_connector: legacy::connect::HttpConnector<resolver::HickoryResolver>,
809 uri: Uri,
810)
811 -> Result<TokioIo<TcpStream>, Error>
812{
813 log::debug!("connection_establish_tcp to uri = {uri:?}");
814 tower_service::Service::call(&mut http_connector, uri).await
815 .map_err(|error| {
816 Error::Connection(Box::new(error) as Box<dyn std::error::Error + Send + Sync + 'static>)
817 })
818}
819
820async fn connection_establish_proxy(
821 proxy_addr: Uri,
822 proxy_auth: Option<Socks5Auth>,
823 mut http_connector: legacy::connect::HttpConnector<resolver::HickoryResolver>,
824 uri: Uri,
825 uri_host: String,
826)
827 -> Result<TokioIo<TcpStream>, Error>
828{
829 log::debug!("connection_establish_proxy to uri = {uri:?}, uri_host = {uri_host}, proxy_addr = {proxy_addr:?}");
830 let port = match uri.port() {
831 Some(port) =>
832 port.as_u16(),
833 None =>
834 if uri.scheme() == Some(&http::uri::Scheme::HTTPS) {
835 443
836 } else {
837 80
838 },
839 };
840 log::debug!("connection_establish_proxy: using port {port:?}");
841 let target_addr =
842 async_socks5::AddrKind::Domain(uri_host, port);
843
844 log::debug!("connection_establish_proxy: target_addr = {target_addr:?}");
845 let tokio_io_stream =
846 tower_service::Service::call(
847 &mut http_connector,
848 proxy_addr,
849 )
850 .await
851 .map_err(|error| {
852 Error::ConnectionToSocks5(Box::new(error) as Box<dyn std::error::Error + Send + Sync + 'static>)
853 })?;
854 let stream = tokio_io_stream
855 .into_inner();
856
857 log::debug!("connection_establish_proxy: tokio_io_stream connected");
858 let mut buf_stream =
859 tokio::io::BufStream::new(stream);
860 let _addr_kind =
861 async_socks5::connect(
862 &mut buf_stream,
863 target_addr,
864 proxy_auth.map(Into::into),
865 )
866 .await
867 .map_err(Error::ConnectionViaSocks5)?;
868
869 log::debug!("connection_establish_proxy: async_socks5 connected");
870 Ok(TokioIo::new(buf_stream.into_inner()))
871}