satex_service/proxy/
service.rs

1use crate::proxy::client::Client;
2use futures::future::LocalBoxFuture;
3use http::{HeaderName, Request, Response, Uri};
4use satex_core::Error;
5use satex_core::body::Body;
6use satex_core::digest::Digester;
7use satex_load_balancer::LoadBalancer;
8use std::future::ready;
9use std::net::SocketAddr;
10use std::sync::Arc;
11use std::task::{Context, Poll};
12use tower::Service;
13use tracing::debug;
14use url::Url;
15
16const REMOVE_HEADERS: [HeaderName; 9] = [
17    HeaderName::from_static("connection"),
18    HeaderName::from_static("keep-alive"),
19    HeaderName::from_static("transfer-encoding"),
20    HeaderName::from_static("te"),
21    HeaderName::from_static("trailer"),
22    HeaderName::from_static("proxy-authorization"),
23    HeaderName::from_static("proxy-authenticate"),
24    HeaderName::from_static("x-application-context"),
25    HeaderName::from_static("upgrade"),
26];
27
28#[derive(Clone)]
29pub struct ProxyRouteService<D> {
30    url: Url,
31    client: Client,
32    digester: Arc<D>,
33    load_balancer: Option<Arc<LoadBalancer>>,
34}
35
36impl<D> ProxyRouteService<D> {
37    pub fn new(
38        url: Url,
39        client: Client,
40        digester: D,
41        load_balancer: Option<Arc<LoadBalancer>>,
42    ) -> Self {
43        Self {
44            url,
45            client,
46            digester: Arc::new(digester),
47            load_balancer,
48        }
49    }
50}
51
52impl<D> Service<Request<Body>> for ProxyRouteService<D>
53where
54    D: Digester<Request<Body>>,
55{
56    type Response = Response<Body>;
57    type Error = Error;
58    type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
59
60    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
61        self.client.poll_ready(cx).map_err(Error::new)
62    }
63
64    fn call(&mut self, mut request: Request<Body>) -> Self::Future {
65        let addr = self.load_balancer.as_deref().and_then(|load_balancer| {
66            let key = self.digester.digest(&request);
67            load_balancer.select(&key).map(|backend| backend.addr)
68        });
69
70        // 重新构造请求的uri
71        let uri = request.uri();
72        let path = uri.path();
73        let query = uri.query();
74        match reconstruct(self.url.clone(), addr, path, query) {
75            Ok(uri) => {
76                *request.uri_mut() = uri;
77            }
78            Err(e) => {
79                return Box::pin(ready(Err(Error::new(e))));
80            }
81        }
82
83        // 删除不应该转到后端的请求头
84        let headers = request.headers_mut();
85        REMOVE_HEADERS.iter().for_each(|header| {
86            headers.remove(header);
87        });
88
89        debug!("proxy send request:\n{:#?}", request);
90
91        // 发送请求到后端
92        let future = self.client.request(request);
93        Box::pin(async move {
94            future
95                .await
96                .map_err(Error::new)
97                .map(|response| response.map(Body::new))
98        })
99    }
100}
101
102fn reconstruct(
103    mut url: Url,
104    addr: Option<SocketAddr>,
105    path: &str,
106    query: Option<&str>,
107) -> Result<Uri, Error> {
108    if let Some(addr) = addr {
109        url.set_ip_host(addr.ip())
110            .map_err(|_| Error::new("set ip host error!"))?;
111        url.set_port(Some(addr.port()))
112            .map_err(|_| Error::new("set port error!"))?;
113    }
114    url.set_path(path);
115    url.set_query(query);
116    Uri::from_maybe_shared(String::from(url)).map_err(|_| Error::new("reconstruct url error!"))
117}