satex_service/proxy/
service.rs1use crate::proxy::client::Client;
2use futures::future::LocalBoxFuture;
3use http::{HeaderName, Request, Response, Uri};
4use satex_core::Error;
5use satex_core::body::Body;
6use satex_core::digest::Digester;
7use satex_load_balancer::LoadBalancer;
8use std::future::ready;
9use std::net::SocketAddr;
10use std::sync::Arc;
11use std::task::{Context, Poll};
12use tower::Service;
13use tracing::debug;
14use url::Url;
15
/// Request headers that `ProxyRouteService::call` deletes before the request
/// is handed to the upstream client.
///
/// Most entries are connection-scoped ("hop-by-hop") headers that describe the
/// client-to-proxy connection rather than the end-to-end message.
///
/// NOTE(review): `x-application-context` is not a standard hop-by-hop header;
/// presumably it is removed so an application-specific value is not passed
/// through to the backend — confirm the intent.
const REMOVE_HEADERS: [HeaderName; 9] = [
    HeaderName::from_static("connection"),
    HeaderName::from_static("keep-alive"),
    HeaderName::from_static("transfer-encoding"),
    HeaderName::from_static("te"),
    HeaderName::from_static("trailer"),
    HeaderName::from_static("proxy-authorization"),
    HeaderName::from_static("proxy-authenticate"),
    HeaderName::from_static("x-application-context"),
    HeaderName::from_static("upgrade"),
];
27
28#[derive(Clone)]
29pub struct ProxyRouteService<D> {
30 url: Url,
31 client: Client,
32 digester: Arc<D>,
33 load_balancer: Option<Arc<LoadBalancer>>,
34}
35
36impl<D> ProxyRouteService<D> {
37 pub fn new(
38 url: Url,
39 client: Client,
40 digester: D,
41 load_balancer: Option<Arc<LoadBalancer>>,
42 ) -> Self {
43 Self {
44 url,
45 client,
46 digester: Arc::new(digester),
47 load_balancer,
48 }
49 }
50}
51
52impl<D> Service<Request<Body>> for ProxyRouteService<D>
53where
54 D: Digester<Request<Body>>,
55{
56 type Response = Response<Body>;
57 type Error = Error;
58 type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
59
60 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
61 self.client.poll_ready(cx).map_err(Error::new)
62 }
63
64 fn call(&mut self, mut request: Request<Body>) -> Self::Future {
65 let addr = self.load_balancer.as_deref().and_then(|load_balancer| {
66 let key = self.digester.digest(&request);
67 load_balancer.select(&key).map(|backend| backend.addr)
68 });
69
70 let uri = request.uri();
72 let path = uri.path();
73 let query = uri.query();
74 match reconstruct(self.url.clone(), addr, path, query) {
75 Ok(uri) => {
76 *request.uri_mut() = uri;
77 }
78 Err(e) => {
79 return Box::pin(ready(Err(Error::new(e))));
80 }
81 }
82
83 let headers = request.headers_mut();
85 REMOVE_HEADERS.iter().for_each(|header| {
86 headers.remove(header);
87 });
88
89 debug!("proxy send request:\n{:#?}", request);
90
91 let future = self.client.request(request);
93 Box::pin(async move {
94 future
95 .await
96 .map_err(Error::new)
97 .map(|response| response.map(Body::new))
98 })
99 }
100}
101
102fn reconstruct(
103 mut url: Url,
104 addr: Option<SocketAddr>,
105 path: &str,
106 query: Option<&str>,
107) -> Result<Uri, Error> {
108 if let Some(addr) = addr {
109 url.set_ip_host(addr.ip())
110 .map_err(|_| Error::new("set ip host error!"))?;
111 url.set_port(Some(addr.port()))
112 .map_err(|_| Error::new("set port error!"))?;
113 }
114 url.set_path(path);
115 url.set_query(query);
116 Uri::from_maybe_shared(String::from(url)).map_err(|_| Error::new("reconstruct url error!"))
117}