Skip to main content

zlayer_proxy/
server.rs

1//! HTTP server implementation
2//!
3//! This module provides the HTTP/HTTPS server for the proxy.
4//! Uses `ServiceRegistry` for route resolution instead of the legacy `Router`.
5
6use crate::acme::CertManager;
7use crate::config::ProxyConfig;
8use crate::error::{ProxyError, Result};
9use crate::lb::LoadBalancer;
10use crate::network_policy::NetworkPolicyChecker;
11use crate::routes::ServiceRegistry;
12use crate::service::{Activator, ReverseProxyService, RpsRegistry};
13use crate::sni_resolver::SniCertResolver;
14use hyper::body::Incoming;
15use hyper::server::conn::http1;
16use hyper::service::service_fn;
17use hyper::Request;
18use hyper_util::rt::TokioIo;
19use std::net::{IpAddr, SocketAddr};
20use std::sync::Arc;
21use std::time::Duration;
22use tokio::net::{lookup_host, TcpListener, TcpStream};
23use tokio::sync::watch;
24use tokio_rustls::TlsAcceptor;
25use tracing::{debug, error, info, warn};
26
/// Delay waited after the first failed ingress bind attempt, i.e. before the
/// first retry.
const INGRESS_BIND_BACKOFF_INITIAL: Duration = Duration::from_secs(2);

/// Maximum backoff between failed ingress bind attempts. The backoff doubles
/// from [`INGRESS_BIND_BACKOFF_INITIAL`] (2s, 4s, 8s, 16s, ...) up to this cap,
/// then stays there.
const INGRESS_BIND_BACKOFF_MAX: Duration = Duration::from_secs(30);

