spacegate_kernel/
backend_service.rs1use std::convert::Infallible;
2use std::sync::Arc;
3
4use futures_util::future::BoxFuture;
5use futures_util::Future;
6use hyper::{header::UPGRADE, Request, Response, StatusCode};
7use tracing::instrument;
8
9use crate::backend_service::http_client_service::get_client;
10use crate::helper_layers::map_future::MapFuture;
11use crate::utils::x_forwarded_for;
12use crate::BoxError;
13use crate::SgBody;
14use crate::SgRequest;
15use crate::SgResponse;
16use crate::SgResponseExt;
17
18pub mod echo;
19pub mod http_client_service;
20pub mod static_file_service;
21pub mod ws_client_service;
22pub trait SharedHyperService:
23 hyper::service::Service<SgRequest, Response = SgResponse, Error = Infallible, Future = BoxFuture<'static, Result<SgResponse, Infallible>>> + Send + Sync + 'static
24{
25}
26
27impl<T> SharedHyperService for T where
28 T: hyper::service::Service<SgRequest, Response = SgResponse, Error = Infallible, Future = BoxFuture<'static, Result<SgResponse, Infallible>>> + Send + Sync + 'static
29{
30}
/// A hyper service stored behind an [`Arc`] as a [`SharedHyperService`] trait object.
pub struct ArcHyperService {
    /// The type-erased, reference-counted inner service.
    pub shared: Arc<dyn SharedHyperService>,
}
35
36impl std::fmt::Debug for ArcHyperService {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 f.debug_struct("ArcHyperService").finish()
39 }
40}
41
42impl Clone for ArcHyperService {
43 fn clone(&self) -> Self {
44 Self { shared: self.shared.clone() }
45 }
46}
47
48impl ArcHyperService {
49 pub fn new<T>(service: T) -> Self
50 where
51 T: hyper::service::Service<SgRequest, Response = SgResponse, Error = Infallible> + Send + Sync + 'static,
52 T::Future: Future<Output = Result<Response<SgBody>, Infallible>> + 'static + Send,
53 {
54 let map_fut = MapFuture::new(service, |fut| Box::pin(fut) as _);
55 Self { shared: Arc::new(map_fut) }
56 }
57 pub fn from_shared(shared: impl Into<Arc<dyn SharedHyperService>>) -> Self {
58 Self { shared: shared.into() }
59 }
60}
61
62impl hyper::service::Service<Request<SgBody>> for ArcHyperService {
63 type Response = Response<SgBody>;
64 type Error = Infallible;
65 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
66
67 #[inline]
68 fn call(&self, req: Request<SgBody>) -> Self::Future {
69 Box::pin(self.shared.call(req))
70 }
71}
72
73pub async fn http_backend_service_inner(mut req: Request<SgBody>) -> Result<SgResponse, BoxError> {
83 tracing::trace!(elapsed = ?req.extensions().get::<crate::extension::EnterTime>().map(crate::extension::EnterTime::elapsed), "start a backend request");
84 x_forwarded_for(&mut req)?;
85 let mut client = get_client();
86 let response = if req.headers().get(UPGRADE).is_some_and(|upgrade| upgrade.as_bytes().eq_ignore_ascii_case(b"websocket")) {
87 let (part, body) = req.into_parts();
89 let body = body.dump().await?;
90 let req = Request::from_parts(part, body);
91
92 let resp = client.request(req.clone()).await;
94
95 let (part, body) = resp.into_parts();
97 let body = body.dump().await?;
98 let resp = Response::from_parts(part, body);
99
100 let req_for_upgrade = req.clone();
101 let resp_for_upgrade = resp.clone();
102
103 tokio::task::spawn(async move {
105 let (s, c) = futures_util::join!(hyper::upgrade::on(req_for_upgrade), hyper::upgrade::on(resp_for_upgrade));
107 let upgrade_as_server = s?;
108 let upgrade_as_client = c?;
109 ws_client_service::tcp_transfer(upgrade_as_server, upgrade_as_client).await?;
111 <Result<(), BoxError>>::Ok(())
112 });
113 resp
115 } else {
116 client.request(req).await
117 };
118 Ok(response)
119}
120
121#[instrument]
122pub async fn http_backend_service(req: Request<SgBody>) -> Result<Response<SgBody>, Infallible> {
123 match http_backend_service_inner(req).await {
124 Ok(resp) => Ok(resp),
125 Err(err) => Ok(Response::with_code_message(StatusCode::BAD_GATEWAY, format!("[Sg.Client] Client error: {err}"))),
126 }
127}
128
129#[inline]
130pub fn get_http_backend_service() -> ArcHyperService {
131 ArcHyperService::new(hyper::service::service_fn(http_backend_service))
132}
133
134#[cold]
135#[inline]
136pub fn get_echo_service() -> ArcHyperService {
137 ArcHyperService::new(hyper::service::service_fn(echo::echo))
138}