#[cfg(unix)]
use std::path::Path;
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use http::Uri;
use tokio::io::{AsyncRead, AsyncWrite};
use tower::{
Service, ServiceBuilder, ServiceExt,
timeout::TimeoutLayer,
util::{BoxCloneSyncService, MapRequestLayer},
};
#[cfg(unix)]
use super::uds::UnixConnector;
use super::{
AsyncConnWithInfo, BoxedConnectorLayer, BoxedConnectorService, Connection, HttpConnector,
TlsInfoFactory, Unnameable,
conn::{Conn, TlsConn},
proxy,
verbose::Verbose,
};
use crate::{
client::http::{ConnectExtra, ConnectRequest},
dns::DynResolver,
error::{BoxError, ProxyConnect, TimedOut, map_timeout_to_connector_error},
ext::UriExt,
proxy::{Intercepted, Matcher as ProxyMatcher, matcher::Intercept},
tls::{
TlsOptions,
conn::{
EstablishedConn, HttpsConnector, MaybeHttpsStream, TlsConnector, TlsConnectorBuilder,
TlsStream,
},
},
};
/// Boxed, `Send` future returned by the connector services in this file.
///
/// It resolves to an established [`Conn`] on success or a [`BoxError`] on failure.
type Connecting = Pin<Box<dyn Future<Output = Result<Conn, BoxError>> + Send>>;
/// Settings shared between [`ConnectorBuilder`] and the [`ConnectorService`] it produces.
#[derive(Clone)]
struct Config {
/// Proxy matchers tried in order for each request URI when the request's own
/// proxy matcher is absent or does not intercept the URI.
proxies: Arc<Vec<ProxyMatcher>>,
/// Wrapper applied (via `wrap`) to every stream before it is stored in a `Conn`.
verbose: Verbose,
/// When `false`, the service turns nodelay on for the `HttpConnector` it dials with
/// and calls `set_nodelay(false)` on the stream once the connection is established.
tcp_nodelay: bool,
/// Value copied into `Conn::tls_info` for connections created by the service.
tls_info: bool,
/// Optional limit on the whole connect attempt; `None` means no limit is applied here.
timeout: Option<Duration>,
}
/// Builder that collects connector settings and turns them into a [`Connector`]
/// through `build`.
pub struct ConnectorBuilder {
/// Connection settings handed over to the built service.
config: Config,
/// Resolver forwarded to the service for SOCKS connections (only with the `socks` feature).
#[cfg(feature = "socks")]
resolver: DynResolver,
/// Plain HTTP connector used as the transport underneath TLS.
http: HttpConnector,
/// Options passed to `tls_builder` when the default TLS connector is built.
tls_options: TlsOptions,
/// Builder for the TLS connector; also kept by the service to build per-request connectors.
tls_builder: TlsConnectorBuilder,
}
/// Connector produced by [`ConnectorBuilder`].
#[derive(Clone)]
pub enum Connector {
/// No user layers were supplied: the concrete [`ConnectorService`] is used directly.
Simple(ConnectorService),
/// User layers were supplied: the service is wrapped in them and type-erased.
WithLayers(BoxedConnectorService),
}
/// Service that establishes connections, directly or through a proxy / Unix socket.
#[derive(Clone)]
pub struct ConnectorService {
/// Connection settings (proxies, verbosity, nodelay handling, TLS info, timeout).
config: Config,
/// Resolver passed to the SOCKS connector (only with the `socks` feature).
#[cfg(feature = "socks")]
resolver: DynResolver,
/// Template HTTP connector; cloned for every connection attempt.
http: HttpConnector,
/// Default TLS connector, used when a request carries no TLS options of its own.
tls: TlsConnector,
/// Shared TLS connector builder, used to build connectors for request-specific TLS options.
tls_builder: Arc<TlsConnectorBuilder>,
}
impl ConnectorBuilder {
#[inline]
pub fn with_http<F>(mut self, call: F) -> ConnectorBuilder
where
F: FnOnce(&mut HttpConnector),
{
call(&mut self.http);
self
}
#[inline]
pub fn with_tls<F>(mut self, call: F) -> ConnectorBuilder
where
F: FnOnce(TlsConnectorBuilder) -> TlsConnectorBuilder,
{
self.tls_builder = call(self.tls_builder);
self
}
#[inline]
pub fn timeout(mut self, timeout: Option<Duration>) -> ConnectorBuilder {
self.config.timeout = timeout;
self
}
#[inline]
pub fn verbose(mut self, enabled: bool) -> ConnectorBuilder {
self.config.verbose.0 = enabled;
self
}
#[inline]
pub fn tls_info(mut self, enabled: bool) -> ConnectorBuilder {
self.config.tls_info = enabled;
self
}
#[inline]
pub fn tls_options(mut self, opts: Option<TlsOptions>) -> ConnectorBuilder {
if let Some(opts) = opts {
self.tls_options = opts;
}
self
}
pub fn build(self, layers: Vec<BoxedConnectorLayer>) -> crate::Result<Connector> {
let mut service = ConnectorService {
config: self.config,
#[cfg(feature = "socks")]
resolver: self.resolver.clone(),
http: self.http,
tls: self.tls_builder.build(&self.tls_options)?,
tls_builder: Arc::new(self.tls_builder),
};
if layers.is_empty() {
return Ok(Connector::Simple(service));
}
let timeout = service.config.timeout.take();
let service = layers.into_iter().fold(
BoxCloneSyncService::new(
ServiceBuilder::new()
.layer(MapRequestLayer::new(|request: Unnameable| request.0))
.service(service),
),
|service, layer| ServiceBuilder::new().layer(layer).service(service),
);
match timeout {
Some(timeout) => {
let service = ServiceBuilder::new()
.layer(TimeoutLayer::new(timeout))
.service(service)
.map_err(map_timeout_to_connector_error);
Ok(Connector::WithLayers(BoxCloneSyncService::new(service)))
}
None => {
let service = ServiceBuilder::new()
.service(service)
.map_err(map_timeout_to_connector_error);
Ok(Connector::WithLayers(BoxCloneSyncService::new(service)))
}
}
}
}
impl Connector {
    /// Starts a [`ConnectorBuilder`] with default settings for the given proxy matchers
    /// and DNS resolver.
    ///
    /// Defaults: verbose output off, `tcp_nodelay` and `tls_info` disabled, no timeout,
    /// default TLS options and a fresh TLS connector builder.
    pub(crate) fn builder(proxies: Vec<ProxyMatcher>, resolver: DynResolver) -> ConnectorBuilder {
        let config = Config {
            proxies: Arc::new(proxies),
            verbose: Verbose::OFF,
            tcp_nodelay: false,
            tls_info: false,
            timeout: None,
        };

        // The resolver is moved into the HTTP connector below, so the copy kept for SOCKS
        // has to be taken first.
        #[cfg(feature = "socks")]
        let socks_resolver = resolver.clone();
        let http = HttpConnector::new_with_resolver(resolver);

        ConnectorBuilder {
            config,
            #[cfg(feature = "socks")]
            resolver: socks_resolver,
            http,
            tls_options: TlsOptions::default(),
            tls_builder: TlsConnector::builder(),
        }
    }
}
impl Service<ConnectRequest> for Connector {
    type Response = Conn;
    type Error = BoxError;
    type Future = Connecting;