/// After the first warning, only warn again every N attempts (the others are
/// logged at `debug`) so a long-held port (e.g. another process owning :80)
/// does not spam the log every backoff tick.
///
/// Once the backoff has reached [`INGRESS_BIND_BACKOFF_MAX`], consecutive
/// attempts are 30s apart, so N = 30 works out to roughly one warning every
/// 15 minutes (30 x 30s) in steady state.
const INGRESS_BIND_WARN_EVERY: u64 = 30;
39
40/// Compute the next backoff for the ingress bind-retry loop: double the current
41/// delay, capped at [`INGRESS_BIND_BACKOFF_MAX`].
42///
43/// Pulled out as a pure function so the backoff schedule can be unit-tested
44/// without binding real sockets.
45#[must_use]
46fn next_ingress_backoff(current: Duration) -> Duration {
47    (current * 2).min(INGRESS_BIND_BACKOFF_MAX)
48}
49
50/// Decide whether the ingress bind-retry loop should emit a warning on this
51/// attempt. Always warns on the first attempt (`attempt == 0`), then only once
52/// every [`INGRESS_BIND_WARN_EVERY`] attempts thereafter.
53///
54/// Pure function so the warn cadence is unit-testable.
55#[must_use]
56fn should_warn_on_attempt(attempt: u64) -> bool {
57    attempt == 0 || attempt % INGRESS_BIND_WARN_EVERY == 0
58}
59
60/// Bind a [`TcpListener`] to `addr`, retrying FOREVER on failure.
61///
62/// This is the ingress bind path: a bind failure is NEVER fatal. On each
63/// failure it logs (warning cadence per [`should_warn_on_attempt`]), sleeps a
64/// capped exponential backoff, and tries again. It only returns once the bind
65/// succeeds (or `shutdown_rx` flips to `true`, in which case it returns
66/// `None`).
67///
68/// `EACCES` (permission denied) gets a dedicated one-time hint, since binding
69/// 80/443 needs root or `CAP_NET_BIND_SERVICE`.
70async fn bind_with_retry(
71    addr: SocketAddr,
72    label: &str,
73    shutdown_rx: &mut watch::Receiver<bool>,
74) -> Option<TcpListener> {
75    let mut attempt: u64 = 0;
76    let mut backoff = INGRESS_BIND_BACKOFF_INITIAL;
77    let mut warned_eacces = false;
78
79    loop {
80        if *shutdown_rx.borrow() {
81            return None;
82        }
83
84        match TcpListener::bind(addr).await {
85            Ok(listener) => {
86                if attempt > 0 {
87                    info!(addr = %addr, label, attempt, "Ingress bound after retrying");
88                }
89                return Some(listener);
90            }
91            Err(e) => {
92                let is_eacces = e.kind() == std::io::ErrorKind::PermissionDenied;
93                if is_eacces && !warned_eacces {
94                    warned_eacces = true;
95                    warn!(
96                        addr = %addr,
97                        label,
98                        error = %e,
99                        "Ingress bind denied: binding 80/443 needs root or CAP_NET_BIND_SERVICE; \
100                         will keep retrying without aborting startup"
101                    );
102                } else if should_warn_on_attempt(attempt) {
103                    warn!(
104                        addr = %addr,
105                        label,
106                        attempt,
107                        error = %e,
108                        "Ingress bind failed; will keep retrying (port may be held by another process)"
109                    );
110                } else {
111                    debug!(addr = %addr, label, attempt, error = %e, "Ingress bind retry");
112                }
113
114                tokio::select! {
115                    () = tokio::time::sleep(backoff) => {}
116                    _ = shutdown_rx.changed() => {
117                        if *shutdown_rx.borrow() {
118                            return None;
119                        }
120                    }
121                }
122
123                attempt = attempt.saturating_add(1);
124                backoff = next_ingress_backoff(backoff);
125            }
126        }
127    }
128}
129
130/// The proxy server
131pub struct ProxyServer {
132    /// Server configuration
133    config: Arc<ProxyConfig>,
134    /// Service registry for route resolution
135    registry: Arc<ServiceRegistry>,
136    /// Load balancer for backend selection
137    load_balancer: Arc<LoadBalancer>,
138    /// Shutdown signal sender
139    shutdown_tx: watch::Sender<bool>,
140    /// Shutdown signal receiver
141    shutdown_rx: watch::Receiver<bool>,
142    /// TLS acceptor for HTTPS connections
143    tls_acceptor: Option<TlsAcceptor>,
144    /// Certificate manager for ACME challenge responses
145    cert_manager: Option<Arc<CertManager>>,
146    /// Optional network policy checker for access control enforcement
147    network_policy_checker: Option<NetworkPolicyChecker>,
148    /// Optional on-demand activator for scale-to-zero services. Threaded into
149    /// every per-connection [`ReverseProxyService`].
150    activator: Option<Arc<dyn Activator>>,
151    /// Optional per-service request-rate registry. Threaded into every
152    /// per-connection [`ReverseProxyService`] so routed requests are counted.
153    rps_registry: Option<Arc<RpsRegistry>>,
154}
155
156impl ProxyServer {
157    /// Create a new proxy server
158    pub fn new(
159        config: ProxyConfig,
160        registry: Arc<ServiceRegistry>,
161        load_balancer: Arc<LoadBalancer>,
162    ) -> Self {
163        let (shutdown_tx, shutdown_rx) = watch::channel(false);
164
165        Self {
166            config: Arc::new(config),
167            registry,
168            load_balancer,
169            shutdown_tx,
170            shutdown_rx,
171            tls_acceptor: None,
172            cert_manager: None,
173            network_policy_checker: None,
174            activator: None,
175            rps_registry: None,
176        }
177    }
178
179    /// Create a proxy server with an existing registry (alias for `new`)
180    pub fn with_registry(
181        config: ProxyConfig,
182        registry: Arc<ServiceRegistry>,
183        load_balancer: Arc<LoadBalancer>,
184    ) -> Self {
185        Self::new(config, registry, load_balancer)
186    }
187
188    /// Create a proxy server with TLS via SNI resolver
189    pub fn with_tls_resolver(
190        config: ProxyConfig,
191        registry: Arc<ServiceRegistry>,
192        load_balancer: Arc<LoadBalancer>,
193        resolver: Arc<SniCertResolver>,
194    ) -> Self {
195        let tls_config = rustls::ServerConfig::builder()
196            .with_no_client_auth()
197            .with_cert_resolver(resolver);
198        let acceptor = TlsAcceptor::from(Arc::new(tls_config));
199        let (shutdown_tx, shutdown_rx) = watch::channel(false);
200
201        Self {
202            config: Arc::new(config),
203            registry,
204            load_balancer,
205            shutdown_tx,
206            shutdown_rx,
207            tls_acceptor: Some(acceptor),
208            cert_manager: None,
209            network_policy_checker: None,
210            activator: None,
211            rps_registry: None,
212        }
213    }
214
215    /// Set the certificate manager for ACME challenge interception
216    #[must_use]
217    pub fn with_cert_manager(mut self, cm: Arc<CertManager>) -> Self {
218        self.cert_manager = Some(cm);
219        self
220    }
221
222    /// Set the network policy checker for access control enforcement
223    #[must_use]
224    pub fn with_network_policy_checker(mut self, checker: NetworkPolicyChecker) -> Self {
225        self.network_policy_checker = Some(checker);
226        self
227    }
228
229    /// Set the on-demand activator for scale-to-zero services.
230    ///
231    /// The activator is threaded into every per-connection
232    /// [`ReverseProxyService`] this server builds, so a request to an idle
233    /// (scaled-to-zero) service triggers a wake-up instead of an immediate
234    /// `503`.
235    #[must_use]
236    pub fn with_activator(mut self, activator: Arc<dyn Activator>) -> Self {
237        self.activator = Some(activator);
238        self
239    }
240
241    /// Set the per-service request-rate registry.
242    ///
243    /// The registry is threaded into every per-connection
244    /// [`ReverseProxyService`] this server builds, so each routed request is
245    /// recorded for autoscaler RPS metrics.
246    #[must_use]
247    pub fn with_rps_registry(mut self, rps_registry: Arc<RpsRegistry>) -> Self {
248        self.rps_registry = Some(rps_registry);
249        self
250    }
251
252    /// Check if TLS is enabled
253    #[must_use]
254    pub fn has_tls(&self) -> bool {
255        self.tls_acceptor.is_some()
256    }
257
258    /// Get the TLS acceptor if configured
259    #[must_use]
260    pub fn tls_acceptor(&self) -> Option<&TlsAcceptor> {
261        self.tls_acceptor.as_ref()
262    }
263
264    /// Get the service registry
265    #[must_use]
266    pub fn registry(&self) -> Arc<ServiceRegistry> {
267        self.registry.clone()
268    }
269
270    /// Get the certificate manager, if one is configured.
271    ///
272    /// Present on a server when [`Self::with_cert_manager`] was called; the
273    /// ACME HTTP-01 challenge interception (and TLS resolution on `:443`)
274    /// require it. The `:80` ingress carries this purely for the HTTP-01
275    /// carve-out — challenges always arrive on `:80`.
276    #[must_use]
277    pub fn cert_manager(&self) -> Option<&Arc<CertManager>> {
278        self.cert_manager.as_ref()
279    }
280
281    /// Get the configuration
282    #[must_use]
283    pub fn config(&self) -> Arc<ProxyConfig> {
284        self.config.clone()
285    }
286
287    /// Signal the server to shut down
288    pub fn shutdown(&self) {
289        let _ = self.shutdown_tx.send(true);
290    }
291
292    /// Run the HTTP server
293    ///
294    /// # Errors
295    ///
296    /// Returns an error if binding to the configured HTTP address fails
297    /// or if the accept loop encounters a fatal error.
298    pub async fn run(&self) -> Result<()> {
299        let addr = self.config.server.http_addr;
300        let listener = TcpListener::bind(addr)
301            .await
302            .map_err(|e| ProxyError::BindFailed {
303                addr,
304                reason: e.to_string(),
305            })?;
306
307        info!(addr = %addr, "HTTP proxy server listening");
308
309        self.accept_loop(listener).await
310    }
311
312    /// Run the server on a specific address
313    ///
314    /// # Errors
315    ///
316    /// Returns an error if binding to the given address fails
317    /// or if the accept loop encounters a fatal error.
318    pub async fn run_on(&self, addr: SocketAddr) -> Result<()> {
319        let listener = TcpListener::bind(addr)
320            .await
321            .map_err(|e| ProxyError::BindFailed {
322                addr,
323                reason: e.to_string(),
324            })?;
325
326        info!(addr = %addr, "HTTP proxy server listening");
327
328        self.accept_loop(listener).await
329    }
330
331    /// Run the HTTP ingress server on a specific address, retrying the bind
332    /// FOREVER on failure instead of returning a fatal error.
333    ///
334    /// This is the ingress entry point: if `addr` (typically `0.0.0.0:80`) is
335    /// already held, it logs a warning and keeps retrying the bind with a
336    /// capped exponential backoff until the port frees, then proceeds to the
337    /// normal accept loop. It NEVER returns [`ProxyError::BindFailed`].
338    ///
339    /// # Errors
340    ///
341    /// Returns an error only if the accept loop itself encounters a fatal
342    /// error after a successful bind. Bind failures are non-fatal.
343    pub async fn run_with_retry(&self, addr: SocketAddr) -> Result<()> {
344        let mut shutdown_rx = self.shutdown_rx.clone();
345        let Some(listener) = bind_with_retry(addr, "http", &mut shutdown_rx).await else {
346            // Shutdown requested before we ever bound.
347            return Ok(());
348        };
349
350        info!(addr = %addr, "HTTP ingress server listening");
351
352        self.accept_loop(listener).await
353    }
354
355    /// Run the HTTP accept loop on a caller-supplied, already-bound listener.
356    ///
357    /// Used to attach a second listener (e.g. an explicitly v6-only `[::]:80`
358    /// socket) to an existing [`ProxyServer`] so it shares the same shutdown
359    /// signal and routing state as the primary v4 listener.
360    ///
361    /// # Errors
362    ///
363    /// Returns an error only if the accept loop encounters a fatal error.
364    pub async fn run_on_listener(&self, listener: TcpListener) -> Result<()> {
365        if let Ok(addr) = listener.local_addr() {
366            info!(addr = %addr, "HTTP proxy server listening (pre-bound)");
367        }
368        self.accept_loop(listener).await
369    }
370
371    async fn accept_loop(&self, listener: TcpListener) -> Result<()> {
372        let mut shutdown_rx = self.shutdown_rx.clone();
373
374        loop {
375            tokio::select! {
376                // Check for shutdown signal
377                _ = shutdown_rx.changed() => {
378                    if *shutdown_rx.borrow() {
379                        info!("Shutting down proxy server");
380                        break;
381                    }
382                }
383
384                // Accept new connections
385                result = listener.accept() => {
386                    match result {
387                        Ok((stream, remote_addr)) => {
388                            let registry = self.registry.clone();
389                            let load_balancer = self.load_balancer.clone();
390                            let config = self.config.clone();
391                            let cert_manager = self.cert_manager.clone();
392                            let npc = self.network_policy_checker.clone();
393                            let activator = self.activator.clone();
394                            let rps_registry = self.rps_registry.clone();
395
396                            tokio::spawn(async move {
397                                if let Err(e) = Self::handle_connection(
398                                    stream,
399                                    remote_addr,
400                                    registry,
401                                    load_balancer,
402                                    config,
403                                    cert_manager,
404                                    npc,
405                                    activator,
406                                    rps_registry,
407                                ).await {
408                                    debug!(
409                                        error = %e,
410                                        remote_addr = %remote_addr,
411                                        "Connection error"
412                                    );
413                                }
414                            });
415                        }
416                        Err(e) => {
417                            warn!(error = %e, "Failed to accept connection");
418                        }
419                    }
420                }
421            }
422        }
423
424        Ok(())
425    }
426
427    #[allow(clippy::too_many_arguments)]
428    async fn handle_connection(
429        stream: tokio::net::TcpStream,
430        remote_addr: SocketAddr,
431        registry: Arc<ServiceRegistry>,
432        load_balancer: Arc<LoadBalancer>,
433        config: Arc<ProxyConfig>,
434        cert_manager: Option<Arc<CertManager>>,
435        network_policy_checker: Option<NetworkPolicyChecker>,
436        activator: Option<Arc<dyn Activator>>,
437        rps_registry: Option<Arc<RpsRegistry>>,
438    ) -> Result<()> {
439        let io = TokioIo::new(stream);
440
441        let mut service =
442            ReverseProxyService::new(registry, load_balancer, config).with_remote_addr(remote_addr);
443        if let Some(cm) = cert_manager {
444            service = service.with_cert_manager(cm);
445        }
446        if let Some(checker) = network_policy_checker {
447            service = service.with_network_policy_checker(checker);
448        }
449        if let Some(activator) = activator {
450            service = service.with_activator(activator);
451        }
452        if let Some(rps_registry) = rps_registry {
453            service = service.with_rps_registry(rps_registry);
454        }
455
456        let service = service_fn(move |req: Request<Incoming>| {
457            let svc = service.clone();
458            async move {
459                match svc.proxy_request(req).await {
460                    Ok(response) => Ok::<_, hyper::Error>(response),
461                    Err(e) => {
462                        error!(error = %e, "Proxy error");
463                        Ok(ReverseProxyService::error_response(&e))
464                    }
465                }
466            }
467        });
468
469        http1::Builder::new()
470            .preserve_header_case(true)
471            .title_case_headers(false)
472            .serve_connection(io, service)
473            .with_upgrades()
474            .await
475            .map_err(ProxyError::Hyper)?;
476
477        Ok(())
478    }
479
480    /// Run the HTTPS server
481    ///
482    /// This requires TLS to be configured when creating the `ProxyServer`.
483    ///
484    /// # Errors
485    ///
486    /// Returns an error if TLS is not configured, if binding to the
487    /// configured HTTPS address fails, or if the accept loop encounters a fatal error.
488    pub async fn run_https(&self) -> Result<()> {
489        let acceptor = self
490            .tls_acceptor
491            .as_ref()
492            .ok_or_else(|| ProxyError::Config("TLS not configured".to_string()))?;
493
494        let addr = self.config.server.https_addr;
495        let listener = TcpListener::bind(addr)
496            .await
497            .map_err(|e| ProxyError::BindFailed {
498                addr,
499                reason: e.to_string(),
500            })?;
501
502        info!(addr = %addr, "HTTPS proxy server listening");
503
504        self.accept_loop_tls(listener, acceptor.clone()).await
505    }
506
507    /// Run the HTTPS server on a specific address
508    ///
509    /// # Errors
510    ///
511    /// Returns an error if TLS is not configured, if binding to the
512    /// given address fails, or if the accept loop encounters a fatal error.
513    pub async fn run_https_on(&self, addr: SocketAddr) -> Result<()> {
514        let acceptor = self
515            .tls_acceptor
516            .as_ref()
517            .ok_or_else(|| ProxyError::Config("TLS not configured".to_string()))?;
518
519        let listener = TcpListener::bind(addr)
520            .await
521            .map_err(|e| ProxyError::BindFailed {
522                addr,
523                reason: e.to_string(),
524            })?;
525
526        info!(addr = %addr, "HTTPS proxy server listening");
527
528        self.accept_loop_tls(listener, acceptor.clone()).await
529    }
530
531    /// Run the HTTPS ingress server on a specific address, retrying the bind
532    /// FOREVER on failure instead of returning a fatal error.
533    ///
534    /// This is the TLS ingress entry point: if `addr` (typically
535    /// `0.0.0.0:443`) is already held, it logs a warning and keeps retrying
536    /// the bind with a capped exponential backoff until the port frees, then
537    /// proceeds to the normal TLS accept loop. It NEVER returns
538    /// [`ProxyError::BindFailed`].
539    ///
540    /// # Errors
541    ///
542    /// Returns an error if TLS is not configured, or if the accept loop itself
543    /// encounters a fatal error after a successful bind. Bind failures are
544    /// non-fatal.
545    pub async fn run_https_with_retry(&self, addr: SocketAddr) -> Result<()> {
546        let acceptor = self
547            .tls_acceptor
548            .as_ref()
549            .ok_or_else(|| ProxyError::Config("TLS not configured".to_string()))?;
550
551        let mut shutdown_rx = self.shutdown_rx.clone();
552        let Some(listener) = bind_with_retry(addr, "https", &mut shutdown_rx).await else {
553            // Shutdown requested before we ever bound.
554            return Ok(());
555        };
556
557        info!(addr = %addr, "HTTPS ingress server listening");
558
559        self.accept_loop_tls(listener, acceptor.clone()).await
560    }
561
562    /// Run the HTTPS accept loop on a caller-supplied, already-bound listener.
563    ///
564    /// Uses this server's configured [`TlsAcceptor`] (the same SNI resolver as
565    /// the primary listener), so a second listener (e.g. an explicitly v6-only
566    /// `[::]:443` socket) shares certificates, shutdown, and routing state.
567    ///
568    /// # Errors
569    ///
570    /// Returns an error if TLS is not configured, or if the accept loop
571    /// encounters a fatal error.
572    pub async fn run_https_on_listener(&self, listener: TcpListener) -> Result<()> {
573        let acceptor = self
574            .tls_acceptor
575            .as_ref()
576            .ok_or_else(|| ProxyError::Config("TLS not configured".to_string()))?;
577
578        if let Ok(addr) = listener.local_addr() {
579            info!(addr = %addr, "HTTPS proxy server listening (pre-bound)");
580        }
581        self.accept_loop_tls(listener, acceptor.clone()).await
582    }
583
584    /// Run both HTTP and HTTPS servers concurrently
585    ///
586    /// This requires TLS to be configured when creating the `ProxyServer`.
587    ///
588    /// # Errors
589    ///
590    /// Returns an error if TLS is not configured, if binding to either
591    /// the HTTP or HTTPS address fails, or if either accept loop encounters
592    /// a fatal error.
593    #[allow(clippy::similar_names)]
594    pub async fn run_both(&self) -> Result<()> {
595        let http_addr = self.config.server.http_addr;
596        let https_addr = self.config.server.https_addr;
597
598        let acceptor = self
599            .tls_acceptor
600            .as_ref()
601            .ok_or_else(|| ProxyError::Config("TLS not configured".to_string()))?;
602
603        let http_listener =
604            TcpListener::bind(http_addr)
605                .await
606                .map_err(|e| ProxyError::BindFailed {
607                    addr: http_addr,
608                    reason: e.to_string(),
609                })?;
610
611        let https_listener =
612            TcpListener::bind(https_addr)
613                .await
614                .map_err(|e| ProxyError::BindFailed {
615                    addr: https_addr,
616                    reason: e.to_string(),
617                })?;
618
619        info!(http = %http_addr, https = %https_addr, "Proxy server listening");
620
621        // Run both accept loops concurrently
622        let http_future = self.accept_loop(http_listener);
623        let https_future = self.accept_loop_tls(https_listener, acceptor.clone());
624
625        tokio::select! {
626            result = http_future => result,
627            result = https_future => result,
628        }
629    }
630
631    async fn accept_loop_tls(&self, listener: TcpListener, acceptor: TlsAcceptor) -> Result<()> {
632        let mut shutdown_rx = self.shutdown_rx.clone();
633
634        loop {
635            tokio::select! {
636                // Check for shutdown signal
637                _ = shutdown_rx.changed() => {
638                    if *shutdown_rx.borrow() {
639                        info!("Shutting down HTTPS proxy server");
640                        break;
641                    }
642                }
643
644                // Accept new connections
645                result = listener.accept() => {
646                    match result {
647                        Ok((stream, remote_addr)) => {
648                            let registry = self.registry.clone();
649                            let load_balancer = self.load_balancer.clone();
650                            let config = self.config.clone();
651                            let acceptor = acceptor.clone();
652                            let cert_manager = self.cert_manager.clone();
653                            let npc = self.network_policy_checker.clone();
654                            let activator = self.activator.clone();
655                            let rps_registry = self.rps_registry.clone();
656
657                            tokio::spawn(async move {
658                                if let Err(e) = Self::handle_tls_connection(
659                                    stream,
660                                    remote_addr,
661                                    registry,
662                                    load_balancer,
663                                    config,
664                                    acceptor,
665                                    cert_manager,
666                                    npc,
667                                    activator,
668                                    rps_registry,
669                                ).await {
670                                    debug!(
671                                        error = %e,
672                                        remote_addr = %remote_addr,
673                                        "TLS connection error"
674                                    );
675                                }
676                            });
677                        }
678                        Err(e) => {
679                            warn!(error = %e, "Failed to accept TLS connection");
680                        }
681                    }
682                }
683            }
684        }
685
686        Ok(())
687    }
688
689    #[allow(clippy::too_many_arguments)]
690    async fn handle_tls_connection(
691        stream: tokio::net::TcpStream,
692        remote_addr: SocketAddr,
693        registry: Arc<ServiceRegistry>,
694        load_balancer: Arc<LoadBalancer>,
695        config: Arc<ProxyConfig>,
696        acceptor: TlsAcceptor,
697        cert_manager: Option<Arc<CertManager>>,
698        network_policy_checker: Option<NetworkPolicyChecker>,
699        activator: Option<Arc<dyn Activator>>,
700        rps_registry: Option<Arc<RpsRegistry>>,
701    ) -> Result<()> {
702        // Intercept-but-passthrough: peek the ClientHello SNI WITHOUT consuming
703        // the socket bytes, so we can decide whether to terminate TLS (managed)
704        // or TCP-splice raw to the real upstream (unmanaged) before rustls ever
705        // sees a cert-less SNI and aborts — which would otherwise hang the
706        // client.
707        let sni = Self::peek_sni(&stream).await;
708
709        // A route existing for this host is the primary "we own this" signal.
710        // When there's no SNI at all we treat it as managed and fall through to
711        // the normal accept path (a default cert / catch-all route may apply,
712        // and we must never passthrough to an unknown destination).
713        let managed = match sni.as_deref() {
714            Some(host) => registry.resolve(Some(host), "/").await.is_some(),
715            None => true,
716        };
717
718        if !managed {
719            // `managed` is only false when `sni` is `Some`.
720            if let Some(host) = sni {
721                return Self::passthrough_unmanaged(stream, remote_addr, host).await;
722            }
723        }
724
725        // MANAGED path (unchanged): terminate TLS and reverse-proxy. The peeked
726        // bytes are still queued in the socket, so rustls reads the full
727        // ClientHello with no replay needed.
728        let tls_stream = acceptor
729            .accept(stream)
730            .await
731            .map_err(|e| ProxyError::Tls(format!("TLS handshake failed: {e}")))?;
732
733        let io = TokioIo::new(tls_stream);
734
735        let mut service = ReverseProxyService::new(registry, load_balancer, config)
736            .with_remote_addr(remote_addr)
737            .with_tls(true);
738        if let Some(cm) = cert_manager {
739            service = service.with_cert_manager(cm);
740        }
741        if let Some(checker) = network_policy_checker {
742            service = service.with_network_policy_checker(checker);
743        }
744        if let Some(activator) = activator {
745            service = service.with_activator(activator);
746        }
747        if let Some(rps_registry) = rps_registry {
748            service = service.with_rps_registry(rps_registry);
749        }
750
751        let service = service_fn(move |req: Request<Incoming>| {
752            let svc = service.clone();
753            async move {
754                match svc.proxy_request(req).await {
755                    Ok(response) => Ok::<_, hyper::Error>(response),
756                    Err(e) => {
757                        error!(error = %e, "Proxy error");
758                        Ok(ReverseProxyService::error_response(&e))
759                    }
760                }
761            }
762        });
763
764        http1::Builder::new()
765            .preserve_header_case(true)
766            .title_case_headers(false)
767            .serve_connection(io, service)
768            .with_upgrades()
769            .await
770            .map_err(ProxyError::Hyper)?;
771
772        Ok(())
773    }
774
775    /// Peek (without consuming) the initial bytes of `stream` and extract the
776    /// SNI host from the TLS `ClientHello`.
777    ///
778    /// Uses [`TcpStream::peek`] so the bytes stay queued in the socket for the
779    /// subsequent `acceptor.accept` (managed) or raw splice (unmanaged) — there
780    /// is no ClientHello-replay problem. Bounded by a byte cap and a short
781    /// timeout so a slowloris cannot stall the accept task forever. Returns
782    /// `None` on close, timeout, or unparseable input.
783    async fn peek_sni(stream: &TcpStream) -> Option<String> {
784        /// Maximum bytes to peek for the `ClientHello`.
785        const MAX_PEEK: usize = 8192;
786        /// Overall budget to receive enough of the `ClientHello`.
787        const PEEK_TIMEOUT: Duration = Duration::from_secs(5);
788
789        let mut buf = vec![0u8; MAX_PEEK];
790        let result = tokio::time::timeout(PEEK_TIMEOUT, async {
791            loop {
792                let n = match stream.peek(&mut buf).await {
793                    // peer closed (Ok(0)) or errored before sending a ClientHello
794                    Ok(n) if n > 0 => n,
795                    _ => return None,
796                };
797
798                // Once the 5-byte record header is buffered, wait until the
799                // whole record (capped at MAX_PEEK) is present before parsing.
800                if n >= 5 {
801                    let record_len = ((buf[3] as usize) << 8) | buf[4] as usize;
802                    let needed = (5 + record_len).min(MAX_PEEK);
803                    if n >= needed {
804                        return crate::sni_peek::parse_sni(&buf[..n]);
805                    }
806                }
807                if n >= MAX_PEEK {
808                    return crate::sni_peek::parse_sni(&buf[..n]);
809                }
810
811                // Not enough yet — yield briefly so we don't busy-spin while the
812                // rest of the ClientHello trickles in.
813                tokio::time::sleep(Duration::from_millis(5)).await;
814            }
815        })
816        .await;
817
818        result.ok().flatten()
819    }
820
821    /// Splice an unmanaged-SNI connection straight to its real upstream on the
822    /// standard HTTPS port (443), terminating no TLS.
823    ///
824    /// Refuses to connect to loopback / unspecified addresses to avoid a
825    /// proxy→self loop. On any DNS / connect / loop-guard failure the
826    /// connection is dropped promptly (returns `Ok(())` which closes it) — it
827    /// must never hang.
828    async fn passthrough_unmanaged(
829        client: TcpStream,
830        remote_addr: SocketAddr,
831        host: String,
832    ) -> Result<()> {
833        let mut addrs = match lookup_host((host.as_str(), 443u16)).await {
834            Ok(it) => it,
835            Err(e) => {
836                debug!(host = %host, error = %e, "TLS passthrough DNS lookup failed; dropping");
837                return Ok(());
838            }
839        };
840
841        // LOOP-GUARD: skip any loopback/unspecified IP so we never proxy to
842        // ourselves; pick the first real upstream.
843        let Some(upstream_addr) = addrs.find(|a| !Self::is_self_addr(&a.ip())) else {
844            warn!(
845                host = %host,
846                "TLS passthrough refused: SNI resolves only to loopback/unspecified (loop guard)"
847            );
848            return Ok(());
849        };
850
851        let upstream = match TcpStream::connect(upstream_addr).await {
852            Ok(s) => s,
853            Err(e) => {
854                debug!(
855                    host = %host,
856                    upstream = %upstream_addr,
857                    error = %e,
858                    "TLS passthrough connect failed; dropping"
859                );
860                return Ok(());
861            }
862        };
863
864        debug!(
865            host = %host,
866            upstream = %upstream_addr,
867            client = %remote_addr,
868            "TLS passthrough (unmanaged SNI) -> upstream"
869        );
870        crate::stream::TcpStreamService::splice(client, upstream).await;
871        Ok(())
872    }
873
874    /// True if `ip` would loop the proxy back to itself — loopback
875    /// (`127.0.0.0/8`, `::1`) or unspecified (`0.0.0.0`, `::`).
876    fn is_self_addr(ip: &IpAddr) -> bool {
877        ip.is_loopback() || ip.is_unspecified()
878    }
879}
880
#[cfg(test)]
mod tests {
    use super::*;
    use crate::lb::LoadBalancer;
    use crate::routes::{ResolvedService, RouteEntry};
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use zlayer_spec::{ExposeType, Protocol};

