spacegate_kernel/service/
http_route.rs

1pub mod builder;
2pub mod match_hostname;
3pub mod match_request;
4use std::{convert::Infallible, path::PathBuf, sync::Arc, time::Duration};
5const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
6use crate::{
7    backend_service::{get_http_backend_service, http_backend_service, static_file_service::static_file_service, ArcHyperService},
8    extension::{BackendHost, Defer, Reflect},
9    helper_layers::balancer::{self, Balancer},
10    utils::{fold_box_layers::fold_layers, schema_port::port_to_schema},
11    BoxLayer, SgBody,
12};
13
14use futures_util::future::BoxFuture;
15use hyper::{Request, Response, Version};
16
17use tower_layer::Layer;
18
19use self::{
20    builder::{HttpBackendBuilder, HttpRouteBuilder, HttpRouteRuleBuilder},
21    match_request::HttpRouteMatch,
22};
23
24/****************************************************************************************
25
26                                          Route
27
28*****************************************************************************************/
29
30#[derive(Debug)]
31pub struct HttpRoute {
32    pub name: String,
33    pub hostnames: Vec<String>,
34    pub plugins: Vec<BoxLayer>,
35    pub rules: Vec<HttpRouteRule>,
36    pub priority: i16,
37    pub ext: hyper::http::Extensions,
38}
39
40impl HttpRoute {
41    pub fn builder() -> HttpRouteBuilder {
42        HttpRouteBuilder::new()
43    }
44}
45#[derive(Debug, Clone)]
46pub struct HttpRouter {
47    pub hostnames: Arc<[String]>,
48    pub rules: Arc<[Option<Arc<[Arc<HttpRouteMatch>]>>]>,
49    pub ext: hyper::http::Extensions,
50}
51
52/****************************************************************************************
53
54                                        Route Rule
55
56*****************************************************************************************/
57
58#[derive(Debug)]
59pub struct HttpRouteRule {
60    pub r#match: Option<Vec<HttpRouteMatch>>,
61    pub plugins: Vec<BoxLayer>,
62    timeouts: Option<Duration>,
63    backends: Vec<HttpBackend>,
64    balance_policy: BalancePolicyEnum,
65    pub ext: hyper::http::Extensions,
66}
67
68#[derive(Debug, Default)]
69pub enum BalancePolicyEnum {
70    Random,
71    #[default]
72    IpHash,
73}
74
75impl HttpRouteRule {
76    pub fn builder() -> HttpRouteRuleBuilder {
77        HttpRouteRuleBuilder::new()
78    }
79    pub fn as_service(&self) -> HttpRouteRuleService {
80        use crate::helper_layers::timeout::TimeoutLayer;
81        let filter_layer = self.plugins.iter();
82        let time_out = self.timeouts.unwrap_or(DEFAULT_TIMEOUT);
83        let fallback = get_http_backend_service();
84        let service_iter = self.backends.iter().map(HttpBackend::as_service).collect::<Vec<_>>();
85        let balanced = match self.balance_policy {
86            BalancePolicyEnum::Random => {
87                let weights = self.backends.iter().map(|x| x.weight);
88                ArcHyperService::new(Balancer::new(balancer::Random::new(weights), service_iter, fallback))
89            }
90            BalancePolicyEnum::IpHash => ArcHyperService::new(Balancer::new(balancer::IpHash::default(), service_iter, fallback)),
91        };
92        let service = fold_layers(filter_layer, ArcHyperService::new(TimeoutLayer::new(time_out).layer(balanced)));
93        HttpRouteRuleService { service }
94    }
95}
96
97#[derive(Clone, Debug)]
98pub struct HttpRouteRuleService {
99    pub service: ArcHyperService,
100}
101
102impl hyper::service::Service<Request<SgBody>> for HttpRouteRuleService {
103    type Response = Response<SgBody>;
104    type Error = Infallible;
105    type Future = <ArcHyperService as hyper::service::Service<Request<SgBody>>>::Future;
106    // #[instrument("route_rule", skip_all)]
107    fn call(&self, req: Request<SgBody>) -> Self::Future {
108        let fut = self.service.call(req);
109        Box::pin(fut)
110    }
111}
112
113/****************************************************************************************
114
115                                        Backend
116
117*****************************************************************************************/
118#[derive(Debug)]
119pub struct HttpBackend {
120    pub plugins: Vec<BoxLayer>,
121    pub backend: Backend,
122    pub weight: u16,
123    pub timeout: Option<Duration>,
124    pub ext: hyper::http::Extensions,
125}
126
127impl HttpBackend {
128    pub fn builder() -> HttpBackendBuilder {
129        HttpBackendBuilder::new()
130    }
131    pub fn as_service(&self) -> ArcHyperService {
132        let inner_service = HttpBackendService {
133            backend: self.backend.clone().into(),
134        };
135        let timeout_layer = crate::helper_layers::timeout::TimeoutLayer::new(self.timeout.unwrap_or(DEFAULT_TIMEOUT));
136        let filtered = fold_layers(self.plugins.iter(), ArcHyperService::new(timeout_layer.layer(inner_service)));
137        filtered
138    }
139}
140
141#[derive(Clone, Debug)]
142pub enum Backend {
143    Http {
144        host: Option<String>,
145        port: Option<u16>,
146        schema: Option<String>,
147        version: Option<Version>,
148    },
149    File {
150        path: PathBuf,
151    },
152}
153
154#[derive(Clone, Debug)]
155pub struct HttpBackendService {
156    pub backend: Arc<Backend>,
157}
158
159impl HttpBackendService {
160    pub fn http_default() -> Self {
161        Self {
162            backend: Arc::new(Backend::Http {
163                host: None,
164                port: None,
165                schema: None,
166                version: None,
167            }),
168        }
169    }
170}
171
172impl hyper::service::Service<Request<SgBody>> for HttpBackendService {
173    type Response = Response<SgBody>;
174    type Error = Infallible;
175    type Future = BoxFuture<'static, Result<Response<SgBody>, Infallible>>;
176
177    fn call(&self, mut req: Request<SgBody>) -> Self::Future {
178        let req = match self.backend.as_ref() {
179            Backend::Http {
180                host: None,
181                port: None,
182                schema: None,
183                version: None,
184            }
185            | Backend::File { .. } => req,
186            Backend::Http { host, port, schema, version } => {
187                if let Some(ref host) = host {
188                    if let Some(reflect) = req.extensions_mut().get_mut::<Reflect>() {
189                        reflect.insert(BackendHost::new(host.clone()));
190                    }
191                    req.extensions_mut().insert(BackendHost::new(host.clone()));
192                }
193                let uri = req.uri_mut();
194                let (raw_host, raw_port) = if let Some(auth) = uri.authority() { (auth.host(), auth.port_u16()) } else { ("", None) };
195                let new_host = host.as_deref().unwrap_or(raw_host);
196                let new_port = port.map(u16::from).or(raw_port);
197                let new_scheme = schema.as_deref().or(uri.scheme_str()).or_else(|| new_port.and_then(port_to_schema)).unwrap_or("http");
198                let mut builder = hyper::http::uri::Uri::builder().scheme(new_scheme);
199                if let Some(new_port) = new_port {
200                    builder = builder.authority(format!("{}:{}", new_host, new_port));
201                } else {
202                    builder = builder.authority(new_host);
203                };
204                if let Some(path_and_query) = uri.path_and_query() {
205                    builder = builder.path_and_query(path_and_query.clone());
206                }
207                match builder.build() {
208                    Ok(uri) => {
209                        tracing::trace!("[Sg.Backend] new uri: {uri}");
210                        *req.uri_mut() = uri;
211                    }
212                    Err(e) => {
213                        tracing::error!("Failed to build uri: {}", e);
214                    }
215                }
216                if let Some(version) = version {
217                    *req.version_mut() = *version;
218                };
219                req
220            }
221        };
222        let backend = self.backend.clone();
223        let req = if let Some(defer) = req.extensions().get::<Defer>().cloned() {
224            defer.apply(req)
225        } else {
226            req
227        };
228        tracing::trace!(elapsed = ?req.extensions().get::<crate::extension::EnterTime>().map(crate::extension::EnterTime::elapsed), "enter backend {backend:?}");
229        Box::pin(async move {
230            unsafe {
231                let mut response = match backend.as_ref() {
232                    Backend::Http { .. } => http_backend_service(req).await.unwrap_unchecked(),
233                    Backend::File { path } => static_file_service(req, path).await,
234                };
235                response.extensions_mut().insert(crate::extension::FromBackend::new());
236                tracing::trace!(elapsed = ?response.extensions().get::<crate::extension::EnterTime>().map(crate::extension::EnterTime::elapsed), "finish backend request");
237                Ok(response)
238            }
239        })
240    }
241}