    /// Forwards readiness polling to whichever service variant is active.
    #[inline]
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        match self {
            Self::Simple(plain) => plain.poll_ready(cx),
            Self::WithLayers(layered) => layered.poll_ready(cx),
        }
    }

    /// Dispatches the connect request to the active variant.
    ///
    /// The layered variant takes the request wrapped in `Unnameable`.
    #[inline]
    fn call(&mut self, req: ConnectRequest) -> Self::Future {
        match self {
            Self::Simple(plain) => plain.call(req),
            Self::WithLayers(layered) => {
                let wrapped = Unnameable(req);
                layered.call(wrapped)
            }
        }
    }
}
impl ConnectorService {
fn tunnel_conn_from_stream<IO>(&self, io: MaybeHttpsStream<IO>) -> Result<Conn, BoxError>
where
IO: AsyncConnWithInfo,
TlsConn<IO>: Connection,
TlsStream<IO>: TlsInfoFactory,
{
let conn = match io {
MaybeHttpsStream::Http(inner) => Conn {
inner: self.config.verbose.wrap(inner),
tls_info: false,
proxy: None,
},
MaybeHttpsStream::Https(inner) => Conn {
inner: self.config.verbose.wrap(TlsConn::new(inner)),
tls_info: self.config.tls_info,
proxy: None,
},
};
Ok(conn)
}
fn conn_from_stream<IO, P>(&self, io: MaybeHttpsStream<IO>, proxy: P) -> Result<Conn, BoxError>
where
IO: AsyncConnWithInfo,
TlsConn<IO>: Connection,
TlsStream<IO>: TlsInfoFactory,
P: Into<Option<Intercept>>,
{
let conn = match io {
MaybeHttpsStream::Http(inner) => self.config.verbose.wrap(inner),
MaybeHttpsStream::Https(inner) => self.config.verbose.wrap(TlsConn::new(inner)),
};
Ok(Conn {
inner: conn,
tls_info: self.config.tls_info,
proxy: proxy.into(),
})
}
fn build_https_connector(
&self,
extra: &ConnectExtra,
) -> Result<HttpsConnector<HttpConnector>, BoxError> {
let mut http = self.http.clone();
if !self.config.tcp_nodelay {
http.set_nodelay(true);
}
if let Some(opts) = extra.tcp_options() {
http.set_connect_options(opts.clone());
}
self.build_tls_connector_generic(http, extra)
}
fn build_https_proxy_connector(
&self,
extra: &ConnectExtra,
) -> Result<HttpsConnector<HttpConnector>, BoxError> {
let mut http = self.http.clone();
if !self.config.tcp_nodelay {
http.set_nodelay(true);
}
if let Some(opts) = extra.tcp_options() {
http.set_connect_options(opts.clone());
}
let mut tls_options = extra.tls_options().cloned().unwrap_or_default();
tls_options.alpn_protocols = None;
let tls = self
.tls_builder
.as_ref()
.clone()
.identity(None)
.alpn_protocol(None)
.build(&tls_options)?;
Ok(HttpsConnector::with_connector(http, tls))
}
#[cfg(unix)]
fn build_unix_connector(
&self,
unix_socket: Arc<Path>,
extra: &ConnectExtra,
) -> Result<HttpsConnector<UnixConnector>, BoxError> {
self.build_tls_connector_generic(UnixConnector(unix_socket), extra)
}
fn build_tls_connector_generic<S, T>(
&self,
connector: S,
extra: &ConnectExtra,
) -> Result<HttpsConnector<S>, BoxError>
where
S: Service<Uri, Response = T> + Send,
S::Error: Into<BoxError>,
S::Future: Unpin + Send + 'static,
T: AsyncRead + AsyncWrite + Connection + Unpin + std::fmt::Debug + Sync + Send + 'static,
{
let tls = extra
.tls_options()
.map(|opts| self.tls_builder.build(opts))
.transpose()?
.unwrap_or_else(|| self.tls.clone());
Ok(HttpsConnector::with_connector(connector, tls))
}
}
impl ConnectorService {
/// Connects to the URI in `req` with a freshly built HTTPS connector and wraps the
/// resulting stream in a [`Conn`] that records `proxy` (if any).
///
/// When a proxy is given and the URI being dialed is HTTPS, `no_alpn()` is called on
/// the connector before connecting.
async fn connect_auto_proxy<P>(self, req: ConnectRequest, proxy: P) -> Result<Conn, BoxError>
where
P: Into<Option<Intercept>>,
{
let proxy = proxy.into();
trace!("connect with maybe proxy: {:?}", proxy);
let mut connector = self.build_https_connector(req.extra())?;
if proxy.is_some() && req.uri().is_https() {
connector.no_alpn();
}
let io = connector.call(req).await?;
// `build_https_connector` switched nodelay on when `tcp_nodelay` is false;
// switch it back off now that the connection is established.
if !self.config.tcp_nodelay {
io.get_ref().set_nodelay(false)?;
}
self.conn_from_stream(io, proxy)
}
/// Connects to the target of `req` through the intercepting `proxy`.
///
/// * `Intercepted::Proxy` with a `socks4`/`socks4a`/`socks5`/`socks5h` scheme (only with
///   the `socks` feature): connects through the SOCKS proxy, then runs the HTTPS
///   connector over the established stream.
/// * `Intercepted::Proxy` with an HTTPS target: opens a tunnel through the proxy, then
///   runs the HTTPS connector over the tunneled stream.
/// * `Intercepted::Proxy` otherwise: rewrites the request URI to the proxy URI and
///   connects to the proxy itself; failures are wrapped in `ProxyConnect`.
/// * `Intercepted::Unix` (Unix only): connects over the Unix socket, tunneling first
///   when the target is HTTPS.
async fn connect_via_proxy(
self,
mut req: ConnectRequest,
proxy: Intercepted,
) -> Result<Conn, BoxError> {
// Keep the original target; `req`'s URI may be replaced by the proxy URI below.
let uri = req.uri().clone();
match proxy {
Intercepted::Proxy(proxy) => {
let proxy_uri = proxy.uri().clone();
#[cfg(feature = "socks")]
{
use proxy::socks::{DnsResolve, SocksConnector, Version};
// Map the proxy scheme to a SOCKS version and DNS mode; any other scheme
// falls through to the HTTP(S) proxy handling below.
if let Some((version, dns_resolve)) = match proxy.uri().scheme_str() {
Some("socks4") => Some((Version::V4, DnsResolve::Local)),
Some("socks4a") => Some((Version::V4, DnsResolve::Remote)),
Some("socks5") => Some((Version::V5, DnsResolve::Local)),
Some("socks5h") => Some((Version::V5, DnsResolve::Remote)),
_ => None,
} {
trace!("connecting via SOCKS proxy: {:?}", proxy_uri);
let conn = {
let mut socks = SocksConnector::new_with_resolver(
proxy_uri,
self.http.clone(),
self.resolver.clone(),
);
socks.set_auth(proxy.raw_auth());
socks.set_version(version);
socks.set_dns_mode(dns_resolve);
socks.call(uri).await?
};
// Run the HTTPS connector over the stream the SOCKS proxy established.
let mut connector = self.build_https_connector(req.extra())?;
let io = connector.call(EstablishedConn::new(conn, req)).await?;
if !self.config.tcp_nodelay {
io.get_ref().set_nodelay(false)?;
}
return self.tunnel_conn_from_stream(io);
}
}
if uri.is_https() {
trace!("tunneling over HTTP(s) proxy: {:?}", proxy_uri);
// `proxy_connector` dials the proxy; `connector` then runs over the tunnel.
let proxy_connector = self.build_https_proxy_connector(req.extra())?;
let mut connector = self.build_https_connector(req.extra())?;
let tunneled = {
let mut tunnel =
proxy::tunnel::TunnelConnector::new(proxy_uri, proxy_connector);
if let Some(auth) = proxy.basic_auth() {
tunnel = tunnel.with_auth(auth.clone());
}
if let Some(headers) = proxy.custom_headers() {
tunnel = tunnel.with_headers(headers.clone());
}
tunnel.call(uri).await?
};
let io = connector.call(EstablishedConn::new(tunneled, req)).await?;
if !self.config.tcp_nodelay {
// The stream is wrapped twice here, hence the two `get_ref()` calls.
io.get_ref().get_ref().set_nodelay(false)?;
}
return self.tunnel_conn_from_stream(io);
}
// Non-HTTPS target: connect to the proxy itself by pointing the request at it.
*req.uri_mut() = proxy_uri;
self.connect_auto_proxy(req, proxy)
.await
.map_err(ProxyConnect)
.map_err(Into::into)
}
#[cfg(unix)]
Intercepted::Unix(unix_socket) => {
trace!("connecting via Unix socket: {:?}", unix_socket);
let mut connector = self.build_unix_connector(unix_socket, req.extra())?;
if uri.is_https() {
// Fixed `http://localhost` URI handed to the tunnel connector as the proxy
// address; the connection itself goes over the Unix socket connector.
let proxy_uri = Uri::from_static("http://localhost");
let tunneled = {
let mut tunnel =
proxy::tunnel::TunnelConnector::new(proxy_uri, connector.clone());
tunnel.call(uri).await?
};
let io = connector.call(EstablishedConn::new(tunneled, req)).await?;
return self.tunnel_conn_from_stream(io);
}
let io = connector.call(req).await?;
self.conn_from_stream(io, None)
}
}
}
/// Entry point for a connection attempt: picks a proxy (if any) and connects, bounded
/// by the configured timeout.
///
/// The request's own proxy matcher is consulted first; if it is absent or does not
/// intercept the URI, the configured proxies are tried in order. When the timeout
/// elapses the attempt fails with `TimedOut`.
async fn connect_auto(self, req: ConnectRequest) -> Result<Conn, BoxError> {
debug!("starting new connection: {:?}", req.uri());
let timeout = self.config.timeout;
let fut = async {
let intercepted = req
.extra()
.proxy_matcher()
.and_then(|prox| prox.intercept(req.uri()))
.or_else(|| {
self.config
.proxies
.iter()
.find_map(|prox| prox.intercept(req.uri()))
});
if let Some(intercepted) = intercepted {
self.connect_via_proxy(req, intercepted).await
} else {
self.connect_auto_proxy(req, None).await
}
};
if let Some(to) = timeout {
// The outer `Result` is the timeout outcome; `?` converts `TimedOut` into a
// `BoxError` and leaves the inner connect result as the value of this branch.
tokio::time::timeout(to, fut).await.map_err(|_| TimedOut)?
} else {
fut.await
}
}
}
impl Service<ConnectRequest> for ConnectorService {
    type Response = Conn;
    type Error = BoxError;
    type Future = Connecting;

    /// Always reports ready.
    #[inline]
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let ready: Result<(), Self::Error> = Ok(());
        Poll::Ready(ready)
    }

    /// Starts a connection attempt on a clone of this service and returns it as a
    /// boxed future.
    #[inline]
    fn call(&mut self, req: ConnectRequest) -> Self::Future {
        let service = self.clone();
        let attempt = service.connect_auto(req);
        Box::pin(attempt)
    }
}