salvo_proxy/
hyper_client.rs

1use hyper::upgrade::OnUpgrade;
2use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
3use hyper_util::client::legacy::{connect::HttpConnector, Client as HyperUtilClient};
4use hyper_util::rt::TokioExecutor;
5use salvo_core::http::{ReqBody, ResBody, StatusCode};
6use salvo_core::rt::tokio::TokioIo;
7use salvo_core::Error;
8use tokio::io::copy_bidirectional;
9
10use crate::{Client, HyperRequest, Proxy, BoxedError, Upstreams, HyperResponse};
11
12/// A [`Client`] implementation based on [`hyper_util::client::legacy::Client`].
13/// 
14/// This client provides proxy capabilities using the Hyper HTTP client library.
15/// It's lightweight and tightly integrated with the Tokio runtime.
16#[derive(Clone, Debug)]
17pub struct HyperClient {
18    inner: HyperUtilClient<HttpsConnector<HttpConnector>, ReqBody>,
19}
20
21impl Default for HyperClient {
22    fn default() -> Self {
23        let https = HttpsConnectorBuilder::new()
24            .with_native_roots()
25            .expect("no native root CA certificates found")
26            .https_or_http()
27            .enable_all_versions()
28            .build();
29        Self {
30            inner: HyperUtilClient::builder(TokioExecutor::new()).build(https),
31        }
32    }
33}
34
35impl<U> Proxy<U, HyperClient>
36where
37    U: Upstreams,
38    U::Error: Into<BoxedError>,
39{
40    /// Create a new `Proxy` using the default Hyper client.
41    /// 
42    /// This is a convenient way to create a proxy with standard configuration.
43    pub fn use_hyper_client(upstreams: U) -> Self {
44        Proxy::new(upstreams, HyperClient::default())
45    }
46}
47
48impl HyperClient {
49    /// Create a new `HyperClient` with the given `HyperClient`.
50    pub fn new(inner: HyperUtilClient<HttpsConnector<HttpConnector>, ReqBody>) -> Self {
51        Self { inner }
52    }
53}
54
55impl Client for HyperClient {
56    type Error = salvo_core::Error;
57
58    async fn execute(
59        &self,
60        proxied_request: HyperRequest,
61        request_upgraded: Option<OnUpgrade>,
62    ) -> Result<HyperResponse, Self::Error> {
63        let request_upgrade_type = crate::get_upgrade_type(proxied_request.headers()).map(|s| s.to_owned());
64
65        let mut response = self.inner.request(proxied_request).await.map_err(Error::other)?;
66
67        if response.status() == StatusCode::SWITCHING_PROTOCOLS {
68            let response_upgrade_type = crate::get_upgrade_type(response.headers());
69            if request_upgrade_type == response_upgrade_type.map(|s| s.to_lowercase()) {
70                let response_upgraded = hyper::upgrade::on(&mut response).await?;
71                if let Some(request_upgraded) = request_upgraded {
72                    tokio::spawn(async move {
73                        match request_upgraded.await {
74                            Ok(request_upgraded) => {
75                                let mut request_upgraded = TokioIo::new(request_upgraded);
76                                let mut response_upgraded = TokioIo::new(response_upgraded);
77                                if let Err(e) = copy_bidirectional(&mut response_upgraded, &mut request_upgraded).await
78                                {
79                                    tracing::error!(error = ?e, "coping between upgraded connections failed.");
80                                }
81                            }
82                            Err(e) => {
83                                tracing::error!(error = ?e, "upgrade request failed.");
84                            }
85                        }
86                    });
87                } else {
88                    return Err(Error::other("request does not have an upgrade extension."));
89                }
90            } else {
91                return Err(Error::other("upgrade type mismatch"));
92            }
93        }
94        Ok(response.map(ResBody::Hyper))
95    }
96}
97
98
99// Unit tests for Proxy
100#[cfg(test)]
101mod tests {
102    use salvo_core::prelude::*;
103    use salvo_core::test::*;
104
105    use super::*;
106    use crate::{Upstreams, Proxy};
107
108    #[tokio::test]
109    async fn test_upstreams_elect() {
110        let upstreams = vec!["https://www.example.com", "https://www.example2.com"];
111        let proxy = Proxy::new(upstreams.clone(), HyperClient::default());
112        let elected_upstream = proxy.upstreams().elect().await.unwrap();
113        assert!(upstreams.contains(&elected_upstream));
114    }
115
116    #[tokio::test]
117    async fn test_hyper_client() {
118        let router = Router::new().push(
119            Router::with_path("rust/{**rest}").goal(Proxy::new(vec!["https://www.rust-lang.org"], HyperClient::default())),
120        );
121
122        let content = TestClient::get("http://127.0.0.1:5801/rust/tools/install")
123            .send(router)
124            .await
125            .take_string()
126            .await
127            .unwrap();
128        println!("{}", content);
129        assert!(content.contains("Install Rust"));
130    }
131
132    #[test]
133    fn test_others() {
134        let mut handler = Proxy::new(["https://www.bing.com"], HyperClient::default());
135        assert_eq!(handler.upstreams().len(), 1);
136        assert_eq!(handler.upstreams_mut().len(), 1);
137    }
138}