    /// Upper bound on a single `roundtrip`. It is long enough for any healthy
    /// request, yet a connection the server never closes fails the test instead
    /// of hanging the whole suite.
    const ROUNDTRIP_TIMEOUT: Duration = Duration::from_secs(10);

    /// Helper to build a minimal `RouteEntry` for tests.
    fn make_entry(
        service: &str,
        host: Option<&str>,
        path: &str,
        backends: Vec<SocketAddr>,
    ) -> RouteEntry {
        RouteEntry {
            service_name: service.to_string(),
            endpoint_name: "http".to_string(),
            host: host.map(std::string::ToString::to_string),
            path_prefix: path.to_string(),
            resolved: ResolvedService {
                name: service.to_string(),
                backends,
                use_tls: false,
                sni_hostname: String::new(),
                expose: ExposeType::Public,
                protocol: Protocol::Http,
                strip_prefix: false,
                path_prefix: path.to_string(),
                target_port: 8080,
            },
        }
    }

    /// Drive a single raw HTTP/1.1 request through `handle_connection` and
    /// return the raw response bytes (status line + headers + body).
    ///
    /// This exercises the FULL ingress path — route resolution, the
    /// default-deny on an unmatched route, the ACME carve-out, and the
    /// generic `error_response` body — exactly as a real client would hit it,
    /// without binding a privileged port.
    ///
    /// Panics if the server does not close the connection within
    /// `ROUNDTRIP_TIMEOUT`.
    async fn roundtrip(
        registry: Arc<ServiceRegistry>,
        load_balancer: Arc<LoadBalancer>,
        cert_manager: Option<Arc<CertManager>>,
        raw_request: &str,
    ) -> String {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        let server = tokio::spawn(async move {
            let (stream, remote_addr) = listener.accept().await.unwrap();
            let _ = ProxyServer::handle_connection(
                stream,
                remote_addr,
                registry,
                load_balancer,
                Arc::new(ProxyConfig::default()),
                cert_manager,
                None,
                None,
                None,
            )
            .await;
        });

        let mut client = tokio::net::TcpStream::connect(addr).await.unwrap();
        client.write_all(raw_request.as_bytes()).await.unwrap();
        client.flush().await.unwrap();

        let mut buf = Vec::new();
        // Read until the peer closes (Connection: close ensures EOF). A read
        // error (e.g. reset) still ends the read; only a hang is a failure.
        let finished = tokio::time::timeout(ROUNDTRIP_TIMEOUT, client.read_to_end(&mut buf))
            .await
            .is_ok();
        server.abort();
        assert!(
            finished,
            "server did not close the connection within {ROUNDTRIP_TIMEOUT:?}; partial response: {}",
            String::from_utf8_lossy(&buf)
        );

        String::from_utf8_lossy(&buf).into_owned()
    }

