spacegate_kernel/service/
http_route.rs1pub mod builder;
2pub mod match_hostname;
3pub mod match_request;
4use std::{convert::Infallible, path::PathBuf, sync::Arc, time::Duration};
5const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
6use crate::{
7 backend_service::{get_http_backend_service, http_backend_service, static_file_service::static_file_service, ArcHyperService},
8 extension::{BackendHost, Defer, Reflect},
9 helper_layers::balancer::{self, Balancer},
10 utils::{fold_box_layers::fold_layers, schema_port::port_to_schema},
11 BoxLayer, SgBody,
12};
13
14use futures_util::future::BoxFuture;
15use hyper::{Request, Response, Version};
16
17use tower_layer::Layer;
18
19use self::{
20 builder::{HttpBackendBuilder, HttpRouteBuilder, HttpRouteRuleBuilder},
21 match_request::HttpRouteMatch,
22};
23
24#[derive(Debug)]
31pub struct HttpRoute {
32 pub name: String,
33 pub hostnames: Vec<String>,
34 pub plugins: Vec<BoxLayer>,
35 pub rules: Vec<HttpRouteRule>,
36 pub priority: i16,
37 pub ext: hyper::http::Extensions,
38}
39
40impl HttpRoute {
41 pub fn builder() -> HttpRouteBuilder {
42 HttpRouteBuilder::new()
43 }
44}
45#[derive(Debug, Clone)]
46pub struct HttpRouter {
47 pub hostnames: Arc<[String]>,
48 pub rules: Arc<[Option<Arc<[Arc<HttpRouteMatch>]>>]>,
49 pub ext: hyper::http::Extensions,
50}
51
52#[derive(Debug)]
59pub struct HttpRouteRule {
60 pub r#match: Option<Vec<HttpRouteMatch>>,
61 pub plugins: Vec<BoxLayer>,
62 timeouts: Option<Duration>,
63 backends: Vec<HttpBackend>,
64 balance_policy: BalancePolicyEnum,
65 pub ext: hyper::http::Extensions,
66}
67
68#[derive(Debug, Default)]
69pub enum BalancePolicyEnum {
70 Random,
71 #[default]
72 IpHash,
73}
74
75impl HttpRouteRule {
76 pub fn builder() -> HttpRouteRuleBuilder {
77 HttpRouteRuleBuilder::new()
78 }
79 pub fn as_service(&self) -> HttpRouteRuleService {
80 use crate::helper_layers::timeout::TimeoutLayer;
81 let filter_layer = self.plugins.iter();
82 let time_out = self.timeouts.unwrap_or(DEFAULT_TIMEOUT);
83 let fallback = get_http_backend_service();
84 let service_iter = self.backends.iter().map(HttpBackend::as_service).collect::<Vec<_>>();
85 let balanced = match self.balance_policy {
86 BalancePolicyEnum::Random => {
87 let weights = self.backends.iter().map(|x| x.weight);
88 ArcHyperService::new(Balancer::new(balancer::Random::new(weights), service_iter, fallback))
89 }
90 BalancePolicyEnum::IpHash => ArcHyperService::new(Balancer::new(balancer::IpHash::default(), service_iter, fallback)),
91 };
92 let service = fold_layers(filter_layer, ArcHyperService::new(TimeoutLayer::new(time_out).layer(balanced)));
93 HttpRouteRuleService { service }
94 }
95}
96
97#[derive(Clone, Debug)]
98pub struct HttpRouteRuleService {
99 pub service: ArcHyperService,
100}
101
102impl hyper::service::Service<Request<SgBody>> for HttpRouteRuleService {
103 type Response = Response<SgBody>;
104 type Error = Infallible;
105 type Future = <ArcHyperService as hyper::service::Service<Request<SgBody>>>::Future;
106 fn call(&self, req: Request<SgBody>) -> Self::Future {
108 let fut = self.service.call(req);
109 Box::pin(fut)
110 }
111}
112
113#[derive(Debug)]
119pub struct HttpBackend {
120 pub plugins: Vec<BoxLayer>,
121 pub backend: Backend,
122 pub weight: u16,
123 pub timeout: Option<Duration>,
124 pub ext: hyper::http::Extensions,
125}
126
127impl HttpBackend {
128 pub fn builder() -> HttpBackendBuilder {
129 HttpBackendBuilder::new()
130 }
131 pub fn as_service(&self) -> ArcHyperService {
132 let inner_service = HttpBackendService {
133 backend: self.backend.clone().into(),
134 };
135 let timeout_layer = crate::helper_layers::timeout::TimeoutLayer::new(self.timeout.unwrap_or(DEFAULT_TIMEOUT));
136 let filtered = fold_layers(self.plugins.iter(), ArcHyperService::new(timeout_layer.layer(inner_service)));
137 filtered
138 }
139}
140
141#[derive(Clone, Debug)]
142pub enum Backend {
143 Http {
144 host: Option<String>,
145 port: Option<u16>,
146 schema: Option<String>,
147 version: Option<Version>,
148 },
149 File {
150 path: PathBuf,
151 },
152}
153
154#[derive(Clone, Debug)]
155pub struct HttpBackendService {
156 pub backend: Arc<Backend>,
157}
158
159impl HttpBackendService {
160 pub fn http_default() -> Self {
161 Self {
162 backend: Arc::new(Backend::Http {
163 host: None,
164 port: None,
165 schema: None,
166 version: None,
167 }),
168 }
169 }
170}
171
172impl hyper::service::Service<Request<SgBody>> for HttpBackendService {
173 type Response = Response<SgBody>;
174 type Error = Infallible;
175 type Future = BoxFuture<'static, Result<Response<SgBody>, Infallible>>;
176
177 fn call(&self, mut req: Request<SgBody>) -> Self::Future {
178 let req = match self.backend.as_ref() {
179 Backend::Http {
180 host: None,
181 port: None,
182 schema: None,
183 version: None,
184 }
185 | Backend::File { .. } => req,
186 Backend::Http { host, port, schema, version } => {
187 if let Some(ref host) = host {
188 if let Some(reflect) = req.extensions_mut().get_mut::<Reflect>() {
189 reflect.insert(BackendHost::new(host.clone()));
190 }
191 req.extensions_mut().insert(BackendHost::new(host.clone()));
192 }
193 let uri = req.uri_mut();
194 let (raw_host, raw_port) = if let Some(auth) = uri.authority() { (auth.host(), auth.port_u16()) } else { ("", None) };
195 let new_host = host.as_deref().unwrap_or(raw_host);
196 let new_port = port.map(u16::from).or(raw_port);
197 let new_scheme = schema.as_deref().or(uri.scheme_str()).or_else(|| new_port.and_then(port_to_schema)).unwrap_or("http");
198 let mut builder = hyper::http::uri::Uri::builder().scheme(new_scheme);
199 if let Some(new_port) = new_port {
200 builder = builder.authority(format!("{}:{}", new_host, new_port));
201 } else {
202 builder = builder.authority(new_host);
203 };
204 if let Some(path_and_query) = uri.path_and_query() {
205 builder = builder.path_and_query(path_and_query.clone());
206 }
207 match builder.build() {
208 Ok(uri) => {
209 tracing::trace!("[Sg.Backend] new uri: {uri}");
210 *req.uri_mut() = uri;
211 }
212 Err(e) => {
213 tracing::error!("Failed to build uri: {}", e);
214 }
215 }
216 if let Some(version) = version {
217 *req.version_mut() = *version;
218 };
219 req
220 }
221 };
222 let backend = self.backend.clone();
223 let req = if let Some(defer) = req.extensions().get::<Defer>().cloned() {
224 defer.apply(req)
225 } else {
226 req
227 };
228 tracing::trace!(elapsed = ?req.extensions().get::<crate::extension::EnterTime>().map(crate::extension::EnterTime::elapsed), "enter backend {backend:?}");
229 Box::pin(async move {
230 unsafe {
231 let mut response = match backend.as_ref() {
232 Backend::Http { .. } => http_backend_service(req).await.unwrap_unchecked(),
233 Backend::File { path } => static_file_service(req, path).await,
234 };
235 response.extensions_mut().insert(crate::extension::FromBackend::new());
236 tracing::trace!(elapsed = ?response.extensions().get::<crate::extension::EnterTime>().map(crate::extension::EnterTime::elapsed), "finish backend request");
237 Ok(response)
238 }
239 })
240 }
241}