volo_http/client/layer/
http_proxy.rs

1//! HTTP/1.1 Proxy defined by [RFC 7230][rfc7230]
2//!
3//! [rfc7230]: https://datatracker.ietf.org/doc/html/rfc7230
4
5use std::fmt;
6
7use bytes::Bytes;
8use http::{
9    uri::{Authority, PathAndQuery, Scheme, Uri},
10    version::Version,
11};
12use motore::{layer::Layer, service::Service};
13use volo::{client::Apply, context::Context};
14
15use crate::{
16    client::{Target, target::RemoteHost, utils::is_default_port},
17    context::ClientContext,
18    error::{ClientError, client::request_error},
19    request::Request,
20};
21
22/// A [`Layer`] implements HTTP/1.1 proxy defined by [RFC 7230][rfc7230].
23///
24/// [rfc7230]: https://datatracker.ietf.org/doc/html/rfc7230
25pub struct HttpProxy {
26    target: Option<Target>,
27}
28
29impl<S> Layer<S> for HttpProxy {
30    type Service = HttpProxyService<S>;
31
32    fn layer(self, inner: S) -> Self::Service {
33        HttpProxyService {
34            inner,
35            target: self.target,
36        }
37    }
38}
39
/// [`ClientError`] during accessing proxy.
///
/// Produced by [`HttpProxyService`] when the inner service fails while a proxy server is
/// configured. The [`Display`](fmt::Display) output names the proxy server, and the original
/// error is reachable through [`std::error::Error::source`].
#[derive(Debug)]
pub struct GatewayError {
    /// Error returned by the inner service.
    inner: ClientError,
    /// Proxy server the layer was configured with.
    gateway: Target,
}
46
47impl fmt::Display for GatewayError {
48    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49        write!(
50            f,
51            "error while using proxy {}: {}",
52            self.gateway, self.inner
53        )
54    }
55}
56
57impl std::error::Error for GatewayError {
58    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
59        Some(&self.inner)
60    }
61}
62
63fn parse_uri(uri: Uri) -> Option<Target> {
64    if let Some(scheme) = uri.scheme() {
65        if scheme != &Scheme::HTTP {
66            tracing::warn!(
67                "[Volo-HTTP] HttpProxy: `{uri}` is not a valid proxy url, only HTTP protocol is \
68                 supported"
69            );
70            return None;
71        }
72    }
73    let target = match Target::from_uri(&uri) {
74        Ok(target) => target,
75        Err(err) => {
76            tracing::warn!("[Volo-HTTP] HttpProxy: failed to parse uri `{uri}`: {err}");
77            return None;
78        }
79    };
80
81    tracing::info!("[Volo-HTTP] HttpProxy: `{uri}` is used as http proxy");
82    Some(target)
83}
84
85fn parse_env() -> Option<Uri> {
86    let env = if let Ok(uri) = std::env::var("http_proxy") {
87        uri
88    } else if let Ok(uri) = std::env::var("HTTP_PROXY") {
89        uri
90    } else {
91        return None;
92    };
93    Uri::from_maybe_shared(env).ok()
94}
95
96impl HttpProxy {
97    /// Create a [`HttpProxy`] via the proxy server from environment variable `http_proxy` or
98    /// `HTTP_PROXY`.
99    ///
100    /// If there is no valid value in environment variable, the layer will do nothing.
101    pub fn env() -> Self {
102        let target = parse_env().and_then(parse_uri);
103        Self { target }
104    }
105
106    /// Create a [`HttpProxy`] via the `uri` as proxy server.
107    ///
108    /// If the argument `uri` is not a valid uri, the layer will do nothing.
109    pub fn new<U>(uri: U) -> Self
110    where
111        U: TryInto<Uri>,
112        U::Error: std::error::Error,
113    {
114        let proxy_uri = match uri.try_into() {
115            Ok(uri) => Some(uri),
116            Err(e) => {
117                tracing::warn!("[Volo-HTTP] HttpProxy: failed to build http proxy: {e}");
118                None
119            }
120        };
121        let target = proxy_uri.and_then(parse_uri);
122        Self { target }
123    }
124}
125
126/// [`Service`] generated by [`HttpProxy`].
127///
128/// Refer to [`HttpProxy`] for more details.
129pub struct HttpProxyService<S> {
130    inner: S,
131    target: Option<Target>,
132}
133
134impl<S> HttpProxyService<S> {
135    fn update_req<B>(&self, cx: &mut ClientContext, req: &mut Request<B>) {
136        let Some(target) = &self.target else {
137            return;
138        };
139        if req.version() != Version::HTTP_11 {
140            tracing::info!("[Volo-HTTP] HttpProxy only works for HTTP/1.1");
141            return;
142        }
143        if let Some(scheme) = cx.target().scheme() {
144            if scheme != &Scheme::HTTP {
145                tracing::info!("[Volo-HTTP] HttpProxy only supports HTTP protocol");
146                return;
147            }
148        }
149
150        // Generate authority by old target, and then update request
151        let Some(authority) = gen_authority(cx.target()) else {
152            tracing::warn!(
153                "[Volo-HTTP] HttpProxy: failed to gen authority by {:?}",
154                cx.target()
155            );
156            return;
157        };
158        let authority = match Authority::from_maybe_shared(Bytes::from(authority)) {
159            Ok(authority) => authority,
160            Err(e) => {
161                tracing::warn!("[Volo-HTTP] HttpProxy: failed to parse authority: {e}");
162                return;
163            }
164        };
165        let mut parts = req.uri().to_owned().into_parts();
166        parts.scheme = Some(Scheme::HTTP);
167        parts.authority = Some(authority);
168        parts.path_and_query = Some(
169            parts
170                .path_and_query
171                .unwrap_or(PathAndQuery::from_static("/")),
172        );
173        let uri = match Uri::from_parts(parts) {
174            Ok(uri) => uri,
175            Err(e) => {
176                tracing::warn!("[Volo-HTTP] HttpProxy: failed to build uri: {e}");
177                return;
178            }
179        };
180        *req.uri_mut() = uri;
181
182        // Clear callee and update proxy target to it
183        // Note: we must apply target after updating request because `target.apply(cx)` will update
184        // self to `cx.target`
185        cx.rpc_info_mut().callee_mut().clear();
186        target
187            .to_owned()
188            .apply(cx)
189            .expect("infallible: failed to parse target in HttpProxy");
190    }
191}
192
193fn gen_authority(target: &Target) -> Option<String> {
194    let rt = match target {
195        Target::None => return None,
196        Target::Remote(rt) => rt,
197        #[cfg(target_family = "unix")]
198        Target::Local(_) => return None,
199    };
200    let default_port = is_default_port(&rt.scheme, rt.port);
201    let host = match &rt.host {
202        RemoteHost::Ip(ip) => {
203            if default_port {
204                if ip.is_ipv4() {
205                    format!("{ip}")
206                } else {
207                    format!("[{ip}]")
208                }
209            } else {
210                let port = rt.port;
211                if ip.is_ipv4() {
212                    format!("{ip}:{port}")
213                } else {
214                    format!("[{ip}]:{port}")
215                }
216            }
217        }
218        RemoteHost::Name(name) => {
219            let port = rt.port;
220            if default_port {
221                name.as_str().to_owned()
222            } else {
223                format!("{name}:{port}")
224            }
225        }
226    };
227    Some(host)
228}
229
230impl<B, S> Service<ClientContext, Request<B>> for HttpProxyService<S>
231where
232    B: Send,
233    S: Service<ClientContext, Request<B>, Error = ClientError> + Send + Sync,
234{
235    type Response = S::Response;
236    type Error = S::Error;
237
238    async fn call(
239        &self,
240        cx: &mut ClientContext,
241        mut req: Request<B>,
242    ) -> Result<Self::Response, Self::Error> {
243        self.update_req(cx, &mut req);
244        match self.inner.call(cx, req).await {
245            Ok(resp) => Ok(resp),
246            Err(e) => {
247                if let Some(target) = &self.target {
248                    let err = GatewayError {
249                        inner: e,
250                        gateway: target.to_owned(),
251                    };
252                    Err(request_error(err))
253                } else {
254                    Err(e)
255                }
256            }
257        }
258    }
259}