1pub mod graceful;
2#[cfg(feature = "http3")]
3pub mod h3_server;
4pub mod http_server;
5pub mod proxy_protocol;
6
7use std::sync::Arc;
8
9use arc_swap::ArcSwap;
10pub use graceful::GracefulShutdown;
11pub use proxy_protocol::{PrefixedStream, ProxyProtocolHeader};
12use tokio::net::TcpListener;
13use tracing::{debug, error, info, warn};
14
15use crate::config::AppConfig;
16use crate::hoops::metrics::Metrics;
17use crate::plugin::ModuleRegistry;
18use crate::salvo_service;
19use crate::tls::TlsManager;
20
/// Process-wide state shared (behind an `Arc`) by the listeners and the admin server.
pub struct AppState {
    /// Currently active configuration; replaced wholesale by [`AppState::reload`].
    pub config: ArcSwap<AppConfig>,
    /// Salvo service built from the active configuration; rebuilt and swapped on reload.
    pub service: ArcSwap<salvo::Service>,
    /// TLS manager, if any. `run` only binds the HTTPS listener when this is `Some`.
    pub tls_manager: Option<TlsManager>,
    /// Shutdown coordinator, created with `config.global.grace_period`; the accept
    /// loops poll it and register every accepted connection with it.
    pub shutdown: GracefulShutdown,
    /// Configuration path recorded by [`AppState::with_config_path`]; `None` for the
    /// other constructors.
    pub config_path: Option<String>,
    /// Metrics handle; `run` passes a clone of it to the admin server.
    pub metrics: Arc<Metrics>,
    /// Module registry handed to `salvo_service::build_service` whenever the service
    /// is (re)built.
    pub modules: Arc<ModuleRegistry>,
}
34
35impl AppState {
36 pub fn new(config: AppConfig, tls_manager: Option<TlsManager>) -> Arc<Self> {
37 let grace_period = config.global.grace_period;
38 let service = salvo_service::build_service(&config, &crate::plugin::ModuleRegistry::new());
39 Arc::new(Self {
40 config: ArcSwap::from_pointee(config),
41 service: ArcSwap::new(Arc::new(service)),
42 tls_manager,
43 shutdown: GracefulShutdown::new(grace_period),
44 config_path: None,
45 metrics: Arc::new(Metrics::new()),
46 modules: Arc::new(ModuleRegistry::new()),
47 })
48 }
49
50 pub fn with_config_path(
52 config: AppConfig,
53 tls_manager: Option<TlsManager>,
54 path: String,
55 ) -> Arc<Self> {
56 let grace_period = config.global.grace_period;
57 let service = salvo_service::build_service(&config, &crate::plugin::ModuleRegistry::new());
58 Arc::new(Self {
59 config: ArcSwap::from_pointee(config),
60 service: ArcSwap::new(Arc::new(service)),
61 tls_manager,
62 shutdown: GracefulShutdown::new(grace_period),
63 config_path: Some(path),
64 metrics: Arc::new(Metrics::new()),
65 modules: Arc::new(ModuleRegistry::new()),
66 })
67 }
68
69 pub fn with_modules(
71 config: AppConfig,
72 tls_manager: Option<TlsManager>,
73 modules: ModuleRegistry,
74 ) -> Arc<Self> {
75 let grace_period = config.global.grace_period;
76 let modules = Arc::new(modules);
77 let service = salvo_service::build_service(&config, &modules);
78 Arc::new(Self {
79 config: ArcSwap::from_pointee(config),
80 service: ArcSwap::new(Arc::new(service)),
81 tls_manager,
82 shutdown: GracefulShutdown::new(grace_period),
83 config_path: None,
84 metrics: Arc::new(Metrics::new()),
85 modules,
86 })
87 }
88
89 pub async fn reload(&self, new_config: AppConfig) {
91 if let Some(ref tls_mgr) = self.tls_manager
93 && let Err(e) = tls_mgr.reload(&new_config).await
94 {
95 error!("failed to reload TLS configuration: {e}");
96 }
99
100 let new_service = salvo_service::build_service(&new_config, &self.modules);
101 self.service.store(Arc::new(new_service));
102 self.config.store(Arc::new(new_config));
103 info!("configuration reloaded");
104 }
105}
106
107pub async fn run(state: Arc<AppState>) -> Result<(), crate::ProxyError> {
117 let config = state.config.load();
118
119 if let Some(admin_addr) = config.global.admin_addr {
121 let admin_state = Arc::clone(&state);
122 let admin_metrics = Arc::clone(&state.metrics);
123 tokio::spawn(async move {
124 if let Err(e) =
125 crate::admin::start_admin_server(admin_addr, admin_state, admin_metrics).await
126 {
127 error!("admin server error: {e}");
128 }
129 });
130 }
131
132 if let Some(ref stream_config) = config.stream {
134 match crate::stream::start_stream_listeners(stream_config).await {
135 Ok(handles) => {
136 info!(count = handles.len(), "stream proxy listeners started");
137 }
138 Err(e) => {
139 error!("failed to start stream proxy listeners: {e}");
140 }
141 }
142 }
143
144 let http_addr = config.global.http_addr;
145 let proxy_protocol_enabled = config.global.proxy_protocol;
146
147 if proxy_protocol_enabled {
148 info!("PROXY protocol support enabled");
149 }
150
151 let http_listener = TcpListener::bind(http_addr).await?;
152 info!(%http_addr, "listening for HTTP connections");
153
154 let has_tls = state.tls_manager.is_some();
156
157 if has_tls {
158 let https_addr = config.global.https_addr;
159 let https_listener = TcpListener::bind(https_addr).await?;
160 info!(%https_addr, "listening for HTTPS connections");
161
162 #[cfg(feature = "http3")]
165 if config.global.http3 {
166 let h3_state = Arc::clone(&state);
167 let h3_tls = state
168 .tls_manager
169 .as_ref()
170 .expect("TLS manager must exist when has_tls is true")
171 .server_config();
172 let h3_addr = https_addr;
173 tokio::spawn(async move {
174 if let Err(e) = h3_server::run_h3_server(h3_addr, h3_tls, h3_state).await {
175 error!("HTTP/3 server error: {e}");
176 }
177 });
178 }
179
180 crate::sd_notify::sd_notify("READY=1");
182
183 let http_state = Arc::clone(&state);
184 let https_state = Arc::clone(&state);
185
186 tokio::select! {
187 result = accept_http_loop(http_listener, http_state, proxy_protocol_enabled) => {
188 if let Err(e) = result {
189 error!("HTTP listener error: {e}");
190 }
191 }
192 result = accept_https_loop(https_listener, https_state, proxy_protocol_enabled) => {
193 if let Err(e) = result {
194 error!("HTTPS listener error: {e}");
195 }
196 }
197 }
198 } else {
199 crate::sd_notify::sd_notify("READY=1");
201
202 accept_http_loop(http_listener, state, proxy_protocol_enabled).await?;
203 }
204
205 Ok(())
206}
207
208async fn accept_http_loop(
210 listener: TcpListener,
211 state: Arc<AppState>,
212 proxy_protocol: bool,
213) -> Result<(), crate::ProxyError> {
214 let local_addr = listener.local_addr()?;
215 loop {
216 if state.shutdown.is_shutdown() {
217 info!("HTTP accept loop stopping (shutdown)");
218 break;
219 }
220
221 let (mut stream, peer_addr) = match listener.accept().await {
222 Ok(conn) => conn,
223 Err(e) => {
224 error!("HTTP accept error: {e}");
225 continue;
226 }
227 };
228
229 {
231 let cfg = state.config.load();
232 if cfg.global.tcp_nodelay {
233 stream.set_nodelay(true).ok();
234 }
235 }
236
237 let state = Arc::clone(&state);
238 let _conn_guard = state.shutdown.track_conn();
239 tokio::spawn(async move {
240 let _guard = _conn_guard;
241
242 if proxy_protocol {
243 match proxy_protocol::parse_proxy_protocol(&mut stream).await {
245 Ok((header, prefix)) => {
246 let client_addr = header.as_ref().map(|h| h.src_addr).unwrap_or(peer_addr);
247 let prefixed = PrefixedStream::new(prefix, stream);
248 if let Err(e) =
249 http_server::serve_io(prefixed, local_addr, client_addr, state, false)
250 .await
251 {
252 debug!(client = %client_addr, "HTTP connection error: {e}");
253 }
254 }
255 Err(e) => {
256 debug!(client = %peer_addr, "PROXY protocol parse error: {e}");
257 }
258 }
259 } else {
260 if let Err(e) =
261 http_server::serve_connection(stream, local_addr, peer_addr, state).await
262 {
263 debug!(client = %peer_addr, "HTTP connection error: {e}");
264 }
265 }
266 });
267 }
268
269 Ok(())
270}
271
272async fn accept_https_loop(
274 listener: TcpListener,
275 state: Arc<AppState>,
276 proxy_protocol: bool,
277) -> Result<(), crate::ProxyError> {
278 let local_addr = listener.local_addr()?;
279 loop {
280 if state.shutdown.is_shutdown() {
281 info!("HTTPS accept loop stopping (shutdown)");
282 break;
283 }
284
285 let (mut stream, peer_addr) = match listener.accept().await {
286 Ok(conn) => conn,
287 Err(e) => {
288 error!("HTTPS accept error: {e}");
289 continue;
290 }
291 };
292
293 {
295 let cfg = state.config.load();
296 if cfg.global.tcp_nodelay {
297 stream.set_nodelay(true).ok();
298 }
299 }
300
301 let acceptor = match state.tls_manager {
304 Some(ref tls_mgr) => tls_mgr.acceptor(),
305 None => {
306 warn!("HTTPS accept loop running without TLS manager");
309 continue;
310 }
311 };
312
313 let state = Arc::clone(&state);
314 let _conn_guard = state.shutdown.track_conn();
315 tokio::spawn(async move {
316 let _guard = _conn_guard;
317
318 if proxy_protocol {
319 match proxy_protocol::parse_proxy_protocol(&mut stream).await {
321 Ok((header, prefix)) => {
322 let client_addr = header.as_ref().map(|h| h.src_addr).unwrap_or(peer_addr);
323 let prefixed = PrefixedStream::new(prefix, stream);
324
325 let tls_stream = match acceptor.accept(prefixed).await {
327 Ok(tls) => tls,
328 Err(e) => {
329 debug!(client = %client_addr, "TLS handshake failed: {e}");
330 return;
331 }
332 };
333
334 if let Err(e) =
335 http_server::serve_io(tls_stream, local_addr, client_addr, state, true)
336 .await
337 {
338 debug!(client = %client_addr, "HTTPS connection error: {e}");
339 }
340 }
341 Err(e) => {
342 debug!(client = %peer_addr, "PROXY protocol parse error: {e}");
343 }
344 }
345 } else {
346 let tls_stream = match acceptor.accept(stream).await {
348 Ok(tls) => tls,
349 Err(e) => {
350 debug!(client = %peer_addr, "TLS handshake failed: {e}");
351 return;
352 }
353 };
354
355 if let Err(e) =
356 http_server::serve_tls_connection(tls_stream, local_addr, peer_addr, state)
357 .await
358 {
359 debug!(client = %peer_addr, "HTTPS connection error: {e}");
360 }
361 }
362 });
363 }
364
365 Ok(())
366}