use std::fmt;
use bytes::Bytes;
use http::{
uri::{Authority, PathAndQuery, Scheme, Uri},
version::Version,
};
use motore::{layer::Layer, service::Service};
use volo::{client::Apply, context::Context};
use crate::{
client::{Target, target::RemoteHost, utils::is_default_port},
context::ClientContext,
error::{ClientError, client::request_error},
request::Request,
};
/// Layer that wraps an inner client service in an [`HttpProxyService`].
///
/// Build it with [`HttpProxy::new`] (explicit proxy URI) or [`HttpProxy::env`]
/// (proxy URI taken from the environment).
pub struct HttpProxy {
/// Parsed proxy endpoint; `None` when no usable proxy URI was supplied, in
/// which case the produced service leaves requests untouched.
target: Option<Target>,
}
impl<S> Layer<S> for HttpProxy {
type Service = HttpProxyService<S>;
fn layer(self, inner: S) -> Self::Service {
HttpProxyService {
inner,
target: self.target,
}
}
}
/// Error produced when a request fails while a proxy target is configured.
///
/// It pairs the original client error with the proxy target so that the
/// rendered message names the proxy involved.
#[derive(Debug)]
pub struct GatewayError {
/// The error returned by the inner service.
inner: ClientError,
/// The proxy target that was configured for the failing service.
gateway: Target,
}
impl fmt::Display for GatewayError {
    /// Renders the proxy target followed by the underlying client error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { inner, gateway } = self;
        write!(f, "error while using proxy {}: {}", gateway, inner)
    }
}
impl std::error::Error for GatewayError {
    /// Exposes the wrapped client error as the cause of this error.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        let cause: &(dyn std::error::Error + 'static) = &self.inner;
        Some(cause)
    }
}
/// Converts a proxy URI into a [`Target`].
///
/// Returns `None` (after logging a warning) when the URI carries an explicit
/// scheme other than `http`, or when [`Target::from_uri`] rejects it. A URI
/// without a scheme is passed to [`Target::from_uri`] as-is.
fn parse_uri(uri: Uri) -> Option<Target> {
    // Only an explicit non-HTTP scheme is rejected here.
    let unsupported_scheme = uri.scheme().is_some_and(|scheme| scheme != &Scheme::HTTP);
    if unsupported_scheme {
        tracing::warn!(
            "[Volo-HTTP] HttpProxy: `{uri}` is not a valid proxy url, only HTTP protocol is \
             supported"
        );
        return None;
    }

    match Target::from_uri(&uri) {
        Ok(target) => {
            tracing::info!("[Volo-HTTP] HttpProxy: `{uri}` is used as http proxy");
            Some(target)
        }
        Err(err) => {
            tracing::warn!("[Volo-HTTP] HttpProxy: failed to parse uri `{uri}`: {err}");
            None
        }
    }
}
/// Reads the proxy URI from the environment.
///
/// `http_proxy` is consulted first and `HTTP_PROXY` only when the former
/// cannot be read. Returns `None` when neither variable is readable or when
/// the value is not a valid URI.
fn parse_env() -> Option<Uri> {
    let raw = std::env::var("http_proxy")
        .or_else(|_| std::env::var("HTTP_PROXY"))
        .ok()?;
    Uri::from_maybe_shared(raw).ok()
}
impl HttpProxy {
pub fn env() -> Self {
let target = parse_env().and_then(parse_uri);
Self { target }
}
pub fn new<U>(uri: U) -> Self
where
U: TryInto<Uri>,
U::Error: std::error::Error,
{
let proxy_uri = match uri.try_into() {
Ok(uri) => Some(uri),
Err(e) => {
tracing::warn!("[Volo-HTTP] HttpProxy: failed to build http proxy: {e}");
None
}
};
let target = proxy_uri.and_then(parse_uri);
Self { target }
}
}
/// Service produced by [`HttpProxy`]; it adjusts outgoing requests for the
/// configured proxy before delegating to the inner service.
pub struct HttpProxyService<S> {
/// The wrapped service that actually performs the request.
inner: S,
/// Proxy endpoint to route through; `None` leaves requests untouched.
target: Option<Target>,
}
impl<S> HttpProxyService<S> {
    /// Rewrites `req` and `cx` so that the request is sent through the proxy.
    ///
    /// The request URI is rebuilt with the `http` scheme and an authority
    /// derived from the current target in `cx`; afterwards the callee info is
    /// cleared and the proxy target is applied to `cx`.
    ///
    /// Nothing is changed when no proxy is configured, the request is not
    /// HTTP/1.1, the current target has an explicit non-HTTP scheme, or the
    /// authority / URI cannot be built (each case is logged).
    ///
    /// # Panics
    ///
    /// Panics if applying the proxy target to `cx` fails.
    fn update_req<B>(&self, cx: &mut ClientContext, req: &mut Request<B>) {
        let Some(proxy) = self.target.as_ref() else {
            return;
        };

        if req.version() != Version::HTTP_11 {
            tracing::info!("[Volo-HTTP] HttpProxy only works for HTTP/1.1");
            return;
        }

        let non_http_target = cx
            .target()
            .scheme()
            .is_some_and(|scheme| scheme != &Scheme::HTTP);
        if non_http_target {
            tracing::info!("[Volo-HTTP] HttpProxy only supports HTTP protocol");
            return;
        }

        // Authority of the original destination, taken from the context.
        let authority = match gen_authority(cx.target()) {
            Some(raw) => match Authority::from_maybe_shared(Bytes::from(raw)) {
                Ok(authority) => authority,
                Err(e) => {
                    tracing::warn!("[Volo-HTTP] HttpProxy: failed to parse authority: {e}");
                    return;
                }
            },
            None => {
                tracing::warn!(
                    "[Volo-HTTP] HttpProxy: failed to gen authority by {:?}",
                    cx.target()
                );
                return;
            }
        };

        // Rebuild the URI with scheme and authority, defaulting the path to "/".
        let mut parts = req.uri().clone().into_parts();
        parts.scheme = Some(Scheme::HTTP);
        parts.authority = Some(authority);
        if parts.path_and_query.is_none() {
            parts.path_and_query = Some(PathAndQuery::from_static("/"));
        }
        match Uri::from_parts(parts) {
            Ok(rewritten) => *req.uri_mut() = rewritten,
            Err(e) => {
                tracing::warn!("[Volo-HTTP] HttpProxy: failed to build uri: {e}");
                return;
            }
        }

        // Point the context at the proxy instead of the original callee.
        cx.rpc_info_mut().callee_mut().clear();
        proxy
            .clone()
            .apply(cx)
            .expect("infallible: failed to parse target in HttpProxy");
    }
}
/// Builds the `host[:port]` authority string for a remote target.
///
/// IPv6 addresses are wrapped in brackets, and the port is omitted when
/// `is_default_port` reports it as the default for the target's scheme.
/// Returns `None` for targets that are not remote.
fn gen_authority(target: &Target) -> Option<String> {
    let remote = match target {
        Target::Remote(remote) => remote,
        Target::None => return None,
        #[cfg(target_family = "unix")]
        Target::Local(_) => return None,
    };

    let port = remote.port;
    let omit_port = is_default_port(&remote.scheme, remote.port);

    let authority = match &remote.host {
        RemoteHost::Ip(ip) => match (ip.is_ipv4(), omit_port) {
            (true, true) => format!("{ip}"),
            (false, true) => format!("[{ip}]"),
            (true, false) => format!("{ip}:{port}"),
            (false, false) => format!("[{ip}]:{port}"),
        },
        RemoteHost::Name(name) if omit_port => name.as_str().to_owned(),
        RemoteHost::Name(name) => format!("{name}:{port}"),
    };
    Some(authority)
}
impl<B, S> Service<ClientContext, Request<B>> for HttpProxyService<S>
where
    B: Send,
    S: Service<ClientContext, Request<B>, Error = ClientError> + Send + Sync,
{
    type Response = S::Response;
    type Error = S::Error;

    /// Adjusts the request for the proxy, then forwards it to the inner service.
    ///
    /// Whenever a proxy target is configured, an error from the inner service
    /// is wrapped in a [`GatewayError`] naming that target; otherwise the error
    /// is returned unchanged.
    async fn call(
        &self,
        cx: &mut ClientContext,
        mut req: Request<B>,
    ) -> Result<Self::Response, Self::Error> {
        self.update_req(cx, &mut req);

        let outcome = self.inner.call(cx, req).await;
        outcome.map_err(|e| match &self.target {
            Some(target) => request_error(GatewayError {
                inner: e,
                gateway: target.clone(),
            }),
            None => e,
        })
    }
}