Skip to main content

gatel_core/server/
mod.rs

1pub mod graceful;
2#[cfg(feature = "http3")]
3pub mod h3_server;
4pub mod http_server;
5pub mod proxy_protocol;
6
7use std::sync::Arc;
8
9use arc_swap::ArcSwap;
10pub use graceful::GracefulShutdown;
11pub use proxy_protocol::{PrefixedStream, ProxyProtocolHeader};
12use tokio::net::TcpListener;
13use tracing::{debug, error, info, warn};
14
15use crate::config::AppConfig;
16use crate::hoops::metrics::Metrics;
17use crate::plugin::ModuleRegistry;
18use crate::salvo_service;
19use crate::tls::TlsManager;
20
/// Shared application state, hot-swappable via ArcSwap.
///
/// Instances are handed out as `Arc<AppState>` by the constructors and are
/// shared between the accept loops, the admin server and (when enabled) the
/// HTTP/3 server.
pub struct AppState {
    /// Currently active configuration; replaced wholesale by `reload`.
    pub config: ArcSwap<AppConfig>,
    /// Salvo service built from the configuration and the module registry
    /// via `salvo_service::build_service`; rebuilt and replaced by `reload`.
    pub service: ArcSwap<salvo::Service>,
    /// TLS manager. When `Some`, `run` also starts an HTTPS listener and
    /// `reload` asks the manager to reload its TLS configuration.
    pub tls_manager: Option<TlsManager>,
    /// Graceful-shutdown coordinator, created from `config.global.grace_period`.
    /// The accept loops poll `is_shutdown()` and register every accepted
    /// connection through `track_conn()`.
    pub shutdown: GracefulShutdown,
    /// Optional path to the config file, used for hot-reload from the admin API.
    pub config_path: Option<String>,
    /// Shared metrics store.
    pub metrics: Arc<Metrics>,
    /// Module registry for plugin-provided middleware and handlers.
    pub modules: Arc<ModuleRegistry>,
}
34
35impl AppState {
36    pub fn new(config: AppConfig, tls_manager: Option<TlsManager>) -> Arc<Self> {
37        let grace_period = config.global.grace_period;
38        let service = salvo_service::build_service(&config, &crate::plugin::ModuleRegistry::new());
39        Arc::new(Self {
40            config: ArcSwap::from_pointee(config),
41            service: ArcSwap::new(Arc::new(service)),
42            tls_manager,
43            shutdown: GracefulShutdown::new(grace_period),
44            config_path: None,
45            metrics: Arc::new(Metrics::new()),
46            modules: Arc::new(ModuleRegistry::new()),
47        })
48    }
49
50    /// Create a new `AppState` with a config file path for hot-reload support.
51    pub fn with_config_path(
52        config: AppConfig,
53        tls_manager: Option<TlsManager>,
54        path: String,
55    ) -> Arc<Self> {
56        let grace_period = config.global.grace_period;
57        let service = salvo_service::build_service(&config, &crate::plugin::ModuleRegistry::new());
58        Arc::new(Self {
59            config: ArcSwap::from_pointee(config),
60            service: ArcSwap::new(Arc::new(service)),
61            tls_manager,
62            shutdown: GracefulShutdown::new(grace_period),
63            config_path: Some(path),
64            metrics: Arc::new(Metrics::new()),
65            modules: Arc::new(ModuleRegistry::new()),
66        })
67    }
68
69    /// Create a new `AppState` with a pre-populated module registry.
70    pub fn with_modules(
71        config: AppConfig,
72        tls_manager: Option<TlsManager>,
73        modules: ModuleRegistry,
74    ) -> Arc<Self> {
75        let grace_period = config.global.grace_period;
76        let modules = Arc::new(modules);
77        let service = salvo_service::build_service(&config, &modules);
78        Arc::new(Self {
79            config: ArcSwap::from_pointee(config),
80            service: ArcSwap::new(Arc::new(service)),
81            tls_manager,
82            shutdown: GracefulShutdown::new(grace_period),
83            config_path: None,
84            metrics: Arc::new(Metrics::new()),
85            modules,
86        })
87    }
88
89    /// Hot-reload: atomically swap in a new config and rebuild the router.
90    pub async fn reload(&self, new_config: AppConfig) {
91        // Reload TLS configuration if a TlsManager is present.
92        if let Some(ref tls_mgr) = self.tls_manager
93            && let Err(e) = tls_mgr.reload(&new_config).await
94        {
95            error!("failed to reload TLS configuration: {e}");
96            // Continue with the rest of the reload; the old TLS config
97            // remains active via ArcSwap.
98        }
99
100        let new_service = salvo_service::build_service(&new_config, &self.modules);
101        self.service.store(Arc::new(new_service));
102        self.config.store(Arc::new(new_config));
103        info!("configuration reloaded");
104    }
105}
106
107/// Start the HTTP (and optionally HTTPS) listener(s) and serve requests.
108///
109/// When `state.tls_manager` is `Some`, an HTTPS listener is started on the
110/// configured `https_addr` alongside the HTTP listener. The HTTP listener
111/// doubles as the ACME HTTP-01 challenge responder.
112///
113/// When `proxy_protocol` is enabled in the global config, the PROXY protocol
114/// header is parsed from each connection before serving. The real client
115/// address from the header is used instead of the TCP peer address.
116pub async fn run(state: Arc<AppState>) -> Result<(), crate::ProxyError> {
117    let config = state.config.load();
118
119    // Start the admin API server if configured.
120    if let Some(admin_addr) = config.global.admin_addr {
121        let admin_state = Arc::clone(&state);
122        let admin_metrics = Arc::clone(&state.metrics);
123        tokio::spawn(async move {
124            if let Err(e) =
125                crate::admin::start_admin_server(admin_addr, admin_state, admin_metrics).await
126            {
127                error!("admin server error: {e}");
128            }
129        });
130    }
131
132    // Start L4 stream proxy listeners if configured.
133    if let Some(ref stream_config) = config.stream {
134        match crate::stream::start_stream_listeners(stream_config).await {
135            Ok(handles) => {
136                info!(count = handles.len(), "stream proxy listeners started");
137            }
138            Err(e) => {
139                error!("failed to start stream proxy listeners: {e}");
140            }
141        }
142    }
143
144    let http_addr = config.global.http_addr;
145    let proxy_protocol_enabled = config.global.proxy_protocol;
146
147    if proxy_protocol_enabled {
148        info!("PROXY protocol support enabled");
149    }
150
151    let http_listener = TcpListener::bind(http_addr).await?;
152    info!(%http_addr, "listening for HTTP connections");
153
154    // If TLS is configured, start the HTTPS listener concurrently.
155    let has_tls = state.tls_manager.is_some();
156
157    if has_tls {
158        let https_addr = config.global.https_addr;
159        let https_listener = TcpListener::bind(https_addr).await?;
160        info!(%https_addr, "listening for HTTPS connections");
161
162        // Start HTTP/3 (QUIC) listener if the feature is enabled and config
163        // has http3 turned on.
164        #[cfg(feature = "http3")]
165        if config.global.http3 {
166            let h3_state = Arc::clone(&state);
167            let h3_tls = state
168                .tls_manager
169                .as_ref()
170                .expect("TLS manager must exist when has_tls is true")
171                .server_config();
172            let h3_addr = https_addr;
173            tokio::spawn(async move {
174                if let Err(e) = h3_server::run_h3_server(h3_addr, h3_tls, h3_state).await {
175                    error!("HTTP/3 server error: {e}");
176                }
177            });
178        }
179
180        // Notify systemd that the service is ready (Type=notify).
181        crate::sd_notify::sd_notify("READY=1");
182
183        let http_state = Arc::clone(&state);
184        let https_state = Arc::clone(&state);
185
186        tokio::select! {
187            result = accept_http_loop(http_listener, http_state, proxy_protocol_enabled) => {
188                if let Err(e) = result {
189                    error!("HTTP listener error: {e}");
190                }
191            }
192            result = accept_https_loop(https_listener, https_state, proxy_protocol_enabled) => {
193                if let Err(e) = result {
194                    error!("HTTPS listener error: {e}");
195                }
196            }
197        }
198    } else {
199        // Notify systemd that the service is ready (Type=notify).
200        crate::sd_notify::sd_notify("READY=1");
201
202        accept_http_loop(http_listener, state, proxy_protocol_enabled).await?;
203    }
204
205    Ok(())
206}
207
208/// Accept loop for plain HTTP connections.
209async fn accept_http_loop(
210    listener: TcpListener,
211    state: Arc<AppState>,
212    proxy_protocol: bool,
213) -> Result<(), crate::ProxyError> {
214    let local_addr = listener.local_addr()?;
215    loop {
216        if state.shutdown.is_shutdown() {
217            info!("HTTP accept loop stopping (shutdown)");
218            break;
219        }
220
221        let (mut stream, peer_addr) = match listener.accept().await {
222            Ok(conn) => conn,
223            Err(e) => {
224                error!("HTTP accept error: {e}");
225                continue;
226            }
227        };
228
229        // Apply TCP socket options from global config.
230        {
231            let cfg = state.config.load();
232            if cfg.global.tcp_nodelay {
233                stream.set_nodelay(true).ok();
234            }
235        }
236
237        let state = Arc::clone(&state);
238        let _conn_guard = state.shutdown.track_conn();
239        tokio::spawn(async move {
240            let _guard = _conn_guard;
241
242            if proxy_protocol {
243                // Parse PROXY protocol header to get the real client address.
244                match proxy_protocol::parse_proxy_protocol(&mut stream).await {
245                    Ok((header, prefix)) => {
246                        let client_addr = header.as_ref().map(|h| h.src_addr).unwrap_or(peer_addr);
247                        let prefixed = PrefixedStream::new(prefix, stream);
248                        if let Err(e) =
249                            http_server::serve_io(prefixed, local_addr, client_addr, state, false)
250                                .await
251                        {
252                            debug!(client = %client_addr, "HTTP connection error: {e}");
253                        }
254                    }
255                    Err(e) => {
256                        debug!(client = %peer_addr, "PROXY protocol parse error: {e}");
257                    }
258                }
259            } else {
260                if let Err(e) =
261                    http_server::serve_connection(stream, local_addr, peer_addr, state).await
262                {
263                    debug!(client = %peer_addr, "HTTP connection error: {e}");
264                }
265            }
266        });
267    }
268
269    Ok(())
270}
271
272/// Accept loop for TLS-wrapped HTTPS connections.
273async fn accept_https_loop(
274    listener: TcpListener,
275    state: Arc<AppState>,
276    proxy_protocol: bool,
277) -> Result<(), crate::ProxyError> {
278    let local_addr = listener.local_addr()?;
279    loop {
280        if state.shutdown.is_shutdown() {
281            info!("HTTPS accept loop stopping (shutdown)");
282            break;
283        }
284
285        let (mut stream, peer_addr) = match listener.accept().await {
286            Ok(conn) => conn,
287            Err(e) => {
288                error!("HTTPS accept error: {e}");
289                continue;
290            }
291        };
292
293        // Apply TCP socket options from global config.
294        {
295            let cfg = state.config.load();
296            if cfg.global.tcp_nodelay {
297                stream.set_nodelay(true).ok();
298            }
299        }
300
301        // Get a TlsAcceptor from the TlsManager. We call acceptor() each
302        // iteration so that hot-reloaded TLS configs are picked up.
303        let acceptor = match state.tls_manager {
304            Some(ref tls_mgr) => tls_mgr.acceptor(),
305            None => {
306                // Should not happen since we only enter this loop when TLS is
307                // configured, but handle gracefully.
308                warn!("HTTPS accept loop running without TLS manager");
309                continue;
310            }
311        };
312
313        let state = Arc::clone(&state);
314        let _conn_guard = state.shutdown.track_conn();
315        tokio::spawn(async move {
316            let _guard = _conn_guard;
317
318            if proxy_protocol {
319                // Parse PROXY protocol header first, then perform TLS handshake.
320                match proxy_protocol::parse_proxy_protocol(&mut stream).await {
321                    Ok((header, prefix)) => {
322                        let client_addr = header.as_ref().map(|h| h.src_addr).unwrap_or(peer_addr);
323                        let prefixed = PrefixedStream::new(prefix, stream);
324
325                        // Perform TLS handshake on the PrefixedStream.
326                        let tls_stream = match acceptor.accept(prefixed).await {
327                            Ok(tls) => tls,
328                            Err(e) => {
329                                debug!(client = %client_addr, "TLS handshake failed: {e}");
330                                return;
331                            }
332                        };
333
334                        if let Err(e) =
335                            http_server::serve_io(tls_stream, local_addr, client_addr, state, true)
336                                .await
337                        {
338                            debug!(client = %client_addr, "HTTPS connection error: {e}");
339                        }
340                    }
341                    Err(e) => {
342                        debug!(client = %peer_addr, "PROXY protocol parse error: {e}");
343                    }
344                }
345            } else {
346                // No PROXY protocol — standard TLS handshake.
347                let tls_stream = match acceptor.accept(stream).await {
348                    Ok(tls) => tls,
349                    Err(e) => {
350                        debug!(client = %peer_addr, "TLS handshake failed: {e}");
351                        return;
352                    }
353                };
354
355                if let Err(e) =
356                    http_server::serve_tls_connection(tls_stream, local_addr, peer_addr, state)
357                        .await
358                {
359                    debug!(client = %peer_addr, "HTTPS connection error: {e}");
360                }
361            }
362        });
363    }
364
365    Ok(())
366}