Skip to main content

gatel_core/server/
http_server.rs

1use std::net::SocketAddr;
2use std::sync::Arc;
3
4use hyper::Request;
5use hyper::body::Incoming;
6use hyper::service::{Service as HyperService, service_fn};
7use hyper_util::rt::{TokioIo, TokioTimer};
8use tokio::io::{AsyncRead, AsyncWrite};
9use tokio_rustls::server::TlsStream;
10
11use super::AppState;
12
13/// Serve a single plain TCP connection using hyper's auto HTTP/1+2 builder.
14pub async fn serve_connection(
15    stream: tokio::net::TcpStream,
16    local_addr: SocketAddr,
17    client_addr: SocketAddr,
18    state: Arc<AppState>,
19) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
20    serve_io(stream, local_addr, client_addr, state, false).await
21}
22
23/// Serve a single TLS-wrapped connection using hyper's auto HTTP/1+2 builder.
24pub async fn serve_tls_connection(
25    stream: TlsStream<tokio::net::TcpStream>,
26    local_addr: SocketAddr,
27    client_addr: SocketAddr,
28    state: Arc<AppState>,
29) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
30    serve_io(stream, local_addr, client_addr, state, true).await
31}
32
33/// Serve a connection on any I/O type that implements `AsyncRead + AsyncWrite + Unpin`.
34///
35/// This is the generic entry point used by both plain TCP and TLS connections,
36/// as well as connections wrapped in a `PrefixedStream` (when PROXY protocol
37/// is enabled).
38pub async fn serve_io<IO>(
39    io: IO,
40    local_addr: SocketAddr,
41    client_addr: SocketAddr,
42    state: Arc<AppState>,
43    is_tls: bool,
44) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
45where
46    IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
47{
48    let io = TokioIo::new(io);
49
50    let service = service_fn(move |req: Request<Incoming>| {
51        let state = Arc::clone(&state);
52        async move { handle_request(req, local_addr, client_addr, &state, is_tls).await }
53    });
54
55    hyper_util::server::conn::auto::Builder::new(hyper_util::rt::TokioExecutor::new())
56        .http1()
57        .keep_alive(true)
58        .timer(TokioTimer::new())
59        .http2()
60        .timer(TokioTimer::new())
61        .serve_connection_with_upgrades(io, service)
62        .await?;
63
64    Ok(())
65}
66
67async fn handle_request(
68    req: Request<Incoming>,
69    local_addr: SocketAddr,
70    client_addr: SocketAddr,
71    state: &AppState,
72    is_tls: bool,
73) -> Result<hyper::Response<salvo::http::ResBody>, hyper::Error> {
74    let service = state.service.load();
75    let scheme = if is_tls {
76        salvo::http::uri::Scheme::HTTPS
77    } else {
78        salvo::http::uri::Scheme::HTTP
79    };
80
81    #[allow(unused_mut)]
82    let mut alt_svc_h3 = None;
83    #[cfg(feature = "http3")]
84    if is_tls {
85        let config = state.config.load();
86        if config.global.http3 {
87            alt_svc_h3 = format!("h3=\":{}\"; ma=2592000", config.global.https_addr.port())
88                .parse()
89                .ok();
90        }
91    }
92
93    let handler = service.hyper_handler(
94        local_addr.into(),
95        client_addr.into(),
96        scheme,
97        None,
98        alt_svc_h3,
99    );
100    handler.call(req).await
101}