use std::{
sync::{
Arc,
},
time::{
Duration,
},
net::{
IpAddr,
Ipv4Addr,
Ipv6Addr,
},
};
use http::{
uri::{
Uri,
},
};
use hyper_util::{
client::{
legacy,
},
};
use tokio::{
net::{
TcpStream,
},
};
use crate::{
resolver,
Io,
Protocols,
IoStream,
IoKind,
IoTcp,
IoTcpTls,
TokioIo,
};
/// Errors reported by the connection builders defined in this file.
#[derive(Debug)]
pub enum Error {
/// The target URI has no scheme; returned by `ResolverBuilder::connection_setup`.
UriMissingScheme {
uri: Uri,
},
/// The target URI has no host; returned by `ResolverBuilder::connection_setup`.
UriMissingHost {
uri: Uri,
},
/// `TlsBuilderConfig::establish` was asked to set up TLS for a URI whose
/// scheme is not `https` (this includes `http` when `https_only` is set).
UriUnsupportedHttpsScheme {
uri: Uri,
scheme: http::uri::Scheme,
},
/// Constructing the hickory-based resolver failed.
ResolverBuild(hickory_resolver::error::ResolveError),
/// A caller-supplied `rustls::ClientConfig` already had ALPN protocols set;
/// `TlsBuilder::tls_config` rejects such a config.
TlsNonEmptyAlpnProtocols,
/// Loading the platform's native root certificates failed.
TlsNativeCertsLoad(std::io::Error),
/// A native root certificate could not be added to the root store.
TlsNativeCertAdd(rustls::Error),
/// The host name (with surrounding `[`/`]` trimmed) could not be converted
/// into a TLS server name.
TlsInvalidDnsName {
hostname: String,
error: rustls::pki_types::InvalidDnsNameError,
},
/// The HTTP connector failed to open the direct TCP connection to the target.
Connection(Box<dyn std::error::Error + Send + Sync + 'static>),
/// The HTTP connector failed to open the TCP connection to the SOCKS5 proxy.
ConnectionToSocks5(Box<dyn std::error::Error + Send + Sync + 'static>),
/// The SOCKS5 connect request issued over the proxy connection failed.
ConnectionViaSocks5(async_socks5::Error),
/// The TLS connect on top of the established stream failed.
ConnectionTls(std::io::Error),
}
/// First builder stage: selects which DNS resolver flavour will back the
/// connection, then hands over to `ConnectionBuilder` via `connection_setup`.
pub struct ResolverBuilder {
// Resolver flavour passed to `resolver::HickoryResolver::new`.
resolver_kind: resolver::ResolverKind,
}
impl ResolverBuilder {
pub(super) fn new() -> Self {
Self {
resolver_kind: resolver::ResolverKind::System,
}
}
pub fn system(mut self) -> Self {
self.resolver_kind = resolver::ResolverKind::System;
self
}
pub fn google(mut self) -> Self {
self.resolver_kind = resolver::ResolverKind::Google;
self
}
pub fn google_tls(mut self) -> Self {
self.resolver_kind = resolver::ResolverKind::GoogleTls;
self
}
pub fn google_https(mut self) -> Self {
self.resolver_kind = resolver::ResolverKind::GoogleHttps;
self
}
pub fn connection_setup(self, uri: Uri) -> Result<ConnectionBuilder, Error> {
let uri_scheme = uri.scheme()
.ok_or_else(|| Error::UriMissingScheme { uri: uri.clone(), })?
.clone();
let uri_host = uri.host()
.ok_or_else(|| Error::UriMissingHost { uri: uri.clone(), })?
.to_string();
let resolver = resolver::HickoryResolver::new(self.resolver_kind)
.map_err(Error::ResolverBuild)?;
Ok(ConnectionBuilder::new(resolver, uri, uri_host, uri_scheme))
}
}
/// Second builder stage: collects TCP connector options for a validated
/// target URI, then either connects directly, prepares TLS, or routes the
/// connection through a SOCKS5 proxy.
pub struct ConnectionBuilder {
// Target URI as given to `ResolverBuilder::connection_setup`.
uri: Uri,
// Host component extracted from `uri`.
uri_host: String,
// Scheme component extracted from `uri`.
uri_scheme: http::uri::Scheme,
// Resolver handed to the HTTP connector when it is built.
resolver: resolver::HickoryResolver,
// Connector options recorded by the setter methods; applied when the connector is built.
http_connector_params: HttpConnectorParams,
}
/// Recorded `HttpConnector` options.
///
/// For every field the outer `Option` is `None` until the matching
/// `ConnectionBuilder` setter is called; `apply` only forwards fields that
/// are `Some`, passing the inner value to the connector setter of the same
/// name.
// NOTE(review): the doubled suffixes in the field names (`*_interval_interval`,
// `*_size_size`, ...) look like "setter name + argument name" concatenation —
// confirm whether that is intentional before renaming.
#[derive(Default)]
struct HttpConnectorParams {
// Forwarded to `set_keepalive`.
keepalive_time: Option<Option<Duration>>,
// Forwarded to `set_keepalive_interval`.
keepalive_interval_interval: Option<Option<Duration>>,
// Forwarded to `set_keepalive_retries`.
keepalive_retries_retries: Option<Option<u32>>,
// Forwarded to `set_nodelay`.
nodelay_nodelay: Option<bool>,
// Forwarded to `set_send_buffer_size`.
send_buffer_size_size: Option<Option<usize>>,
// Forwarded to `set_recv_buffer_size`.
recv_buffer_size_size: Option<Option<usize>>,
// Forwarded to `set_local_address`; also drives `apply_resolver`.
local_address_addr: Option<Option<IpAddr>>,
// Forwarded to `set_local_addresses` as an (IPv4, IPv6) pair.
local_addresses_addrs: Option<(Ipv4Addr, Ipv6Addr)>,
// Forwarded to `set_connect_timeout`.
connect_timeout_dur: Option<Option<Duration>>,
// Forwarded to `set_happy_eyeballs_timeout`.
happy_eyeballs_timeout_dur: Option<Option<Duration>>,
// Forwarded to `set_reuse_address`.
reuse_address_reuse_address: Option<bool>,
// Forwarded to `set_interface`; only compiled on the listed target OSes.
#[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
interface_interface: Option<String>,
}
impl HttpConnectorParams {
fn apply_resolver(
&self,
resolver: &mut resolver::HickoryResolver,
)
{
if let Some(maybe_ip) = &self.local_address_addr {
if let Some(ip) = maybe_ip {
if ip.is_ipv4() {
resolver.force_ip4();
} else if ip.is_ipv6() {
resolver.force_ip6();
}
} else {
resolver.force_none();
}
}
}
fn apply(
self,
http_connector: &mut legacy::connect::HttpConnector<resolver::HickoryResolver>,
)
{
if let Some(value) = self.keepalive_time {
http_connector.set_keepalive(value);
}
if let Some(value) = self.keepalive_interval_interval {
http_connector.set_keepalive_interval(value);
}
if let Some(value) = self.keepalive_retries_retries {
http_connector.set_keepalive_retries(value);
}
if let Some(value) = self.nodelay_nodelay {
http_connector.set_nodelay(value);
}
if let Some(value) = self.send_buffer_size_size {
http_connector.set_send_buffer_size(value);
}
if let Some(value) = self.recv_buffer_size_size {
http_connector.set_recv_buffer_size(value);
}
if let Some(maybe_ip) = self.local_address_addr {
http_connector.set_local_address(maybe_ip);
}
if let Some((ip4, ip6)) = self.local_addresses_addrs {
http_connector.set_local_addresses(ip4, ip6);
}
if let Some(value) = self.connect_timeout_dur {
http_connector.set_connect_timeout(value);
}
if let Some(value) = self.happy_eyeballs_timeout_dur {
http_connector.set_happy_eyeballs_timeout(value);
}
if let Some(value) = self.reuse_address_reuse_address {
http_connector.set_reuse_address(value);
}
#[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
if let Some(value) = self.interface_interface {
http_connector.set_interface(value);
}
}
}
impl ConnectionBuilder {
fn new(
resolver: resolver::HickoryResolver,
uri: Uri,
uri_host: String,
uri_scheme: http::uri::Scheme,
)
-> Self
{
Self {
uri,
uri_host,
uri_scheme,
resolver,
http_connector_params: HttpConnectorParams::default(),
}
}
pub fn keepalive(mut self, time: Option<Duration>) -> Self {
self.http_connector_params.keepalive_time = Some(time);
self
}
pub fn keepalive_interval(mut self, interval: Option<Duration>) -> Self {
self.http_connector_params.keepalive_interval_interval = Some(interval);
self
}
pub fn keepalive_retries(mut self, retries: Option<u32>) -> Self {
self.http_connector_params.keepalive_retries_retries = Some(retries);
self
}
pub fn nodelay(mut self, nodelay: bool) -> Self {
self.http_connector_params.nodelay_nodelay = Some(nodelay);
self
}
pub fn send_buffer_size(mut self, size: Option<usize>) -> Self {
self.http_connector_params.send_buffer_size_size = Some(size);
self
}
pub fn recv_buffer_size(mut self, size: Option<usize>) -> Self {
self.http_connector_params.recv_buffer_size_size = Some(size);
self
}
pub fn local_address(mut self, addr: Option<IpAddr>) -> Self {
self.http_connector_params.local_address_addr = Some(addr);
self
}
pub fn local_addresses(mut self, addr_ipv4: Ipv4Addr, addr_ipv6: Ipv6Addr) -> Self {
self.http_connector_params.local_addresses_addrs = Some((addr_ipv4, addr_ipv6));
self
}
pub fn connect_timeout(mut self, dur: Option<Duration>) -> Self {
self.http_connector_params.connect_timeout_dur = Some(dur);
self
}
pub fn happy_eyeballs_timeout(mut self, dur: Option<Duration>) -> Self {
self.http_connector_params.happy_eyeballs_timeout_dur = Some(dur);
self
}
pub fn reuse_address(mut self, reuse_address: bool) -> Self {
self.http_connector_params.reuse_address_reuse_address = Some(reuse_address);
self
}
#[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
pub fn interface<S: Into<String>>(mut self, interface: S) -> Self {
self.http_connector_params.interface_interface = Some(interface.into());
self
}
pub async fn establish(mut self) -> Result<Io, Error> {
self.http_connector_params.apply_resolver(
&mut self.resolver,
);
let mut http_connector =
legacy::connect::HttpConnector::new_with_resolver(
self.resolver,
);
self.http_connector_params.apply(
&mut http_connector,
);
let stream = connection_establish_tcp(http_connector, self.uri).await?;
Ok(Io {
protocols: Protocols {
http1_support: true,
http2_support: false,
},
uri_host: self.uri_host,
stream: IoStream {
kind: IoKind::Tcp(
IoTcp {
stream,
},
),
},
})
}
pub async fn tls_setup(mut self) -> Result<TlsBuilder, Error> {
self.http_connector_params.apply_resolver(
&mut self.resolver,
);
let mut http_connector =
legacy::connect::HttpConnector::new_with_resolver(
self.resolver,
);
self.http_connector_params.apply(
&mut http_connector,
);
http_connector
.enforce_http(false);
let stream = connection_establish_tcp(http_connector, self.uri.clone()).await?;
Ok(TlsBuilder::new(
stream,
self.uri,
self.uri_host,
self.uri_scheme,
))
}
pub fn socks5_proxy_setup(mut self, proxy_addr: Uri) -> Socks5ProxyBuilder {
self.http_connector_params.apply_resolver(
&mut self.resolver,
);
let mut http_connector =
legacy::connect::HttpConnector::new_with_resolver(
self.resolver,
);
self.http_connector_params.apply(
&mut http_connector,
);
Socks5ProxyBuilder::new(
proxy_addr,
http_connector,
self.uri,
self.uri_host,
self.uri_scheme,
)
}
}
/// Username/password credentials for a SOCKS5 proxy.
///
/// Converts to and from `async_socks5::Auth`.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct Socks5Auth {
/// User name presented to the proxy.
pub username: String,
/// Password presented to the proxy.
pub password: String,
}
impl From<async_socks5::Auth> for Socks5Auth {
fn from(auth: async_socks5::Auth) -> Self {
Self {
username: auth.username,
password: auth.password,
}
}
}
impl From<Socks5Auth> for async_socks5::Auth {
fn from(auth: Socks5Auth) -> Self {
async_socks5::Auth::new(auth.username, auth.password)
}
}
/// Builder stage for connections tunnelled through a SOCKS5 proxy.
///
/// Created by `ConnectionBuilder::socks5_proxy_setup`.
pub struct Socks5ProxyBuilder {
// Target URI the proxy is asked to connect to.
uri: Uri,
// Host component of `uri`.
uri_host: String,
// Scheme component of `uri`.
uri_scheme: http::uri::Scheme,
// Fully configured connector; used to reach `proxy_addr`.
http_connector: legacy::connect::HttpConnector<resolver::HickoryResolver>,
// Address of the SOCKS5 proxy itself.
proxy_addr: Uri,
// Optional proxy credentials; `None` until `auth` is called with a value.
proxy_auth: Option<Socks5Auth>,
}
impl Socks5ProxyBuilder {
fn new(
proxy_addr: Uri,
http_connector: legacy::connect::HttpConnector<resolver::HickoryResolver>,
uri: Uri,
uri_host: String,
uri_scheme: http::uri::Scheme,
)
-> Self
{
Self {
uri,
uri_host,
uri_scheme,
http_connector,
proxy_addr,
proxy_auth: None,
}
}
pub fn auth(mut self, proxy_auth: Option<Socks5Auth>) -> Self {
self.proxy_auth = proxy_auth;
self
}
pub async fn establish(self) -> Result<Io, Error> {
let stream =
connection_establish_proxy(
self.proxy_addr,
self.proxy_auth,
self.http_connector,
self.uri,
self.uri_host.clone(),
)
.await?;
Ok(Io {
protocols: Protocols {
http1_support: true,
http2_support: false,
},
uri_host: self.uri_host,
stream: IoStream {
kind: IoKind::Tcp(
IoTcp {
stream,
},
),
},
})
}
pub async fn tls_setup(mut self) -> Result<TlsBuilder, Error> {
self.http_connector
.enforce_http(false);
let stream =
connection_establish_proxy(
self.proxy_addr,
self.proxy_auth,
self.http_connector,
self.uri.clone(),
self.uri_host.clone(),
)
.await?;
Ok(TlsBuilder::new(
stream,
self.uri,
self.uri_host,
self.uri_scheme,
))
}
}
/// TLS stage holding an already established stream; the caller picks where
/// the TLS client configuration (root certificates) comes from.
pub struct TlsBuilder {
// Target URI.
uri: Uri,
// Host component of `uri`.
uri_host: String,
// Scheme component of `uri`.
uri_scheme: http::uri::Scheme,
// Connected stream produced by one of the `tls_setup` methods.
stream: TokioIo<TcpStream>,
}
impl TlsBuilder {
fn new(
stream: TokioIo<TcpStream>,
uri: Uri,
uri_host: String,
uri_scheme: http::uri::Scheme,
)
-> Self
{
Self {
uri,
uri_host,
uri_scheme,
stream,
}
}
pub fn tls_config(self, config: rustls::ClientConfig) -> Result<TlsBuilderConfig, Error> {
if !config.alpn_protocols.is_empty() {
Err(Error::TlsNonEmptyAlpnProtocols)
} else {
Ok(TlsBuilderConfig::new(
config,
self.stream,
self.uri,
self.uri_host,
self.uri_scheme,
))
}
}
pub fn native_roots(self) -> Result<TlsBuilderConfig, Error> {
let mut root_store = rustls::RootCertStore::empty();
let native_certs_iter = rustls_native_certs::load_native_certs()
.map_err(Error::TlsNativeCertsLoad)?;
for cert in native_certs_iter {
root_store.add(cert)
.map_err(Error::TlsNativeCertAdd)?;
}
Ok(TlsBuilderConfig::new(
rustls::ClientConfig::builder()
.with_root_certificates(root_store)
.with_no_client_auth(),
self.stream,
self.uri,
self.uri_host,
self.uri_scheme,
))
}
pub fn webpki_roots(self) -> Result<TlsBuilderConfig, Error> {
let mut root_store = rustls::RootCertStore::empty();
root_store.extend(
webpki_roots::TLS_SERVER_ROOTS
.iter()
.cloned(),
);
Ok(TlsBuilderConfig::new(
rustls::ClientConfig::builder()
.with_root_certificates(root_store)
.with_no_client_auth(),
self.stream,
self.uri,
self.uri_host,
self.uri_scheme,
))
}
}
/// Final TLS stage: holds the stream and client configuration plus the
/// protocol and server-name options consulted by `establish`.
pub struct TlsBuilderConfig {
// Target URI; only reported back inside `Error::UriUnsupportedHttpsScheme`.
uri: Uri,
// Host component of `uri`; default source for the TLS server name.
uri_host: String,
// Scheme component of `uri`; decides between plain TCP, TLS, or an error.
uri_scheme: http::uri::Scheme,
// Connected stream to wrap.
stream: TokioIo<TcpStream>,
// Client configuration; its ALPN list is overwritten in `establish`.
tls_config: rustls::ClientConfig,
// When false, an `http` URI is served over the plain stream without TLS.
https_only: bool,
// When true, `http/1.1` is offered via ALPN.
http1_enabled: bool,
// When true, `h2` is offered via ALPN.
http2_enabled: bool,
// If set, used instead of `uri_host` as the TLS server name.
override_server_name: Option<String>,
}
impl TlsBuilderConfig {
fn new(
tls_config: rustls::ClientConfig,
stream: TokioIo<TcpStream>,
uri: Uri,
uri_host: String,
uri_scheme: http::uri::Scheme,
)
-> Self
{
Self {
uri,
uri_host,
uri_scheme,
stream,
tls_config,
https_only: false,
http1_enabled: false,
http2_enabled: false,
override_server_name: None,
}
}
pub fn https_only(mut self) -> Self {
self.https_only = true;
self
}
pub fn https_or_http(mut self) -> Self {
self.https_only = false;
self
}
pub fn enable_http1(mut self) -> Self {
self.http1_enabled = true;
self.http2_enabled = false;
self
}
pub fn enable_http2(mut self) -> Self {
self.http1_enabled = false;
self.http2_enabled = true;
self
}
pub fn enable_all_versions(mut self) -> Self {
self.http1_enabled = true;
self.http2_enabled = true;
self
}
pub fn with_server_name(mut self, override_server_name: String) -> Self {
self.override_server_name = Some(override_server_name);
self
}
pub async fn establish(mut self) -> Result<Io, Error> {
let mut alpn_protocols = Vec::new();
if self.http2_enabled {
alpn_protocols.push(b"h2".to_vec());
}
if self.http1_enabled {
alpn_protocols.push(b"http/1.1".to_vec());
}
self.tls_config.alpn_protocols = alpn_protocols;
if self.uri_scheme == http::uri::Scheme::HTTP && !self.https_only {
return Ok(Io {
protocols: Protocols {
http1_support: true,
http2_support: false,
},
uri_host: self.uri_host,
stream: IoStream {
kind: IoKind::Tcp(
IoTcp {
stream: self.stream,
},
),
},
});
}
if self.uri_scheme != http::uri::Scheme::HTTPS {
return Err(Error::UriUnsupportedHttpsScheme {
uri: self.uri,
scheme: self.uri_scheme,
});
}
let mut hostname =
match self.override_server_name.as_deref() {
Some(server_name) =>
server_name,
None =>
&self.uri_host,
};
hostname = hostname
.trim_start_matches('[')
.trim_end_matches(']');
let server_name = rustls::pki_types::ServerName::try_from(hostname)
.map_err(|error| Error::TlsInvalidDnsName {
hostname: hostname.to_string(),
error,
})?
.to_owned();
let connector =
tokio_rustls::TlsConnector::from(Arc::new(self.tls_config));
let tls = connector
.connect(server_name, self.stream.into_inner())
.await
.map_err(Error::ConnectionTls)?;
#[allow(clippy::match_like_matches_macro)]
let h2_announced = if let Some(b"h2") = tls.get_ref().1.alpn_protocol() {
true
} else {
false
};
Ok(Io {
protocols: Protocols {
http1_support: self.http1_enabled,
http2_support: self.http2_enabled && h2_announced,
},
uri_host: self.uri_host,
stream: IoStream {
kind: IoKind::TcpTls(
IoTcpTls {
stream: TokioIo::new(tls),
},
),
},
})
}
}
async fn connection_establish_tcp(
mut http_connector: legacy::connect::HttpConnector<resolver::HickoryResolver>,
uri: Uri,
)
-> Result<TokioIo<TcpStream>, Error>
{
tower_service::Service::call(&mut http_connector, uri).await
.map_err(|error| {
Error::Connection(Box::new(error) as Box<dyn std::error::Error + Send + Sync + 'static>)
})
}
async fn connection_establish_proxy(
proxy_addr: Uri,
proxy_auth: Option<Socks5Auth>,
mut http_connector: legacy::connect::HttpConnector<resolver::HickoryResolver>,
uri: Uri,
uri_host: String,
)
-> Result<TokioIo<TcpStream>, Error>
{
let port = match uri.port() {
Some(port) =>
port.as_u16(),
None =>
if uri.scheme() == Some(&http::uri::Scheme::HTTPS) {
443
} else {
80
},
};
let target_addr =
async_socks5::AddrKind::Domain(uri_host, port);
let tokio_io_stream =
tower_service::Service::call(
&mut http_connector,
proxy_addr,
)
.await
.map_err(|error| {
Error::ConnectionToSocks5(Box::new(error) as Box<dyn std::error::Error + Send + Sync + 'static>)
})?;
let stream = tokio_io_stream
.into_inner();
let mut buf_stream =
tokio::io::BufStream::new(stream);
let _addr_kind =
async_socks5::connect(
&mut buf_stream,
target_addr,
proxy_auth.map(Into::into),
)
.await
.map_err(Error::ConnectionViaSocks5)?;
Ok(TokioIo::new(buf_stream.into_inner()))
}