    /// An unregistered Host (no matching route) must get a clean 404 with a
    /// generic body — NOT forwarded anywhere, and NOT echoing the requested
    /// Host/path. This is the default-deny boundary for the open ingress.
    #[tokio::test]
    async fn test_unmatched_host_denied_404_generic_body() {
        let registry = Arc::new(ServiceRegistry::new());
        // Register a route for ONE host so the registry is non-empty; the
        // request below targets a DIFFERENT host that matches nothing.
        registry
            .register(make_entry(
                "known",
                Some("known.example.com"),
                "/",
                vec!["127.0.0.1:9".parse().unwrap()],
            ))
            .await;
        let lb = Arc::new(LoadBalancer::new());

        let resp = roundtrip(
            registry,
            lb,
            None,
            "GET /secret/path HTTP/1.1\r\nHost: attacker.unregistered.test\r\nConnection: close\r\n\r\n",
        )
        .await;

        assert!(
            resp.starts_with("HTTP/1.1 404"),
            "unmatched host must be denied with 404, got: {resp}"
        );
        // The generic body must not leak the requested host or path.
        assert!(
            !resp.contains("attacker.unregistered.test"),
            "response must not echo the requested host: {resp}"
        );
        assert!(
            !resp.contains("/secret/path"),
            "response must not echo the requested path: {resp}"
        );
        assert!(
            resp.contains("404 Not Found"),
            "response should carry the generic 404 body: {resp}"
        );
    }

