volo-http 0.5.5

HTTP framework implementation of volo.
Documentation
//! HTTP/1.1 Proxy defined by [RFC 7230][rfc7230]
//!
//! [rfc7230]: https://datatracker.ietf.org/doc/html/rfc7230

use std::fmt;

use bytes::Bytes;
use http::{
    uri::{Authority, PathAndQuery, Scheme, Uri},
    version::Version,
};
use motore::{layer::Layer, service::Service};
use volo::{client::Apply, context::Context};

use crate::{
    client::{Target, target::RemoteHost, utils::is_default_port},
    context::ClientContext,
    error::{ClientError, client::request_error},
    request::Request,
};

/// A [`Layer`] that implements the HTTP/1.1 proxy defined by [RFC 7230][rfc7230].
///
/// The layer is built from a proxy uri, either given explicitly ([`HttpProxy::new`]) or read
/// from the environment ([`HttpProxy::env`]). If no valid proxy uri is available, the layer
/// does nothing.
///
/// [rfc7230]: https://datatracker.ietf.org/doc/html/rfc7230
pub struct HttpProxy {
    // Target of the proxy server; `None` when no valid proxy uri was provided.
    target: Option<Target>,
}

impl<S> Layer<S> for HttpProxy {
    type Service = HttpProxyService<S>;

    /// Wrap `inner` into a [`HttpProxyService`], handing the (optional) proxy target over to it.
    fn layer(self, inner: S) -> Self::Service {
        let Self { target } = self;
        HttpProxyService { inner, target }
    }
}

/// A [`ClientError`] that occurred while a request was being sent through a proxy.
///
/// It keeps the original error together with the proxy target that was in use.
#[derive(Debug)]
pub struct GatewayError {
    // The error returned by the inner service.
    inner: ClientError,
    // The proxy target configured when the error occurred.
    gateway: Target,
}

impl fmt::Display for GatewayError {
    /// Format as `error while using proxy <gateway>: <inner error>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { inner, gateway } = self;
        write!(f, "error while using proxy {gateway}: {inner}")
    }
}

impl std::error::Error for GatewayError {
    /// The wrapped [`ClientError`] is always reported as the source of this error.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        let cause: &(dyn std::error::Error + 'static) = &self.inner;
        Some(cause)
    }
}

/// Convert a proxy `uri` into a [`Target`].
///
/// Returns `None` (after logging a warning) if the uri has an explicit scheme other than
/// `http`, or if [`Target::from_uri`] rejects it. A uri without scheme is accepted.
fn parse_uri(uri: Uri) -> Option<Target> {
    // Only an explicit non-HTTP scheme is rejected here; a missing scheme passes through.
    let unsupported_scheme = matches!(uri.scheme(), Some(scheme) if scheme != &Scheme::HTTP);
    if unsupported_scheme {
        tracing::warn!(
            "[Volo-HTTP] HttpProxy: `{uri}` is not a valid proxy url, only HTTP protocol is \
             supported"
        );
        return None;
    }

    match Target::from_uri(&uri) {
        Ok(target) => {
            tracing::info!("[Volo-HTTP] HttpProxy: `{uri}` is used as http proxy");
            Some(target)
        }
        Err(err) => {
            tracing::warn!("[Volo-HTTP] HttpProxy: failed to parse uri `{uri}`: {err}");
            None
        }
    }
}

fn parse_env() -> Option<Uri> {
    let env = if let Ok(uri) = std::env::var("http_proxy") {
        uri
    } else if let Ok(uri) = std::env::var("HTTP_PROXY") {
        uri
    } else {
        return None;
    };
    Uri::from_maybe_shared(env).ok()
}

impl HttpProxy {
    /// Create a [`HttpProxy`] via the proxy server from environment variable `http_proxy` or
    /// `HTTP_PROXY`.
    ///
    /// If there is no valid value in environment variable, the layer will do nothing.
    pub fn env() -> Self {
        let target = parse_env().and_then(parse_uri);
        Self { target }
    }

    /// Create a [`HttpProxy`] via the `uri` as proxy server.
    ///
    /// If the argument `uri` is not a valid uri, the layer will do nothing.
    pub fn new<U>(uri: U) -> Self
    where
        U: TryInto<Uri>,
        U::Error: std::error::Error,
    {
        let proxy_uri = match uri.try_into() {
            Ok(uri) => Some(uri),
            Err(e) => {
                tracing::warn!("[Volo-HTTP] HttpProxy: failed to build http proxy: {e}");
                None
            }
        };
        let target = proxy_uri.and_then(parse_uri);
        Self { target }
    }
}

/// [`Service`] generated by [`HttpProxy`].
///
/// Refer to [`HttpProxy`] for more details.
pub struct HttpProxyService<S> {
    // The wrapped service that performs the actual request.
    inner: S,
    // Target of the proxy server; when `None`, requests are passed through untouched.
    target: Option<Target>,
}

