Skip to main content

zlayer_proxy/
server.rs

1//! HTTP server implementation
2//!
3//! This module provides the HTTP/HTTPS server for the proxy.
4//! Uses `ServiceRegistry` for route resolution instead of the legacy `Router`.
5
6use crate::acme::CertManager;
7use crate::config::ProxyConfig;
8use crate::error::{ProxyError, Result};
9use crate::lb::LoadBalancer;
10use crate::network_policy::NetworkPolicyChecker;
11use crate::routes::ServiceRegistry;
12use crate::service::ReverseProxyService;
13use crate::sni_resolver::SniCertResolver;
14use hyper::body::Incoming;
15use hyper::server::conn::http1;
16use hyper::service::service_fn;
17use hyper::Request;
18use hyper_util::rt::TokioIo;
19use std::net::SocketAddr;
20use std::sync::Arc;
21use std::time::Duration;
22use tokio::net::TcpListener;
23use tokio::sync::watch;
24use tokio_rustls::TlsAcceptor;
25use tracing::{debug, error, info, warn};
26
/// Initial backoff between failed ingress bind attempts.
const INGRESS_BIND_BACKOFF_INITIAL: Duration = Duration::from_secs(2);

/// Maximum backoff between failed ingress bind attempts. The backoff doubles
/// from [`INGRESS_BIND_BACKOFF_INITIAL`] up to this cap, then stays there.
const INGRESS_BIND_BACKOFF_MAX: Duration = Duration::from_secs(30);