    /// The ACME HTTP-01 challenge path must still be served (this is how certs
    /// get issued) — the default-deny must come AFTER the ACME carve-out.
    #[tokio::test]
    async fn test_acme_challenge_served_not_denied() {
        let tmp = tempfile::tempdir().unwrap();
        let cm = Arc::new(
            CertManager::new(tmp.path().to_string_lossy().into_owned(), None)
                .await
                .unwrap(),
        );
        let token = "test-token-abc";
        cm.store_challenge(token, "example.com", "key-auth-payload-123");

        // Empty registry: nothing is registered, so if the ACME carve-out
        // were missing this would be denied by default.
        let registry = Arc::new(ServiceRegistry::new());
        let lb = Arc::new(LoadBalancer::new());

        let resp = roundtrip(
            registry,
            lb,
            Some(cm),
            &format!(
                "GET /.well-known/acme-challenge/{token} HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n"
            ),
        )
        .await;

        assert!(
            resp.starts_with("HTTP/1.1 200"),
            "ACME challenge must be served with 200, got: {resp}"
        );
        assert!(
            resp.contains("key-auth-payload-123"),
            "ACME challenge must return the key authorization: {resp}"
        );
    }

    /// An ACME challenge path with an UNKNOWN token must be handled terminally
    /// with a 404 — it must NOT fall through to vhost routing (which would
    /// return the default-deny 403/404 for the host). The acme-challenge
    /// carve-out owns the path entirely, even when no token is stored.
    #[tokio::test]
    async fn test_acme_challenge_unknown_token_404_not_forbidden() {
        let tmp = tempfile::tempdir().unwrap();
        let cm = Arc::new(
            CertManager::new(tmp.path().to_string_lossy().into_owned(), None)
                .await
                .unwrap(),
        );
        // No token is stored, so the lookup misses.

        // Register a route for ONE host so the registry is non-empty; the
        // request below targets a DIFFERENT host that matches nothing — proving
        // the acme path is handled terminally and never reaches the vhost
        // default-deny (which returns Forbidden/404 for unmatched hosts).
        let registry = Arc::new(ServiceRegistry::new());
        registry
            .register(make_entry(
                "known",
                Some("known.example.com"),
                "/",
                vec!["127.0.0.1:9".parse().unwrap()],
            ))
            .await;
        let lb = Arc::new(LoadBalancer::new());

        let resp = roundtrip(
            registry,
            lb,
            Some(cm),
            "GET /.well-known/acme-challenge/does-not-exist HTTP/1.1\r\nHost: console.zatabase.io\r\nConnection: close\r\n\r\n",
        )
        .await;

        assert!(
            resp.starts_with("HTTP/1.1 404"),
            "unknown ACME token must return 404, got: {resp}"
        );
        assert!(
            !resp.starts_with("HTTP/1.1 403"),
            "unknown ACME token must NOT fall through to vhost 403: {resp}"
        );
        assert!(
            resp.contains("ACME challenge token not found"),
            "404 should carry the ACME-specific body: {resp}"
        );
    }