impl<S> HttpProxyService<S> {
    /// Redirect `req` to the configured proxy.
    ///
    /// The request uri is rebuilt with the `http` scheme, an authority generated from the
    /// current target in `cx`, and the original path (or `/` if there is none). Afterwards the
    /// callee in `cx` is cleared and the proxy target is applied to `cx`.
    ///
    /// Nothing is changed if no proxy is configured, if the request is not HTTP/1.1, if the
    /// current target has a scheme other than `http`, or if any step of rebuilding the uri
    /// fails.
    fn update_req<B>(&self, cx: &mut ClientContext, req: &mut Request<B>) {
        let proxy = match self.target.as_ref() {
            Some(proxy) => proxy,
            None => return,
        };
        if req.version() != Version::HTTP_11 {
            tracing::info!("[Volo-HTTP] HttpProxy only works for HTTP/1.1");
            return;
        }
        if matches!(cx.target().scheme(), Some(scheme) if scheme != &Scheme::HTTP) {
            tracing::info!("[Volo-HTTP] HttpProxy only supports HTTP protocol");
            return;
        }

        // The authority of the rewritten uri is derived from the target the request was
        // originally heading to.
        let Some(origin) = gen_authority(cx.target()) else {
            tracing::warn!(
                "[Volo-HTTP] HttpProxy: failed to gen authority by {:?}",
                cx.target()
            );
            return;
        };
        let authority = match Authority::from_maybe_shared(Bytes::from(origin)) {
            Ok(authority) => authority,
            Err(e) => {
                tracing::warn!("[Volo-HTTP] HttpProxy: failed to parse authority: {e}");
                return;
            }
        };

        let mut parts = req.uri().clone().into_parts();
        parts.scheme = Some(Scheme::HTTP);
        parts.authority = Some(authority);
        if parts.path_and_query.is_none() {
            parts.path_and_query = Some(PathAndQuery::from_static("/"));
        }
        match Uri::from_parts(parts) {
            Ok(rewritten) => *req.uri_mut() = rewritten,
            Err(e) => {
                tracing::warn!("[Volo-HTTP] HttpProxy: failed to build uri: {e}");
                return;
            }
        }

        // The proxy target must be applied only after the request has been rewritten:
        // `apply` overwrites `cx.target`, which the code above still reads.
        cx.rpc_info_mut().callee_mut().clear();
        proxy
            .to_owned()
            .apply(cx)
            .expect("infallible: failed to parse target in HttpProxy");
    }
}

/// Build the authority string (`host` or `host:port`) for a remote `target`.
///
/// The port is left out when [`is_default_port`] reports it as the default for the target's
/// scheme. IP addresses that are not IPv4 are wrapped in square brackets. Returns `None` for
/// any target that is not [`Target::Remote`].
fn gen_authority(target: &Target) -> Option<String> {
    let Target::Remote(remote) = target else {
        return None;
    };

    let port = remote.port;
    let omit_port = is_default_port(&remote.scheme, port);
    let authority = match (&remote.host, omit_port) {
        (RemoteHost::Ip(ip), true) if ip.is_ipv4() => format!("{ip}"),
        (RemoteHost::Ip(ip), true) => format!("[{ip}]"),
        (RemoteHost::Ip(ip), false) if ip.is_ipv4() => format!("{ip}:{port}"),
        (RemoteHost::Ip(ip), false) => format!("[{ip}]:{port}"),
        (RemoteHost::Name(name), true) => name.as_str().to_owned(),
        (RemoteHost::Name(name), false) => format!("{name}:{port}"),
    };
    Some(authority)
}

impl<B, S> Service<ClientContext, Request<B>> for HttpProxyService<S>
where
    B: Send,
    S: Service<ClientContext, Request<B>, Error = ClientError> + Send + Sync,
{
    type Response = S::Response;
    type Error = S::Error;

    /// Redirect the request to the proxy (if one is configured) and forward it to the inner
    /// service.
    ///
    /// When a proxy is configured, an error from the inner service is wrapped in a
    /// [`GatewayError`] carrying the proxy target; otherwise the error is returned as is.
    async fn call(
        &self,
        cx: &mut ClientContext,
        mut req: Request<B>,
    ) -> Result<Self::Response, Self::Error> {
        self.update_req(cx, &mut req);

        let failure = match self.inner.call(cx, req).await {
            Ok(resp) => return Ok(resp),
            Err(e) => e,
        };
        let Some(gateway) = &self.target else {
            return Err(failure);
        };
        Err(request_error(GatewayError {
            inner: failure,
            gateway: gateway.to_owned(),
        }))
    }
}