volo_http/client/layer/
http_proxy.rs1use std::fmt;
6
7use bytes::Bytes;
8use http::{
9 uri::{Authority, PathAndQuery, Scheme, Uri},
10 version::Version,
11};
12use motore::{layer::Layer, service::Service};
13use volo::{client::Apply, context::Context};
14
15use crate::{
16 client::{Target, target::RemoteHost, utils::is_default_port},
17 context::ClientContext,
18 error::{ClientError, client::request_error},
19 request::Request,
20};
21
22pub struct HttpProxy {
26 target: Option<Target>,
27}
28
29impl<S> Layer<S> for HttpProxy {
30 type Service = HttpProxyService<S>;
31
32 fn layer(self, inner: S) -> Self::Service {
33 HttpProxyService {
34 inner,
35 target: self.target,
36 }
37 }
38}
39
40#[derive(Debug)]
42pub struct GatewayError {
43 inner: ClientError,
44 gateway: Target,
45}
46
47impl fmt::Display for GatewayError {
48 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49 write!(
50 f,
51 "error while using proxy {}: {}",
52 self.gateway, self.inner
53 )
54 }
55}
56
57impl std::error::Error for GatewayError {
58 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
59 Some(&self.inner)
60 }
61}
62
63fn parse_uri(uri: Uri) -> Option<Target> {
64 if let Some(scheme) = uri.scheme() {
65 if scheme != &Scheme::HTTP {
66 tracing::warn!(
67 "[Volo-HTTP] HttpProxy: `{uri}` is not a valid proxy url, only HTTP protocol is \
68 supported"
69 );
70 return None;
71 }
72 }
73 let target = match Target::from_uri(&uri) {
74 Ok(target) => target,
75 Err(err) => {
76 tracing::warn!("[Volo-HTTP] HttpProxy: failed to parse uri `{uri}`: {err}");
77 return None;
78 }
79 };
80
81 tracing::info!("[Volo-HTTP] HttpProxy: `{uri}` is used as http proxy");
82 Some(target)
83}
84
85fn parse_env() -> Option<Uri> {
86 let env = if let Ok(uri) = std::env::var("http_proxy") {
87 uri
88 } else if let Ok(uri) = std::env::var("HTTP_PROXY") {
89 uri
90 } else {
91 return None;
92 };
93 Uri::from_maybe_shared(env).ok()
94}
95
96impl HttpProxy {
97 pub fn env() -> Self {
102 let target = parse_env().and_then(parse_uri);
103 Self { target }
104 }
105
106 pub fn new<U>(uri: U) -> Self
110 where
111 U: TryInto<Uri>,
112 U::Error: std::error::Error,
113 {
114 let proxy_uri = match uri.try_into() {
115 Ok(uri) => Some(uri),
116 Err(e) => {
117 tracing::warn!("[Volo-HTTP] HttpProxy: failed to build http proxy: {e}");
118 None
119 }
120 };
121 let target = proxy_uri.and_then(parse_uri);
122 Self { target }
123 }
124}
125
126pub struct HttpProxyService<S> {
130 inner: S,
131 target: Option<Target>,
132}
133
134impl<S> HttpProxyService<S> {
135 fn update_req<B>(&self, cx: &mut ClientContext, req: &mut Request<B>) {
136 let Some(target) = &self.target else {
137 return;
138 };
139 if req.version() != Version::HTTP_11 {
140 tracing::info!("[Volo-HTTP] HttpProxy only works for HTTP/1.1");
141 return;
142 }
143 if let Some(scheme) = cx.target().scheme() {
144 if scheme != &Scheme::HTTP {
145 tracing::info!("[Volo-HTTP] HttpProxy only supports HTTP protocol");
146 return;
147 }
148 }
149
150 let Some(authority) = gen_authority(cx.target()) else {
152 tracing::warn!(
153 "[Volo-HTTP] HttpProxy: failed to gen authority by {:?}",
154 cx.target()
155 );
156 return;
157 };
158 let authority = match Authority::from_maybe_shared(Bytes::from(authority)) {
159 Ok(authority) => authority,
160 Err(e) => {
161 tracing::warn!("[Volo-HTTP] HttpProxy: failed to parse authority: {e}");
162 return;
163 }
164 };
165 let mut parts = req.uri().to_owned().into_parts();
166 parts.scheme = Some(Scheme::HTTP);
167 parts.authority = Some(authority);
168 parts.path_and_query = Some(
169 parts
170 .path_and_query
171 .unwrap_or(PathAndQuery::from_static("/")),
172 );
173 let uri = match Uri::from_parts(parts) {
174 Ok(uri) => uri,
175 Err(e) => {
176 tracing::warn!("[Volo-HTTP] HttpProxy: failed to build uri: {e}");
177 return;
178 }
179 };
180 *req.uri_mut() = uri;
181
182 cx.rpc_info_mut().callee_mut().clear();
186 target
187 .to_owned()
188 .apply(cx)
189 .expect("infallible: failed to parse target in HttpProxy");
190 }
191}
192
193fn gen_authority(target: &Target) -> Option<String> {
194 let rt = match target {
195 Target::None => return None,
196 Target::Remote(rt) => rt,
197 #[cfg(target_family = "unix")]
198 Target::Local(_) => return None,
199 };
200 let default_port = is_default_port(&rt.scheme, rt.port);
201 let host = match &rt.host {
202 RemoteHost::Ip(ip) => {
203 if default_port {
204 if ip.is_ipv4() {
205 format!("{ip}")
206 } else {
207 format!("[{ip}]")
208 }
209 } else {
210 let port = rt.port;
211 if ip.is_ipv4() {
212 format!("{ip}:{port}")
213 } else {
214 format!("[{ip}]:{port}")
215 }
216 }
217 }
218 RemoteHost::Name(name) => {
219 let port = rt.port;
220 if default_port {
221 name.as_str().to_owned()
222 } else {
223 format!("{name}:{port}")
224 }
225 }
226 };
227 Some(host)
228}
229
230impl<B, S> Service<ClientContext, Request<B>> for HttpProxyService<S>
231where
232 B: Send,
233 S: Service<ClientContext, Request<B>, Error = ClientError> + Send + Sync,
234{
235 type Response = S::Response;
236 type Error = S::Error;
237
238 async fn call(
239 &self,
240 cx: &mut ClientContext,
241 mut req: Request<B>,
242 ) -> Result<Self::Response, Self::Error> {
243 self.update_req(cx, &mut req);
244 match self.inner.call(cx, req).await {
245 Ok(resp) => Ok(resp),
246 Err(e) => {
247 if let Some(target) = &self.target {
248 let err = GatewayError {
249 inner: e,
250 gateway: target.to_owned(),
251 };
252 Err(request_error(err))
253 } else {
254 Err(e)
255 }
256 }
257 }
258 }
259}