    /// A matched route whose backend group has zero healthy backends must
    /// return 503 with a generic body — NOT leaking the internal LB group
    /// name.
    #[tokio::test]
    async fn test_matched_no_backends_503_generic_body() {
        let registry = Arc::new(ServiceRegistry::new());
        // Route matches, but its resolved LB group name (carried in the body
        // of the old leaky error) must never reach the client.
        let lb_group = "prod/api#http-secret-group";
        let mut entry = make_entry("api", Some("api.example.com"), "/", vec![]);
        entry.resolved.name = lb_group.to_string();
        registry.register(entry).await;

        // Load balancer has NO group registered for this name → select()
        // returns None → NoHealthyBackends.
        let lb = Arc::new(LoadBalancer::new());

        let resp = roundtrip(
            registry,
            lb,
            None,
            "GET / HTTP/1.1\r\nHost: api.example.com\r\nConnection: close\r\n\r\n",
        )
        .await;

        assert!(
            resp.starts_with("HTTP/1.1 503"),
            "matched route with no healthy backends must return 503, got: {resp}"
        );
        assert!(
            !resp.contains(lb_group),
            "503 body must not leak the internal LB group name: {resp}"
        );
        assert!(
            resp.contains("503 Service Unavailable"),
            "response should carry the generic 503 body: {resp}"
        );
    }

