#[cfg(feature = "_async")]
use std::error::Error as StdError;
#[cfg(feature = "_async")]
use std::future::Future;
#[cfg(feature = "_async")]
use std::pin::Pin;
#[cfg(feature = "_async")]
use std::task::{Context, Poll};
#[cfg(feature = "_async")]
use std::time::Duration;
use http::Uri;
use http::header::HeaderValue;
#[cfg(feature = "_async")]
use hyper::rt::{Read as HyperRead, ReadBufCursor, Write as HyperWrite};
#[cfg(feature = "_async")]
use hyper_util::client::legacy::connect::proxy::Tunnel;
#[cfg(feature = "_async")]
use hyper_util::client::legacy::connect::{Connected, Connection, HttpConnector};
#[cfg(feature = "_async")]
use tower_service::Service;
use url::Url;
use crate::error::Error;
use crate::util::{
default_port, is_valid_absolute_http_uri_text, normalize_host_key,
redact_uri_without_url_normalization,
};
#[cfg(feature = "_async")]
pub(crate) type BoxConnectError = Box<dyn StdError + Send + Sync>;
#[derive(Clone)]
pub(crate) struct ProxyConfig {
pub(crate) uri: Uri,
pub(crate) authorization: Option<HeaderValue>,
pub(crate) no_proxy_rules: Vec<NoProxyRule>,
}
#[derive(Clone, Debug)]
pub(crate) enum NoProxyRule {
Any,
Domain { host: String, port: Option<u16> },
}
impl NoProxyRule {
pub(crate) fn parse(text: &str) -> Option<Self> {
fn looks_like_url_rule(value: &str) -> bool {
let bytes = value.as_bytes();
value.contains("://")
|| bytes
.get(..5)
.is_some_and(|prefix| prefix.eq_ignore_ascii_case(b"http:"))
|| bytes
.get(..6)
.is_some_and(|prefix| prefix.eq_ignore_ascii_case(b"https:"))
}
let mut candidate = text.trim().to_owned();
let mut port = None;
if candidate.is_empty() {
return None;
}
if candidate == "*" {
return Some(Self::Any);
}
if let Ok(url) = Url::parse(&candidate)
&& let Some(host) = url.host_str()
{
if !matches!(url.scheme(), "http" | "https")
|| !is_valid_absolute_http_uri_text(&candidate)
{
return None;
}
if !url.username().is_empty()
|| url.password().is_some()
|| url.query().is_some()
|| url.fragment().is_some()
|| url.path() != "/"
{
return None;
}
candidate = host.to_owned();
port = url.port();
} else if looks_like_url_rule(&candidate) {
return None;
}
candidate = candidate.trim_start_matches('.').to_owned();
if candidate.is_empty() {
return None;
}
if let Some(stripped) = candidate.strip_prefix('[') {
let end = stripped.find(']')?;
let host = &stripped[..end];
let suffix = &stripped[end + 1..];
if suffix.is_empty() {
port = None;
} else if let Some(raw_port) = suffix.strip_prefix(':') {
port = Some(raw_port.parse::<u16>().ok()?);
} else {
return None;
}
candidate = host.to_owned();
} else if candidate.matches(':').count() == 1 {
let (host, raw_port) = candidate.rsplit_once(':')?;
if host.is_empty() {
return None;
}
port = Some(raw_port.parse::<u16>().ok()?);
candidate = host.to_owned();
}
Some(Self::Domain {
host: normalize_host_key(&candidate)?,
port,
})
}
pub(crate) fn matches(&self, host: &str, port: Option<u16>) -> bool {
match self {
Self::Any => true,
Self::Domain {
host: domain,
port: rule_port,
} => {
let Some(host) = normalize_host_key(host) else {
return false;
};
let host_matches = host == *domain || host.ends_with(&format!(".{domain}"));
if !host_matches {
return false;
}
match rule_port {
Some(rule_port) => port == Some(*rule_port),
None => true,
}
}
}
}
}
pub(crate) fn redact_no_proxy_rule_for_logs(rule: &str) -> String {
redact_uri_without_url_normalization(rule.trim())
}
pub(crate) fn should_bypass_proxy_uri(no_proxy_rules: &[NoProxyRule], uri: &Uri) -> bool {
let Some(host) = uri.host() else {
return false;
};
let port = uri.port_u16().or_else(|| default_port(uri));
no_proxy_rules.iter().any(|rule| rule.matches(host, port))
}
pub(crate) fn parse_no_proxy_rule(rule: &str) -> crate::Result<NoProxyRule> {
NoProxyRule::parse(rule).ok_or_else(|| Error::InvalidNoProxyRule {
rule: redact_no_proxy_rule_for_logs(rule),
})
}
pub(crate) fn parse_no_proxy_rules<I, S>(rules: I) -> crate::Result<Vec<NoProxyRule>>
where
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
rules
.into_iter()
.map(|rule| parse_no_proxy_rule(rule.as_ref()))
.collect()
}
#[derive(Clone)]
#[cfg(feature = "_async")]
struct ProxyRuntime {
tunnel: Tunnel<HttpConnector>,
proxy_uri: Uri,
no_proxy_rules: Vec<NoProxyRule>,
}
#[cfg(feature = "_async")]
impl ProxyRuntime {
fn should_bypass_proxy(&self, uri: &Uri) -> bool {
should_bypass_proxy_uri(&self.no_proxy_rules, uri)
}
}
#[cfg(feature = "_async")]
#[derive(Debug)]
pub(crate) struct ProxyConnection<T> {
inner: T,
proxied: bool,
}
#[cfg(feature = "_async")]
impl<T> ProxyConnection<T> {
fn new(inner: T, proxied: bool) -> Self {
Self { inner, proxied }
}
}
#[cfg(feature = "_async")]
impl<T> HyperRead for ProxyConnection<T>
where
T: HyperRead + Unpin,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: ReadBufCursor<'_>,
) -> Poll<Result<(), std::io::Error>> {
let inner = &mut self.get_mut().inner;
Pin::new(inner).poll_read(cx, buf)
}
}
#[cfg(feature = "_async")]
impl<T> HyperWrite for ProxyConnection<T>
where
T: HyperWrite + Unpin,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
let inner = &mut self.get_mut().inner;
Pin::new(inner).poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
let inner = &mut self.get_mut().inner;
Pin::new(inner).poll_flush(cx)
}
fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
let inner = &mut self.get_mut().inner;
Pin::new(inner).poll_shutdown(cx)
}
fn is_write_vectored(&self) -> bool {
self.inner.is_write_vectored()
}
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[std::io::IoSlice<'_>],
) -> Poll<Result<usize, std::io::Error>> {
let inner = &mut self.get_mut().inner;
Pin::new(inner).poll_write_vectored(cx, bufs)
}
}
#[cfg(feature = "_async")]
impl<T> Connection for ProxyConnection<T>
where
T: Connection,
{
fn connected(&self) -> Connected {
self.inner.connected().proxy(self.proxied)
}
}
#[derive(Clone)]
#[cfg(feature = "_async")]
pub(crate) struct ProxyConnector {
direct: HttpConnector,
proxy: Option<ProxyRuntime>,
}
#[cfg(feature = "_async")]
impl ProxyConnector {
pub(crate) fn new(proxy_config: Option<ProxyConfig>, connect_timeout: Duration) -> Self {
let mut direct = HttpConnector::new();
direct.enforce_http(false);
direct.set_connect_timeout(Some(connect_timeout));
let proxy = proxy_config.map(|config| {
let mut tunnel = Tunnel::new(config.uri.clone(), direct.clone());
if let Some(authorization) = config.authorization {
tunnel = tunnel.with_auth(authorization);
}
ProxyRuntime {
tunnel,
proxy_uri: config.uri,
no_proxy_rules: config.no_proxy_rules,
}
});
Self { direct, proxy }
}
}
#[cfg(feature = "_async")]
impl Service<Uri> for ProxyConnector {
type Response = ProxyConnection<<HttpConnector as Service<Uri>>::Response>;
type Error = BoxConnectError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if let Some(proxy) = &mut self.proxy {
let direct_ready = match self.direct.poll_ready(cx) {
Poll::Ready(Ok(())) => true,
Poll::Ready(Err(error)) => return Poll::Ready(Err(Box::new(error))),
Poll::Pending => false,
};
let tunnel_ready = match proxy.tunnel.poll_ready(cx) {
Poll::Ready(Ok(())) => true,
Poll::Ready(Err(error)) => return Poll::Ready(Err(Box::new(error))),
Poll::Pending => false,
};
return if direct_ready && tunnel_ready {
Poll::Ready(Ok(()))
} else {
Poll::Pending
};
}
match self.direct.poll_ready(cx) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(error)) => Poll::Ready(Err(Box::new(error))),
Poll::Pending => Poll::Pending,
}
}
fn call(&mut self, dst: Uri) -> Self::Future {
if let Some(proxy) = &mut self.proxy {
if proxy.should_bypass_proxy(&dst) {
let connecting = self.direct.call(dst);
return Box::pin(async move {
connecting
.await
.map(|connection| ProxyConnection::new(connection, false))
.map_err(|error| Box::new(error) as _)
});
}
let scheme = dst.scheme_str().unwrap_or_default();
if scheme.eq_ignore_ascii_case("https") {
let tunnel_target = normalize_tunnel_target_uri(dst);
let connecting = proxy.tunnel.call(tunnel_target);
return Box::pin(async move {
connecting
.await
.map(|connection| ProxyConnection::new(connection, false))
.map_err(|error| Box::new(error) as _)
});
}
let connecting = self.direct.call(proxy.proxy_uri.clone());
return Box::pin(async move {
connecting
.await
.map(|connection| ProxyConnection::new(connection, true))
.map_err(|error| Box::new(error) as _)
});
}
let connecting = self.direct.call(dst);
Box::pin(async move {
connecting
.await
.map(|connection| ProxyConnection::new(connection, false))
.map_err(|error| Box::new(error) as _)
})
}
}
#[cfg(feature = "_async")]
pub(crate) fn normalize_tunnel_target_uri(dst: Uri) -> Uri {
if dst.port().is_some() {
return dst;
}
let Some(scheme) = dst.scheme_str() else {
return dst;
};
let default_port = if scheme.eq_ignore_ascii_case("https") {
443
} else if scheme.eq_ignore_ascii_case("http") {
80
} else {
return dst;
};
let Some(host) = dst.host() else {
return dst;
};
let authority_text = if host.contains(':') && !host.starts_with('[') {
format!("[{host}]:{default_port}")
} else {
format!("{host}:{default_port}")
};
let Ok(authority) = authority_text.parse() else {
return dst;
};
let original = dst.clone();
let mut parts = dst.into_parts();
parts.authority = Some(authority);
Uri::from_parts(parts).unwrap_or(original)
}