/// After the first warning, only warn again every N attempts so a long-held
/// port (e.g. another process owning :80) does not spam the log every backoff
/// tick; the attempts in between are logged at `debug` level.
///
/// Once the backoff has reached [`INGRESS_BIND_BACKOFF_MAX`] (30s), 30
/// attempts span 30 × 30s = 15 minutes, so in steady state this works out to
/// roughly one warning every 15 minutes. The first repeat (attempt 30) lands
/// a little sooner, after ~13.5 minutes, because the early backoffs
/// (2s, 4s, 8s, 16s) are shorter than the cap.
const INGRESS_BIND_WARN_EVERY: u64 = 30;
39
40/// Compute the next backoff for the ingress bind-retry loop: double the current
41/// delay, capped at [`INGRESS_BIND_BACKOFF_MAX`].
42///
43/// Pulled out as a pure function so the backoff schedule can be unit-tested
44/// without binding real sockets.
45#[must_use]
46fn next_ingress_backoff(current: Duration) -> Duration {
47    (current * 2).min(INGRESS_BIND_BACKOFF_MAX)
48}
49
50/// Decide whether the ingress bind-retry loop should emit a warning on this
51/// attempt. Always warns on the first attempt (`attempt == 0`), then only once
52/// every [`INGRESS_BIND_WARN_EVERY`] attempts thereafter.
53///
54/// Pure function so the warn cadence is unit-testable.
55#[must_use]
56fn should_warn_on_attempt(attempt: u64) -> bool {
57    attempt == 0 || attempt % INGRESS_BIND_WARN_EVERY == 0
58}
59
60/// Bind a [`TcpListener`] to `addr`, retrying FOREVER on failure.
61///
62/// This is the ingress bind path: a bind failure is NEVER fatal. On each
63/// failure it logs (warning cadence per [`should_warn_on_attempt`]), sleeps a
64/// capped exponential backoff, and tries again. It only returns once the bind
65/// succeeds (or `shutdown_rx` flips to `true`, in which case it returns
66/// `None`).
67///
68/// `EACCES` (permission denied) gets a dedicated one-time hint, since binding
69/// 80/443 needs root or `CAP_NET_BIND_SERVICE`.
70async fn bind_with_retry(
71    addr: SocketAddr,
72    label: &str,
73    shutdown_rx: &mut watch::Receiver<bool>,
74) -> Option<TcpListener> {
75    let mut attempt: u64 = 0;
76    let mut backoff = INGRESS_BIND_BACKOFF_INITIAL;
77    let mut warned_eacces = false;
78
79    loop {
80        if *shutdown_rx.borrow() {
81            return None;
82        }
83
84        match TcpListener::bind(addr).await {
85            Ok(listener) => {
86                if attempt > 0 {
87                    info!(addr = %addr, label, attempt, "Ingress bound after retrying");
88                }
89                return Some(listener);
90            }
91            Err(e) => {
92                let is_eacces = e.kind() == std::io::ErrorKind::PermissionDenied;
93                if is_eacces && !warned_eacces {
94                    warned_eacces = true;
95                    warn!(
96                        addr = %addr,
97                        label,
98                        error = %e,
99                        "Ingress bind denied: binding 80/443 needs root or CAP_NET_BIND_SERVICE; \
100                         will keep retrying without aborting startup"
101                    );
102                } else if should_warn_on_attempt(attempt) {
103                    warn!(
104                        addr = %addr,
105                        label,
106                        attempt,
107                        error = %e,
108                        "Ingress bind failed; will keep retrying (port may be held by another process)"
109                    );
110                } else {
111                    debug!(addr = %addr, label, attempt, error = %e, "Ingress bind retry");
112                }
113
114                tokio::select! {
115                    () = tokio::time::sleep(backoff) => {}
116                    _ = shutdown_rx.changed() => {
117                        if *shutdown_rx.borrow() {
118                            return None;
119                        }
120                    }
121                }
122
123                attempt = attempt.saturating_add(1);
124                backoff = next_ingress_backoff(backoff);
125            }
126        }
127    }
128}
129
130/// The proxy server
131pub struct ProxyServer {
132    /// Server configuration
133    config: Arc<ProxyConfig>,
134    /// Service registry for route resolution
135    registry: Arc<ServiceRegistry>,
136    /// Load balancer for backend selection
137    load_balancer: Arc<LoadBalancer>,
138    /// Shutdown signal sender
139    shutdown_tx: watch::Sender<bool>,
140    /// Shutdown signal receiver
141    shutdown_rx: watch::Receiver<bool>,
142    /// TLS acceptor for HTTPS connections
143    tls_acceptor: Option<TlsAcceptor>,
144    /// Certificate manager for ACME challenge responses
145    cert_manager: Option<Arc<CertManager>>,
146    /// Optional network policy checker for access control enforcement
147    network_policy_checker: Option<NetworkPolicyChecker>,
148}
149
150impl ProxyServer {
151    /// Create a new proxy server
152    pub fn new(
153        config: ProxyConfig,
154        registry: Arc<ServiceRegistry>,
155        load_balancer: Arc<LoadBalancer>,
156    ) -> Self {
157        let (shutdown_tx, shutdown_rx) = watch::channel(false);
158
159        Self {
160            config: Arc::new(config),
161            registry,
162            load_balancer,
163            shutdown_tx,
164            shutdown_rx,
165            tls_acceptor: None,
166            cert_manager: None,
167            network_policy_checker: None,
168        }
169    }
170
171    /// Create a proxy server with an existing registry (alias for `new`)
172    pub fn with_registry(
173        config: ProxyConfig,
174        registry: Arc<ServiceRegistry>,
175        load_balancer: Arc<LoadBalancer>,
176    ) -> Self {
177        Self::new(config, registry, load_balancer)
178    }
179
180    /// Create a proxy server with TLS via SNI resolver
181    pub fn with_tls_resolver(
182        config: ProxyConfig,
183        registry: Arc<ServiceRegistry>,
184        load_balancer: Arc<LoadBalancer>,
185        resolver: Arc<SniCertResolver>,
186    ) -> Self {
187        let tls_config = rustls::ServerConfig::builder()
188            .with_no_client_auth()
189            .with_cert_resolver(resolver);
190        let acceptor = TlsAcceptor::from(Arc::new(tls_config));
191        let (shutdown_tx, shutdown_rx) = watch::channel(false);
192
193        Self {
194            config: Arc::new(config),
195            registry,
196            load_balancer,
197            shutdown_tx,
198            shutdown_rx,
199            tls_acceptor: Some(acceptor),
200            cert_manager: None,
201            network_policy_checker: None,
202        }
203    }
204
205    /// Set the certificate manager for ACME challenge interception
206    #[must_use]
207    pub fn with_cert_manager(mut self, cm: Arc<CertManager>) -> Self {
208        self.cert_manager = Some(cm);
209        self
210    }
211
212    /// Set the network policy checker for access control enforcement
213    #[must_use]
214    pub fn with_network_policy_checker(mut self, checker: NetworkPolicyChecker) -> Self {
215        self.network_policy_checker = Some(checker);
216        self
217    }
218
219    /// Check if TLS is enabled
220    #[must_use]
221    pub fn has_tls(&self) -> bool {
222        self.tls_acceptor.is_some()
223    }
224
225    /// Get the TLS acceptor if configured
226    #[must_use]
227    pub fn tls_acceptor(&self) -> Option<&TlsAcceptor> {
228        self.tls_acceptor.as_ref()
229    }
230
231    /// Get the service registry
232    #[must_use]
233    pub fn registry(&self) -> Arc<ServiceRegistry> {
234        self.registry.clone()
235    }
236
237    /// Get the configuration
238    #[must_use]
239    pub fn config(&self) -> Arc<ProxyConfig> {
240        self.config.clone()
241    }
242
243    /// Signal the server to shut down
244    pub fn shutdown(&self) {
245        let _ = self.shutdown_tx.send(true);
246    }
247
248    /// Run the HTTP server
249    ///
250    /// # Errors
251    ///
252    /// Returns an error if binding to the configured HTTP address fails
253    /// or if the accept loop encounters a fatal error.
254    pub async fn run(&self) -> Result<()> {
255        let addr = self.config.server.http_addr;
256        let listener = TcpListener::bind(addr)
257            .await
258            .map_err(|e| ProxyError::BindFailed {
259                addr,
260                reason: e.to_string(),
261            })?;
262
263        info!(addr = %addr, "HTTP proxy server listening");
264
265        self.accept_loop(listener).await
266    }
267
268    /// Run the server on a specific address
269    ///
270    /// # Errors
271    ///
272    /// Returns an error if binding to the given address fails
273    /// or if the accept loop encounters a fatal error.
274    pub async fn run_on(&self, addr: SocketAddr) -> Result<()> {
275        let listener = TcpListener::bind(addr)
276            .await
277            .map_err(|e| ProxyError::BindFailed {
278                addr,
279                reason: e.to_string(),
280            })?;
281
282        info!(addr = %addr, "HTTP proxy server listening");
283
284        self.accept_loop(listener).await
285    }
286
287    /// Run the HTTP ingress server on a specific address, retrying the bind
288    /// FOREVER on failure instead of returning a fatal error.
289    ///
290    /// This is the ingress entry point: if `addr` (typically `0.0.0.0:80`) is
291    /// already held, it logs a warning and keeps retrying the bind with a
292    /// capped exponential backoff until the port frees, then proceeds to the
293    /// normal accept loop. It NEVER returns [`ProxyError::BindFailed`].
294    ///
295    /// # Errors
296    ///
297    /// Returns an error only if the accept loop itself encounters a fatal
298    /// error after a successful bind. Bind failures are non-fatal.
299    pub async fn run_with_retry(&self, addr: SocketAddr) -> Result<()> {
300        let mut shutdown_rx = self.shutdown_rx.clone();
301        let Some(listener) = bind_with_retry(addr, "http", &mut shutdown_rx).await else {
302            // Shutdown requested before we ever bound.
303            return Ok(());
304        };
305
306        info!(addr = %addr, "HTTP ingress server listening");
307
308        self.accept_loop(listener).await
309    }
310
311    async fn accept_loop(&self, listener: TcpListener) -> Result<()> {
312        let mut shutdown_rx = self.shutdown_rx.clone();
313
314        loop {
315            tokio::select! {
316                // Check for shutdown signal
317                _ = shutdown_rx.changed() => {
318                    if *shutdown_rx.borrow() {
319                        info!("Shutting down proxy server");
320                        break;
321                    }
322                }
323
324                // Accept new connections
325                result = listener.accept() => {
326                    match result {
327                        Ok((stream, remote_addr)) => {
328                            let registry = self.registry.clone();
329                            let load_balancer = self.load_balancer.clone();
330                            let config = self.config.clone();
331                            let cert_manager = self.cert_manager.clone();
332                            let npc = self.network_policy_checker.clone();
333
334                            tokio::spawn(async move {
335                                if let Err(e) = Self::handle_connection(
336                                    stream,
337                                    remote_addr,
338                                    registry,
339                                    load_balancer,
340                                    config,
341                                    cert_manager,
342                                    npc,
343                                ).await {
344                                    debug!(
345                                        error = %e,
346                                        remote_addr = %remote_addr,
347                                        "Connection error"
348                                    );
349                                }
350                            });
351                        }
352                        Err(e) => {
353                            warn!(error = %e, "Failed to accept connection");
354                        }
355                    }
356                }
357            }
358        }
359
360        Ok(())
361    }
362
363    #[allow(clippy::too_many_arguments)]
364    async fn handle_connection(
365        stream: tokio::net::TcpStream,
366        remote_addr: SocketAddr,
367        registry: Arc<ServiceRegistry>,
368        load_balancer: Arc<LoadBalancer>,
369        config: Arc<ProxyConfig>,
370        cert_manager: Option<Arc<CertManager>>,
371        network_policy_checker: Option<NetworkPolicyChecker>,
372    ) -> Result<()> {
373        let io = TokioIo::new(stream);
374
375        let mut service =
376            ReverseProxyService::new(registry, load_balancer, config).with_remote_addr(remote_addr);
377        if let Some(cm) = cert_manager {
378            service = service.with_cert_manager(cm);
379        }
380        if let Some(checker) = network_policy_checker {
381            service = service.with_network_policy_checker(checker);
382        }
383
384        let service = service_fn(move |req: Request<Incoming>| {
385            let svc = service.clone();
386            async move {
387                match svc.proxy_request(req).await {
388                    Ok(response) => Ok::<_, hyper::Error>(response),
389                    Err(e) => {
390                        error!(error = %e, "Proxy error");
391                        Ok(ReverseProxyService::error_response(&e))
392                    }
393                }
394            }
395        });
396
397        http1::Builder::new()
398            .preserve_header_case(true)
399            .title_case_headers(false)
400            .serve_connection(io, service)
401            .with_upgrades()
402            .await
403            .map_err(ProxyError::Hyper)?;
404
405        Ok(())
406    }
407
408    /// Run the HTTPS server
409    ///
410    /// This requires TLS to be configured when creating the `ProxyServer`.
411    ///
412    /// # Errors
413    ///
414    /// Returns an error if TLS is not configured, if binding to the
415    /// configured HTTPS address fails, or if the accept loop encounters a fatal error.
416    pub async fn run_https(&self) -> Result<()> {
417        let acceptor = self
418            .tls_acceptor
419            .as_ref()
420            .ok_or_else(|| ProxyError::Config("TLS not configured".to_string()))?;
421
422        let addr = self.config.server.https_addr;
423        let listener = TcpListener::bind(addr)
424            .await
425            .map_err(|e| ProxyError::BindFailed {
426                addr,
427                reason: e.to_string(),
428            })?;
429
430        info!(addr = %addr, "HTTPS proxy server listening");
431
432        self.accept_loop_tls(listener, acceptor.clone()).await
433    }
434
435    /// Run the HTTPS server on a specific address
436    ///
437    /// # Errors
438    ///
439    /// Returns an error if TLS is not configured, if binding to the
440    /// given address fails, or if the accept loop encounters a fatal error.
441    pub async fn run_https_on(&self, addr: SocketAddr) -> Result<()> {
442        let acceptor = self
443            .tls_acceptor
444            .as_ref()
445            .ok_or_else(|| ProxyError::Config("TLS not configured".to_string()))?;
446
447        let listener = TcpListener::bind(addr)
448            .await
449            .map_err(|e| ProxyError::BindFailed {
450                addr,
451                reason: e.to_string(),
452            })?;
453
454        info!(addr = %addr, "HTTPS proxy server listening");
455
456        self.accept_loop_tls(listener, acceptor.clone()).await
457    }
458
459    /// Run the HTTPS ingress server on a specific address, retrying the bind
460    /// FOREVER on failure instead of returning a fatal error.
461    ///
462    /// This is the TLS ingress entry point: if `addr` (typically
463    /// `0.0.0.0:443`) is already held, it logs a warning and keeps retrying
464    /// the bind with a capped exponential backoff until the port frees, then
465    /// proceeds to the normal TLS accept loop. It NEVER returns
466    /// [`ProxyError::BindFailed`].
467    ///
468    /// # Errors
469    ///
470    /// Returns an error if TLS is not configured, or if the accept loop itself
471    /// encounters a fatal error after a successful bind. Bind failures are
472    /// non-fatal.
473    pub async fn run_https_with_retry(&self, addr: SocketAddr) -> Result<()> {
474        let acceptor = self
475            .tls_acceptor
476            .as_ref()
477            .ok_or_else(|| ProxyError::Config("TLS not configured".to_string()))?;
478
479        let mut shutdown_rx = self.shutdown_rx.clone();
480        let Some(listener) = bind_with_retry(addr, "https", &mut shutdown_rx).await else {
481            // Shutdown requested before we ever bound.
482            return Ok(());
483        };
484
485        info!(addr = %addr, "HTTPS ingress server listening");
486
487        self.accept_loop_tls(listener, acceptor.clone()).await
488    }
489
490    /// Run both HTTP and HTTPS servers concurrently
491    ///
492    /// This requires TLS to be configured when creating the `ProxyServer`.
493    ///
494    /// # Errors
495    ///
496    /// Returns an error if TLS is not configured, if binding to either
497    /// the HTTP or HTTPS address fails, or if either accept loop encounters
498    /// a fatal error.
499    #[allow(clippy::similar_names)]
500    pub async fn run_both(&self) -> Result<()> {
501        let http_addr = self.config.server.http_addr;
502        let https_addr = self.config.server.https_addr;
503
504        let acceptor = self
505            .tls_acceptor
506            .as_ref()
507            .ok_or_else(|| ProxyError::Config("TLS not configured".to_string()))?;
508
509        let http_listener =
510            TcpListener::bind(http_addr)
511                .await
512                .map_err(|e| ProxyError::BindFailed {
513                    addr: http_addr,
514                    reason: e.to_string(),
515                })?;
516
517        let https_listener =
518            TcpListener::bind(https_addr)
519                .await
520                .map_err(|e| ProxyError::BindFailed {
521                    addr: https_addr,
522                    reason: e.to_string(),
523                })?;
524
525        info!(http = %http_addr, https = %https_addr, "Proxy server listening");
526
527        // Run both accept loops concurrently
528        let http_future = self.accept_loop(http_listener);
529        let https_future = self.accept_loop_tls(https_listener, acceptor.clone());
530
531        tokio::select! {
532            result = http_future => result,
533            result = https_future => result,
534        }
535    }
536
537    async fn accept_loop_tls(&self, listener: TcpListener, acceptor: TlsAcceptor) -> Result<()> {
538        let mut shutdown_rx = self.shutdown_rx.clone();
539
540        loop {
541            tokio::select! {
542                // Check for shutdown signal
543                _ = shutdown_rx.changed() => {
544                    if *shutdown_rx.borrow() {
545                        info!("Shutting down HTTPS proxy server");
546                        break;
547                    }
548                }
549
550                // Accept new connections
551                result = listener.accept() => {
552                    match result {
553                        Ok((stream, remote_addr)) => {
554                            let registry = self.registry.clone();
555                            let load_balancer = self.load_balancer.clone();
556                            let config = self.config.clone();
557                            let acceptor = acceptor.clone();
558                            let cert_manager = self.cert_manager.clone();
559                            let npc = self.network_policy_checker.clone();
560
561                            tokio::spawn(async move {
562                                if let Err(e) = Self::handle_tls_connection(
563                                    stream,
564                                    remote_addr,
565                                    registry,
566                                    load_balancer,
567                                    config,
568                                    acceptor,
569                                    cert_manager,
570                                    npc,
571                                ).await {
572                                    debug!(
573                                        error = %e,
574                                        remote_addr = %remote_addr,
575                                        "TLS connection error"
576                                    );
577                                }
578                            });
579                        }
580                        Err(e) => {
581                            warn!(error = %e, "Failed to accept TLS connection");
582                        }
583                    }
584                }
585            }
586        }
587
588        Ok(())
589    }
590
591    #[allow(clippy::too_many_arguments)]
592    async fn handle_tls_connection(
593        stream: tokio::net::TcpStream,
594        remote_addr: SocketAddr,
595        registry: Arc<ServiceRegistry>,
596        load_balancer: Arc<LoadBalancer>,
597        config: Arc<ProxyConfig>,
598        acceptor: TlsAcceptor,
599        cert_manager: Option<Arc<CertManager>>,
600        network_policy_checker: Option<NetworkPolicyChecker>,
601    ) -> Result<()> {
602        // Perform TLS handshake
603        let tls_stream = acceptor
604            .accept(stream)
605            .await
606            .map_err(|e| ProxyError::Tls(format!("TLS handshake failed: {e}")))?;
607
608        let io = TokioIo::new(tls_stream);
609
610        let mut service = ReverseProxyService::new(registry, load_balancer, config)
611            .with_remote_addr(remote_addr)
612            .with_tls(true);
613        if let Some(cm) = cert_manager {
614            service = service.with_cert_manager(cm);
615        }
616        if let Some(checker) = network_policy_checker {
617            service = service.with_network_policy_checker(checker);
618        }
619
620        let service = service_fn(move |req: Request<Incoming>| {
621            let svc = service.clone();
622            async move {
623                match svc.proxy_request(req).await {
624                    Ok(response) => Ok::<_, hyper::Error>(response),
625                    Err(e) => {
626                        error!(error = %e, "Proxy error");
627                        Ok(ReverseProxyService::error_response(&e))
628                    }
629                }
630            }
631        });
632
633        http1::Builder::new()
634            .preserve_header_case(true)
635            .title_case_headers(false)
636            .serve_connection(io, service)
637            .with_upgrades()
638            .await
639            .map_err(ProxyError::Hyper)?;
640
641        Ok(())
642    }
643}
644
645#[cfg(test)]
646mod tests {
647    use super::*;
648    use crate::lb::LoadBalancer;
649    use crate::routes::{ResolvedService, RouteEntry};
650    use zlayer_spec::{ExposeType, Protocol};
651
652    /// Helper to build a minimal `RouteEntry` for tests.
653    fn make_entry(
654        service: &str,
655        host: Option<&str>,
656        path: &str,
657        backends: Vec<SocketAddr>,
658    ) -> RouteEntry {
659        RouteEntry {
660            service_name: service.to_string(),
661            endpoint_name: "http".to_string(),
662            host: host.map(std::string::ToString::to_string),
663            path_prefix: path.to_string(),
664            resolved: ResolvedService {
665                name: service.to_string(),
666                backends,
667                use_tls: false,
668                sni_hostname: String::new(),
669                expose: ExposeType::Public,
670                protocol: Protocol::Http,
671                strip_prefix: false,
672                path_prefix: path.to_string(),
673                target_port: 8080,
674            },
675        }
676    }
677
678    use tokio::io::{AsyncReadExt, AsyncWriteExt};
679
680    /// Drive a single raw HTTP/1.1 request through `handle_connection` and
681    /// return the raw response bytes (status line + headers + body).
682    ///
683    /// This exercises the FULL ingress path — route resolution, the
684    /// default-deny on an unmatched route, the ACME carve-out, and the
685    /// generic `error_response` body — exactly as a real client would hit it,
686    /// without binding a privileged port.
687    async fn roundtrip(
688        registry: Arc<ServiceRegistry>,
689        load_balancer: Arc<LoadBalancer>,
690        cert_manager: Option<Arc<CertManager>>,
691        raw_request: &str,
692    ) -> String {
693        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
694        let addr = listener.local_addr().unwrap();
695
696        let server = tokio::spawn(async move {
697            let (stream, remote_addr) = listener.accept().await.unwrap();
698            let _ = ProxyServer::handle_connection(
699                stream,
700                remote_addr,
701                registry,
702                load_balancer,
703                Arc::new(ProxyConfig::default()),
704                cert_manager,
705                None,
706            )
707            .await;
708        });
709
710        let mut client = tokio::net::TcpStream::connect(addr).await.unwrap();
711        client.write_all(raw_request.as_bytes()).await.unwrap();
712        client.flush().await.unwrap();
713
714        let mut buf = Vec::new();
715        // Read until the peer closes (Connection: close ensures EOF).
716        let _ = client.read_to_end(&mut buf).await;
717        server.abort();
718
719        String::from_utf8_lossy(&buf).into_owned()
720    }
721
722    /// An unregistered Host (no matching route) must get a clean 404 with a
723    /// generic body — NOT forwarded anywhere, and NOT echoing the requested
724    /// Host/path. This is the default-deny boundary for the open ingress.
725    #[tokio::test]
726    async fn test_unmatched_host_denied_404_generic_body() {
727        let registry = Arc::new(ServiceRegistry::new());
728        // Register a route for ONE host so the registry is non-empty; the
729        // request below targets a DIFFERENT host that matches nothing.
730        registry
731            .register(make_entry(
732                "known",
733                Some("known.example.com"),
734                "/",
735                vec!["127.0.0.1:9".parse().unwrap()],
736            ))
737            .await;
738        let lb = Arc::new(LoadBalancer::new());
739
740        let resp = roundtrip(
741            registry,
742            lb,
743            None,
744            "GET /secret/path HTTP/1.1\r\nHost: attacker.unregistered.test\r\nConnection: close\r\n\r\n",
745        )
746        .await;
747
748        assert!(
749            resp.starts_with("HTTP/1.1 404"),
750            "unmatched host must be denied with 404, got: {resp}"
751        );
752        // The generic body must not leak the requested host or path.
753        assert!(
754            !resp.contains("attacker.unregistered.test"),
755            "response must not echo the requested host: {resp}"
756        );
757        assert!(
758            !resp.contains("/secret/path"),
759            "response must not echo the requested path: {resp}"
760        );
761        assert!(
762            resp.contains("404 Not Found"),
763            "response should carry the generic 404 body: {resp}"
764        );
765    }
766
767    /// The ACME HTTP-01 challenge path must still be served (this is how certs
768    /// get issued) — the default-deny must come AFTER the ACME carve-out.
769    #[tokio::test]
770    async fn test_acme_challenge_served_not_denied() {
771        let tmp = tempfile::tempdir().unwrap();
772        let cm = Arc::new(
773            CertManager::new(tmp.path().to_string_lossy().into_owned(), None)
774                .await
775                .unwrap(),
776        );
777        let token = "test-token-abc";
778        cm.store_challenge(token, "example.com", "key-auth-payload-123");
779
780        // Empty registry: nothing is registered, so if the ACME carve-out
781        // were missing this would be denied by default.
782        let registry = Arc::new(ServiceRegistry::new());
783        let lb = Arc::new(LoadBalancer::new());
784
785        let resp = roundtrip(
786            registry,
787            lb,
788            Some(cm),
789            &format!(
790                "GET /.well-known/acme-challenge/{token} HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n"
791            ),
792        )
793        .await;
794
795        assert!(
796            resp.starts_with("HTTP/1.1 200"),
797            "ACME challenge must be served with 200, got: {resp}"
798        );
799        assert!(
800            resp.contains("key-auth-payload-123"),
801            "ACME challenge must return the key authorization: {resp}"
802        );
803    }
804
805    /// A matched route whose backend group has zero healthy backends must
806    /// return 503 with a generic body — NOT leaking the internal LB group
807    /// name.
808    #[tokio::test]
809    async fn test_matched_no_backends_503_generic_body() {
810        let registry = Arc::new(ServiceRegistry::new());
811        // Route matches, but its resolved LB group name (carried in the body
812        // of the old leaky error) must never reach the client.
813        let lb_group = "prod/api#http-secret-group";
814        let mut entry = make_entry("api", Some("api.example.com"), "/", vec![]);
815        entry.resolved.name = lb_group.to_string();
816        registry.register(entry).await;
817
818        // Load balancer has NO group registered for this name → select()
819        // returns None → NoHealthyBackends.
820        let lb = Arc::new(LoadBalancer::new());
821
822        let resp = roundtrip(
823            registry,
824            lb,
825            None,
826            "GET / HTTP/1.1\r\nHost: api.example.com\r\nConnection: close\r\n\r\n",
827        )
828        .await;
829
830        assert!(
831            resp.starts_with("HTTP/1.1 503"),
832            "matched route with no healthy backends must return 503, got: {resp}"
833        );
834        assert!(
835            !resp.contains(lb_group),
836            "503 body must not leak the internal LB group name: {resp}"
837        );
838        assert!(
839            resp.contains("503 Service Unavailable"),
840            "response should carry the generic 503 body: {resp}"
841        );
842    }
843
844    #[tokio::test]
845    async fn test_server_shutdown() {
846        let registry = Arc::new(ServiceRegistry::new());
847        let lb = Arc::new(LoadBalancer::new());
848        let server = ProxyServer::new(ProxyConfig::default(), registry, lb);
849
850        // Create a separate handle for shutdown
851        let shutdown_tx = server.shutdown_tx.clone();
852
853        // Signal shutdown immediately
854        let _ = shutdown_tx.send(true);
855
856        // Server should exit gracefully
857        // (In a real test, we'd spawn the server and verify it stops)
858    }
859
860    #[tokio::test]
861    async fn test_registry_integration() {
862        let registry = Arc::new(ServiceRegistry::new());
863
864        // Add a route
865        registry
866            .register(make_entry(
867                "test-service",
868                None,
869                "/api",
870                vec!["127.0.0.1:8081".parse().unwrap()],
871            ))
872            .await;
873
874        let lb = Arc::new(LoadBalancer::new());
875        let server = ProxyServer::new(ProxyConfig::default(), registry, lb);
876
877        // Verify registry is accessible
878        let reg = server.registry();
879        assert_eq!(reg.route_count().await, 1);
880    }
881
882    #[test]
883    fn test_next_ingress_backoff_doubles_then_caps() {
884        // Doubles from the initial value.
885        assert_eq!(
886            next_ingress_backoff(INGRESS_BIND_BACKOFF_INITIAL),
887            INGRESS_BIND_BACKOFF_INITIAL * 2
888        );
889        // Keeps doubling until it reaches the cap.
890        let mut d = INGRESS_BIND_BACKOFF_INITIAL;
891        for _ in 0..20 {
892            d = next_ingress_backoff(d);
893        }
894        assert_eq!(d, INGRESS_BIND_BACKOFF_MAX);
895        // Never exceeds the cap once there.
896        assert_eq!(
897            next_ingress_backoff(INGRESS_BIND_BACKOFF_MAX),
898            INGRESS_BIND_BACKOFF_MAX
899        );
900    }
901
902    #[test]
903    fn test_should_warn_cadence() {
904        // Always warns on the first attempt.
905        assert!(should_warn_on_attempt(0));
906        // Quiet in between.
907        assert!(!should_warn_on_attempt(1));
908        assert!(!should_warn_on_attempt(INGRESS_BIND_WARN_EVERY - 1));
909        // Warns again every N attempts.
910        assert!(should_warn_on_attempt(INGRESS_BIND_WARN_EVERY));
911        assert!(should_warn_on_attempt(INGRESS_BIND_WARN_EVERY * 2));
912    }
913
914    #[tokio::test]
915    async fn test_bind_with_retry_succeeds_after_initial_conflict() {
916        // Hold an ephemeral port, then spawn a retrying bind against it; once
917        // we drop our listener the retry loop must grab the port and return.
918        let held = TcpListener::bind("127.0.0.1:0").await.unwrap();
919        let addr = held.local_addr().unwrap();
920
921        let (_tx, mut rx) = watch::channel(false);
922        let handle = tokio::spawn(async move { bind_with_retry(addr, "test", &mut rx).await });
923
924        // Give the retry loop a chance to observe the conflict at least once.
925        tokio::time::sleep(Duration::from_millis(50)).await;
926        // Free the port; the retry loop should now bind it.
927        drop(held);
928
929        let bound = tokio::time::timeout(Duration::from_secs(10), handle)
930            .await
931            .expect("bind_with_retry did not finish")
932            .expect("task panicked");
933        let listener = bound.expect("expected a bound listener, got None");
934        assert_eq!(listener.local_addr().unwrap().port(), addr.port());
935    }
936
937    #[tokio::test]
938    async fn test_bind_with_retry_returns_none_on_shutdown() {
939        // A port held for the whole test: the retry loop can never bind, so a
940        // shutdown signal must unblock it and yield None (never errors).
941        let held = TcpListener::bind("127.0.0.1:0").await.unwrap();
942        let addr = held.local_addr().unwrap();
943
944        let (tx, mut rx) = watch::channel(false);
945        let handle = tokio::spawn(async move { bind_with_retry(addr, "test", &mut rx).await });
946
947        tokio::time::sleep(Duration::from_millis(50)).await;
948        tx.send(true).unwrap();
949
950        let result = tokio::time::timeout(Duration::from_secs(10), handle)
951            .await
952            .expect("bind_with_retry did not respond to shutdown")
953            .expect("task panicked");
954        assert!(result.is_none(), "shutdown should yield None");
955    }
956}