    /// Pins the TLS-passthrough loop-guard predicate: loopback and unspecified
    /// addresses count as "self", ordinary public addresses do not.
    #[test]
    fn test_is_self_addr_loopback_and_unspecified() {
        let is_self = |s: &str| ProxyServer::is_self_addr(&s.parse::<IpAddr>().unwrap());
        assert!(is_self("127.0.0.1"));
        assert!(is_self("127.8.9.10"));
        assert!(is_self("::1"));
        assert!(is_self("0.0.0.0"));
        assert!(is_self("::"));
        assert!(!is_self("93.184.216.34"));
        assert!(!is_self("2001:db8::1"));
    }

    #[tokio::test]
    async fn test_server_shutdown() {
        let registry = Arc::new(ServiceRegistry::new());
        let lb = Arc::new(LoadBalancer::new());
        let server = ProxyServer::new(ProxyConfig::default(), registry, lb);

        // Create a separate handle for shutdown
        let shutdown_tx = server.shutdown_tx.clone();

        // Signal shutdown immediately
        let _ = shutdown_tx.send(true);

        // Server should exit gracefully
        // (In a real test, we'd spawn the server and verify it stops)
        // NOTE(review): as written this asserts nothing; it only checks that the
        // shutdown handle can be cloned and signalled without panicking.
    }

