spacegate_kernel/
backend_service.rs

1use std::convert::Infallible;
2use std::sync::Arc;
3
4use futures_util::future::BoxFuture;
5use futures_util::Future;
6use hyper::{header::UPGRADE, Request, Response, StatusCode};
7use tracing::instrument;
8
9use crate::backend_service::http_client_service::get_client;
10use crate::helper_layers::map_future::MapFuture;
11use crate::utils::x_forwarded_for;
12use crate::BoxError;
13use crate::SgBody;
14use crate::SgRequest;
15use crate::SgResponse;
16use crate::SgResponseExt;
17
18pub mod echo;
19pub mod http_client_service;
20pub mod static_file_service;
21pub mod ws_client_service;
22pub trait SharedHyperService:
23    hyper::service::Service<SgRequest, Response = SgResponse, Error = Infallible, Future = BoxFuture<'static, Result<SgResponse, Infallible>>> + Send + Sync + 'static
24{
25}
26
27impl<T> SharedHyperService for T where
28    T: hyper::service::Service<SgRequest, Response = SgResponse, Error = Infallible, Future = BoxFuture<'static, Result<SgResponse, Infallible>>> + Send + Sync + 'static
29{
30}
31/// a service that can be shared between threads
32pub struct ArcHyperService {
33    pub shared: Arc<dyn SharedHyperService>,
34}
35
36impl std::fmt::Debug for ArcHyperService {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        f.debug_struct("ArcHyperService").finish()
39    }
40}
41
42impl Clone for ArcHyperService {
43    fn clone(&self) -> Self {
44        Self { shared: self.shared.clone() }
45    }
46}
47
48impl ArcHyperService {
49    pub fn new<T>(service: T) -> Self
50    where
51        T: hyper::service::Service<SgRequest, Response = SgResponse, Error = Infallible> + Send + Sync + 'static,
52        T::Future: Future<Output = Result<Response<SgBody>, Infallible>> + 'static + Send,
53    {
54        let map_fut = MapFuture::new(service, |fut| Box::pin(fut) as _);
55        Self { shared: Arc::new(map_fut) }
56    }
57    pub fn from_shared(shared: impl Into<Arc<dyn SharedHyperService>>) -> Self {
58        Self { shared: shared.into() }
59    }
60}
61
62impl hyper::service::Service<Request<SgBody>> for ArcHyperService {
63    type Response = Response<SgBody>;
64    type Error = Infallible;
65    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
66
67    #[inline]
68    fn call(&self, req: Request<SgBody>) -> Self::Future {
69        Box::pin(self.shared.call(req))
70    }
71}
72
73/// Http backend service
74///
75/// This function could be a bottom layer of a http router, it will handle http and websocket request.
76///
77/// This can handle both websocket connection and http request.
78///
79/// # Errors
80/// 1. Fail to collect body chunks
81/// 2. Fail to upgrade
82pub async fn http_backend_service_inner(mut req: Request<SgBody>) -> Result<SgResponse, BoxError> {
83    tracing::trace!(elapsed = ?req.extensions().get::<crate::extension::EnterTime>().map(crate::extension::EnterTime::elapsed), "start a backend request");
84    x_forwarded_for(&mut req)?;
85    let mut client = get_client();
86    let response = if req.headers().get(UPGRADE).is_some_and(|upgrade| upgrade.as_bytes().eq_ignore_ascii_case(b"websocket")) {
87        // dump request
88        let (part, body) = req.into_parts();
89        let body = body.dump().await?;
90        let req = Request::from_parts(part, body);
91
92        // forward request
93        let resp = client.request(req.clone()).await;
94
95        // dump response
96        let (part, body) = resp.into_parts();
97        let body = body.dump().await?;
98        let resp = Response::from_parts(part, body);
99
100        let req_for_upgrade = req.clone();
101        let resp_for_upgrade = resp.clone();
102
103        // create forward task
104        tokio::task::spawn(async move {
105            // update both side
106            let (s, c) = futures_util::join!(hyper::upgrade::on(req_for_upgrade), hyper::upgrade::on(resp_for_upgrade));
107            let upgrade_as_server = s?;
108            let upgrade_as_client = c?;
109            // start a websocket forward
110            ws_client_service::tcp_transfer(upgrade_as_server, upgrade_as_client).await?;
111            <Result<(), BoxError>>::Ok(())
112        });
113        // return response to client
114        resp
115    } else {
116        client.request(req).await
117    };
118    Ok(response)
119}
120
121#[instrument]
122pub async fn http_backend_service(req: Request<SgBody>) -> Result<Response<SgBody>, Infallible> {
123    match http_backend_service_inner(req).await {
124        Ok(resp) => Ok(resp),
125        Err(err) => Ok(Response::with_code_message(StatusCode::BAD_GATEWAY, format!("[Sg.Client] Client error: {err}"))),
126    }
127}
128
129#[inline]
130pub fn get_http_backend_service() -> ArcHyperService {
131    ArcHyperService::new(hyper::service::service_fn(http_backend_service))
132}
133
134#[cold]
135#[inline]
136pub fn get_echo_service() -> ArcHyperService {
137    ArcHyperService::new(hyper::service::service_fn(echo::echo))
138}