    #[tokio::test]
    async fn test_registry_integration() {
        let registry = Arc::new(ServiceRegistry::new());

        // Add a route
        registry
            .register(make_entry(
                "test-service",
                None,
                "/api",
                vec!["127.0.0.1:8081".parse().unwrap()],
            ))
            .await;

        let lb = Arc::new(LoadBalancer::new());
        let server = ProxyServer::new(ProxyConfig::default(), registry, lb);

        // Verify registry is accessible
        let reg = server.registry();
        assert_eq!(reg.route_count().await, 1);
    }

    #[test]
    fn test_next_ingress_backoff_doubles_then_caps() {
        // Doubles from the initial value.
        assert_eq!(
            next_ingress_backoff(INGRESS_BIND_BACKOFF_INITIAL),
            INGRESS_BIND_BACKOFF_INITIAL * 2
        );
        // Keeps doubling until it reaches the cap.
        let mut d = INGRESS_BIND_BACKOFF_INITIAL;
        for _ in 0..20 {
            d = next_ingress_backoff(d);
        }
        assert_eq!(d, INGRESS_BIND_BACKOFF_MAX);
        // Never exceeds the cap once there.
        assert_eq!(
            next_ingress_backoff(INGRESS_BIND_BACKOFF_MAX),
            INGRESS_BIND_BACKOFF_MAX
        );
    }

    #[test]
    fn test_should_warn_cadence() {
        // Always warns on the first attempt.
        assert!(should_warn_on_attempt(0));
        // Quiet in between.
        assert!(!should_warn_on_attempt(1));
        assert!(!should_warn_on_attempt(INGRESS_BIND_WARN_EVERY - 1));
        // Warns again every N attempts.
        assert!(should_warn_on_attempt(INGRESS_BIND_WARN_EVERY));
        assert!(should_warn_on_attempt(INGRESS_BIND_WARN_EVERY * 2));
    }

    #[tokio::test]
    async fn test_bind_with_retry_succeeds_after_initial_conflict() {
        // Hold an ephemeral port, then spawn a retrying bind against it; once
        // we drop our listener the retry loop must grab the port and return.
        let held = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = held.local_addr().unwrap();

        let (_tx, mut rx) = watch::channel(false);
        let handle = tokio::spawn(async move { bind_with_retry(addr, "test", &mut rx).await });

        // Give the retry loop a chance to observe the conflict at least once.
        tokio::time::sleep(Duration::from_millis(50)).await;
        // Free the port; the retry loop should now bind it.
        drop(held);

        let bound = tokio::time::timeout(Duration::from_secs(10), handle)
            .await
            .expect("bind_with_retry did not finish")
            .expect("task panicked");
        let listener = bound.expect("expected a bound listener, got None");
        assert_eq!(listener.local_addr().unwrap().port(), addr.port());
    }

    #[tokio::test]
    async fn test_bind_with_retry_returns_none_on_shutdown() {
        // A port held for the whole test: the retry loop can never bind, so a
        // shutdown signal must unblock it and yield None (never errors).
        let held = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = held.local_addr().unwrap();

        let (tx, mut rx) = watch::channel(false);
        let handle = tokio::spawn(async move { bind_with_retry(addr, "test", &mut rx).await });

        tokio::time::sleep(Duration::from_millis(50)).await;
        tx.send(true).unwrap();

        let result = tokio::time::timeout(Duration::from_secs(10), handle)
            .await
            .expect("bind_with_retry did not respond to shutdown")
            .expect("task panicked");
        assert!(result.is_none(), "shutdown should yield None");
    }
}